package main import ( "encoding/json" "fmt" "io/ioutil" "os" "path/filepath" "runtime" "strconv" "strings" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) const ( envAWSRegion = "AWS_REGION" envS3Bucket = "S3_BUCKET_NAME" envS3Action = "S3_ACTION" envS3Prefix = "S3_PREFIX" envTestFile = "TEST_FILE" envExpectedLogsLen = "EXPECTED_EVENTS_LEN" retries = 2 retrySleep = 5 ) type Message struct { Log string } func main() { region := os.Getenv(envAWSRegion) if region == "" { exitErrorf("[TEST FAILURE] AWS Region required. Set the value for environment variable- %s", envAWSRegion) } bucket := os.Getenv(envS3Bucket) if bucket == "" { exitErrorf("[TEST FAILURE] Bucket name required. Set the value for environment variable- %s", envS3Bucket) } prefix := os.Getenv(envS3Prefix) if prefix == "" { exitErrorf("[TEST FAILURE] S3 object prefix required. Set the value for environment variable- %s", envS3Prefix) } testFile := os.Getenv(envTestFile) if testFile == "" { exitErrorf("[TEST FAILURE] test verfication file name required. Set the value for environment variable- %s", envTestFile) } expectedEventsLen := os.Getenv(envExpectedLogsLen) if expectedEventsLen == "" { exitErrorf("[TEST FAILURE] number of expected log events required. Set the value for environment variable- %s", envExpectedLogsLen) } numEvents, convertionError := strconv.Atoi(expectedEventsLen) if convertionError != nil { exitErrorf("[TEST FAILURE] String to Int convertion Error for EXPECTED_EVENTS_LEN:", convertionError) } s3Client, err := getS3Client(region) if err != nil { exitErrorf("[TEST FAILURE] Unable to create new S3 client: %v", err) } s3Action := os.Getenv(envS3Action) if s3Action == "validate" { // Validate the data on the s3 bucket for i := 0; i <= retries; i++ { success, canRetry := validate(s3Client, prefix, bucket, testFile, numEvents) if success { fmt.Println("[VALIDATION SUCCESSFULL]") break } else if !canRetry { break } time.Sleep(retrySleep * time.Second) } } else { // Clean the s3 bucket-- delete all objects for i := 0; i <= retries; i++ { success := deleteS3Objects(s3Client, bucket, prefix) if success { break } time.Sleep(retrySleep * time.Second) } } } // Creates a new S3 Client func getS3Client(region string) (*s3.S3, error) { sess, err := session.NewSession(&aws.Config{ Region: aws.String(region)}, ) if err != nil { return nil, err } return s3.New(sess), nil } // Returns all the objects from a S3 bucket with the given prefix func getS3Objects(s3Client *s3.S3, bucket string, prefix string) *s3.ListObjectsV2Output { input := &s3.ListObjectsV2Input{ Bucket: aws.String(bucket), MaxKeys: aws.Int64(100), Prefix: aws.String(prefix), } response, err := s3Client.ListObjectsV2(input) if err != nil { fmt.Fprintf(os.Stderr,"[TEST FAILURE] Error occured to get the objects from bucket: %q., %v", bucket, err) return nil } return response } // Validates the log messages. Our log producer is designed to send 1000 integers [0 - 999]. // Both of the Kinesis Streams and Kinesis Firehose try to send each log maintaining the "at least once" policy. // To validate, we need to make sure all the valid numbers [0 - 999] are stored at least once. // returns success, can retry // if the failure was on a network call, then we can retry func validate(s3Client *s3.S3, prefix string, bucket string, testFile string, numEvents int) (bool, bool) { response := getS3Objects(s3Client, bucket, prefix) if response == nil { return false, true } logCounter := make([]int, numEvents) for index := range logCounter { logCounter[index] = 1 } for i := range response.Contents { input := &s3.GetObjectInput{ Bucket: aws.String(bucket), Key: response.Contents[i].Key, } obj := getS3Object(s3Client, input) if obj == nil { return false, true } dataByte, err := ioutil.ReadAll(obj.Body) if err != nil { fmt.Fprintf(os.Stderr,"[TEST FAILURE] Error to parse GetObject response. %v", err) return false, true } data := strings.Split(string(dataByte), "\n") for _, d := range data { if d == "" { continue } if len(d) > 500 { continue } var message Message decodeError := json.Unmarshal([]byte(d), &message) if decodeError != nil { fmt.Fprintf(os.Stderr,"[TEST FAILURE] Json Unmarshal Error:", decodeError) return false, false } if runtime.GOOS == "windows" { // On Windows, we would have additional \r which needs to be stripped. message.Log = strings.ReplaceAll(message.Log, "\r", "") } number, convertionError := strconv.Atoi(message.Log) if convertionError != nil { fmt.Fprintf(os.Stderr,"[TEST FAILURE] String to Int convertion Error:", convertionError) return false, false } if number < 0 || number >= numEvents { fmt.Fprintf(os.Stderr,"[TEST FAILURE] Invalid number: %d found. Expected value in range (0 - %d)", number, numEvents) return false, false } logCounter[number] = 0 } } sum := 0 for i := range logCounter { sum += logCounter[i] } if sum > 0 { fmt.Fprintf(os.Stderr,"[TEST FAILURE] Validation Failed. Number of missing log records: %d", sum) return false, false } else { fmt.Println("[TEST SUCCESSFULL] Found all the log records.") // The file was created when the integ test started. Removing this file as a flag of test success. os.Remove(filepath.Join("/out", testFile)) return true, false } } // Retrieves an object from a S3 bucket func getS3Object(s3Client *s3.S3, input *s3.GetObjectInput) *s3.GetObjectOutput { obj, err := s3Client.GetObject(input) if err != nil { fmt.Fprintf(os.Stderr,"[TEST FAILURE] Error occured to get s3 object: %v", err) return nil } return obj } // Delete all the objects with the given prefix from the specified S3 bucket func deleteS3Objects(s3Client *s3.S3, bucket string, prefix string) bool { // Setup BatchDeleteIterator to iterate through a list of objects. iter := s3manager.NewDeleteListIterator(s3Client, &s3.ListObjectsInput{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), }) // Traverse the iterator deleting each object if err := s3manager.NewBatchDeleteWithClient(s3Client).Delete(aws.BackgroundContext(), iter); err != nil { fmt.Fprintf(os.Stderr,"[CLEAN FAILURE] Unable to delete the objects from the bucket %q., %v", bucket, err) return false } fmt.Println("[CLEAN SUCCESSFUL] All the objects are deleted from the bucket:", bucket) return true } func exitErrorf(msg string, args ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", args...) os.Exit(1) }