// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"). You may not // use this file except in compliance with the License. A copy of the // License is located at // // http://aws.amazon.com/apache2.0/ // // or in the "license" file accompanying this file. This file is distributed // on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, // either express or implied. See the License for the specific language governing // permissions and limitations under the License. // Package service wraps SSM service package service import ( "bytes" "fmt" "sync" "time" "github.com/aws/amazon-ssm-agent/agent/association/cache" "github.com/aws/amazon-ssm-agent/agent/association/model" "github.com/aws/amazon-ssm-agent/agent/association/schedulemanager" "github.com/aws/amazon-ssm-agent/agent/context" "github.com/aws/amazon-ssm-agent/agent/jsonutil" "github.com/aws/amazon-ssm-agent/agent/log" "github.com/aws/amazon-ssm-agent/agent/plugins/pluginutil" "github.com/aws/amazon-ssm-agent/agent/sdkutil" ssmsvc "github.com/aws/amazon-ssm-agent/agent/ssm" "github.com/aws/amazon-ssm-agent/agent/times" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ssm" "github.com/twinj/uuid" ) const ( stopPolicyErrorThreshold = 10 latestDoc = "$LATEST" cronExpressionEveryFiveMinutes = "cron(0 0/5 * 1/1 * ? *)" NoOutputUrl = "" ) type associationApiMode string const ( instanceAssociationMode associationApiMode = "instanceAssociationMode" legacyAssociationMode associationApiMode = "legacyAssociationMode" ) var ( currentAssociationApiMode associationApiMode = instanceAssociationMode lock sync.RWMutex ) // T represents interface for association type T interface { CreateNewServiceIfUnHealthy(context context.T) ListInstanceAssociations(log log.T, instanceID string) ([]*model.InstanceAssociation, error) LoadAssociationDetail(log log.T, assoc *model.InstanceAssociation) error UpdateAssociationStatus( log log.T, associationName string, instanceID string, status string, executionSummary string) UpdateInstanceAssociationStatus( log log.T, associationID string, associationName string, instanceID string, status string, errorCode string, executionDate string, executionSummary string, outputUrl string) IsInstanceAssociationApiMode() bool DescribeAssociation(log log.T, instanceID string, docName string) (response *ssm.DescribeAssociationOutput, err error) } // AssociationService wraps the Ssm Service type AssociationService struct { context context.T ssmSvc ssmsvc.Service stopPolicy *sdkutil.StopPolicy name string } // NewAssociationService returns a new association service func NewAssociationService(context context.T, name string) *AssociationService { ssmService := ssmsvc.NewService(context) policy := sdkutil.NewStopPolicy(name, stopPolicyErrorThreshold) svc := AssociationService{ context: context, ssmSvc: ssmService, stopPolicy: policy, name: name, } return &svc } // CreateNewServiceIfUnHealthy checks service healthy and create new service if original is unhealthy func (s *AssociationService) CreateNewServiceIfUnHealthy(context context.T) { log := s.context.Log() if s.stopPolicy == nil { log.Debugf("creating new stop-policy.") s.stopPolicy = sdkutil.NewStopPolicy(s.name, stopPolicyErrorThreshold) } log.Debugf("assocProcessor's stoppolicy before polling is %v", s.stopPolicy) if !s.stopPolicy.IsHealthy() { log.Errorf("assocProcessor stopped temporarily due to internal failure. We will retry automatically") // reset stop policy and let the scheduler start the polling after pollMessageFrequencyMinutes timeout s.stopPolicy.ResetErrorCount() s.ssmSvc = ssmsvc.NewService(s.context) return } } // ListInstanceAssociations will get the Association and related document string func (s *AssociationService) ListInstanceAssociations(log log.T, instanceID string) ([]*model.InstanceAssociation, error) { uuid.SwitchFormat(uuid.CleanHyphen) results := []*model.InstanceAssociation{} response, err := s.ssmSvc.ListInstanceAssociations(log, instanceID, nil) // if ListInstanceAssociations return error, system will try to use legacy ListAssociations if err != nil { s.setAssociationApiMode(legacyAssociationMode) if results, err = s.ListAssociations(log, instanceID); err != nil { return results, fmt.Errorf("unable to retrieve associations %v", err) } } else { s.setAssociationApiMode(instanceAssociationMode) for { for _, assoc := range response.Associations { rawData := &model.InstanceAssociation{} rawData.Association = assoc rawData.CreateDate = time.Now().UTC() results = append(results, rawData) } if response.NextToken == nil || *response.NextToken == "" { break } if response, err = s.ssmSvc.ListInstanceAssociations(log, instanceID, response.NextToken); err != nil { return results, fmt.Errorf("unable to retrieve associations %v", err) } } } log.Debug("Number of associations is ", len(results)) return results, nil } func (s *AssociationService) setAssociationApiMode(api associationApiMode) { lock.Lock() defer lock.Unlock() currentAssociationApiMode = api } // ListAssociations will get the Association and related document string from legacy api func (s *AssociationService) ListAssociations(log log.T, instanceID string) ([]*model.InstanceAssociation, error) { var parameterResponse *ssm.DescribeAssociationOutput results := []*model.InstanceAssociation{} response, err := s.ssmSvc.ListAssociations(log, instanceID) if err != nil { return nil, fmt.Errorf("unable to retrieve associations %v", err) } if len(response.Associations) > 0 { // Legacy ListAssociation only supports one association to ba associated at a time assoc := response.Associations[0] // Call descriptionAssociation and retrieve the parameter json string if parameterResponse, err = s.ssmSvc.DescribeAssociation(log, instanceID, *assoc.Name); err != nil { log.Errorf("unable to retrieve association parameter, %v", err) return nil, err } rawData := &model.InstanceAssociation{} rawData.Association = &ssm.InstanceAssociationSummary{ AssociationId: assoc.Name, DocumentVersion: aws.String(""), Name: assoc.Name, InstanceId: aws.String(instanceID), Checksum: aws.String(""), Parameters: parameterResponse.AssociationDescription.Parameters, DetailedStatus: parameterResponse.AssociationDescription.Status.Name, } rawData.CreateDate = time.Now().UTC() results = append(results, rawData) } return results, nil } // DescribeAssociation wraps ssm service DescribeAssociation func (s *AssociationService) DescribeAssociation(log log.T, instanceID string, docName string) (response *ssm.DescribeAssociationOutput, err error) { return s.ssmSvc.DescribeAssociation(log, instanceID, docName) } // UpdateInstanceAssociationStatus will get the Association and related document string func (s *AssociationService) UpdateInstanceAssociationStatus( log log.T, associationID string, associationName string, instanceID string, status string, errorCode string, executionDate string, executionSummary string, outputUrl string) { var err error // Update status in schedulemanager to ensure state matches with the one on the service schedulemanager.UpdateAssociationStatus(associationID, status) if s.IsInstanceAssociationApiMode() { date := times.ParseIso8601UTC(executionDate) if executionSummary, err = pluginutil.ReadPrefix(bytes.NewBufferString(executionSummary), 510, "--output truncated--"); err != nil { log.Error(err) } executionResult := ssm.InstanceAssociationExecutionResult{ Status: aws.String(status), ErrorCode: aws.String(errorCode), ExecutionDate: aws.Time(date), ExecutionSummary: aws.String(executionSummary), } if outputUrl != NoOutputUrl { s3OutputUrl := ssm.S3OutputUrl{OutputUrl: aws.String(outputUrl)} executionResult.OutputUrl = &ssm.InstanceAssociationOutputUrl{S3OutputUrl: &s3OutputUrl} } var executionResultContent string if executionResultContent, err = jsonutil.Marshal(executionResult); err != nil { log.Errorf("could not marshal associationStatus, %v", err) return } log.Info("Updating association status ", jsonutil.Indent(executionResultContent)) var response *ssm.UpdateInstanceAssociationStatusOutput if response, err = s.ssmSvc.UpdateInstanceAssociationStatus(log, associationID, instanceID, &executionResult); err != nil { log.Errorf("unable to update association status, %v", err) // After machine reboot system turn back to use UpdateInstanceAssociationStatus for legacy association // instead of using legacy UpdateAssociationStatus api // When we get error during update, run UpdateAssociationStatus if associationName is equal to associationId // which indicates legacy association loaded from ListAssociation api // Otherwise log the error and return. if associationID == associationName { s.UpdateAssociationStatus(log, associationName, instanceID, status, executionSummary) } return } var responseContent string if responseContent, err = jsonutil.Marshal(response); err != nil { log.Error("could not marshal response! ", err) return } log.Info("Update instance association status response content is ", jsonutil.Indent(responseContent)) return } s.UpdateAssociationStatus(log, associationName, instanceID, status, executionSummary) return } // UsingInstanceAssociationApi represents if the agent is using new InstanceAssociationApi for listing and updating func (s *AssociationService) IsInstanceAssociationApiMode() bool { lock.Lock() defer lock.Unlock() return currentAssociationApiMode == instanceAssociationMode } // LoadAssociationDetail loads document contents and parameters for the given association func (s *AssociationService) LoadAssociationDetail(log log.T, assoc *model.InstanceAssociation) error { associationCache := cache.GetCache() associationID := assoc.Association.AssociationId // check if the association details have been cached if associationCache.IsCached(*associationID) { rawData := associationCache.Get(*associationID) assoc.Document = rawData.Document return nil } // if not cached before var ( documentResponse *ssm.GetDocumentOutput err error ) // TODO: add a retry here // Call getDocument and retrieve the document json string if documentResponse, err = s.ssmSvc.GetDocument(log, *assoc.Association.Name, *assoc.Association.DocumentVersion); err != nil { log.Errorf("unable to retrieve document, %v", err) return err } assoc.Document = documentResponse.Content if err = associationCache.Add(*associationID, assoc); err != nil { return err } return nil } // UpdateAssociationStatus update association status func (s *AssociationService) UpdateAssociationStatus( log log.T, associationName string, instanceID string, status string, executionSummary string) { config := s.context.AppConfig() agentInfoContent, err := jsonutil.Marshal(config.Agent) if err != nil { log.Error("could not marshal agentInfo! ", err) return } currentTime := time.Now().UTC() associationStatus := ssm.AssociationStatus{ Name: aws.String(status), Message: aws.String(executionSummary), Date: ¤tTime, AdditionalInfo: &agentInfoContent, } associationStatusContent, err := jsonutil.Marshal(associationStatus) if err != nil { log.Error("could not marshal associationStatus! ", err) return } log.Info("Update association status content is ", jsonutil.Indent(associationStatusContent)) // Call getDocument and retrieve the document json string response, err := s.ssmSvc.UpdateAssociationStatus( log, instanceID, associationName, &associationStatus) if err != nil { log.Errorf("unable to update association status, %v", err) sdkutil.HandleAwsError(log, err, s.stopPolicy) return } responseContent, err := jsonutil.Marshal(response) if err != nil { log.Error("could not marshal response! ", err) return } log.Info("Update association status response content is ", jsonutil.Indent(responseContent)) }