import logging
import time

import boto3
import yaml

refactor_space_client = boto3.client('migration-hub-refactor-spaces')

# Page size requested from the list_services / list_routes operations.
_LIST_PAGE_SIZE = 100
# Fixed pause (seconds) after requesting route deletions, before the owning
# service is deleted.
_ROUTE_DELETION_WAIT_SECONDS = 60
# Fixed pause (seconds) between creating a service and creating its route.
_SERVICE_CREATION_WAIT_SECONDS = 10


def _list_all(list_operation, summary_key, envId, appId):
    """Return every summary from a paginated list_* client operation.

    Calls ``list_operation`` repeatedly, forwarding the ``NextToken`` of each
    response, until a response carries no ``NextToken``. Each raw response is
    logged.

    Args:
        list_operation: Bound client method, e.g.
            ``refactor_space_client.list_routes``.
        summary_key: Response key holding the page items, e.g.
            ``"RouteSummaryList"``.
        envId: Value passed as ``EnvironmentIdentifier``.
        appId: Value passed as ``ApplicationIdentifier``.

    Returns:
        A list with the items of all pages, in the order returned.
    """
    request = {
        "ApplicationIdentifier": appId,
        "EnvironmentIdentifier": envId,
        "MaxResults": _LIST_PAGE_SIZE,
    }
    summaries = []
    while True:
        page = list_operation(**request)
        logging.info(page)
        summaries.extend(page[summary_key])
        next_token = page.get("NextToken")
        if not next_token:
            return summaries
        request["NextToken"] = next_token


def delete_flux_service(envId, appId, name):
    """Delete every service called ``name`` together with its routes.

    Lists all services and routes of the application, deletes each route
    whose ``ServiceId`` belongs to a service named ``name``, pauses if at
    least one route deletion was requested, and then deletes the matching
    services. Nothing is deleted when no service has that name.

    Args:
        envId: Value passed as ``EnvironmentIdentifier``.
        appId: Value passed as ``ApplicationIdentifier``.
        name: Service name to match exactly against each summary's ``Name``.

    Exceptions raised by the client calls are not caught here.
    """
    logging.info('delete_flux_service')

    services = _list_all(
        refactor_space_client.list_services, "ServiceSummaryList", envId, appId
    )
    routes = _list_all(
        refactor_space_client.list_routes, "RouteSummaryList", envId, appId
    )
    logging.info("ServiceSummaryList")
    logging.info(services)
    logging.info("RouteSummaryList")
    logging.info(routes)

    matching_services = [svc for svc in services if svc["Name"] == name]

    # Routes are removed first; the service itself is only deleted afterwards.
    route_deletion_requested = False
    for svc in matching_services:
        logging.info("Match Found:::: %s is %s", svc["Name"], name)
        for route in routes:
            if route["ServiceId"] != svc["ServiceId"]:
                continue
            logging.info("%s - deleting route %s", name, route["RouteId"])
            refactor_space_client.delete_route(
                ApplicationIdentifier=appId,
                EnvironmentIdentifier=envId,
                RouteIdentifier=route["RouteId"],
            )
            route_deletion_requested = True

    if route_deletion_requested:
        # Give the requested route deletions time before deleting the service.
        time.sleep(_ROUTE_DELETION_WAIT_SECONDS)

    for svc in matching_services:
        logging.info("%s - deleting service", name)
        refactor_space_client.delete_service(
            ApplicationIdentifier=appId,
            EnvironmentIdentifier=envId,
            ServiceIdentifier=svc["ServiceId"],
        )


def create_flux_service(envId, appId, vpcId, name, url, healthCheckUrl, path, method):
    """(Re)create a URL-endpoint service named ``name`` and one route to it.

    Any existing service with the same name (and its routes) is removed via
    ``delete_flux_service`` first. After the service is created, the
    application's routes are listed: when none exist a ``DEFAULT`` route is
    created (``path`` and ``method`` are then not sent), otherwise an active
    ``URI_PATH`` route for ``path`` / ``method`` including child paths.

    Args:
        envId: Value passed as ``EnvironmentIdentifier``.
        appId: Value passed as ``ApplicationIdentifier``.
        vpcId: Value passed as ``VpcId`` of the new service.
        name: Name of the service.
        url: Value passed as the endpoint ``Url``.
        healthCheckUrl: Value passed as the endpoint ``HealthUrl``.
        path: ``SourcePath`` of a ``URI_PATH`` route.
        method: Single HTTP method placed in the route's ``Methods`` list.

    Returns:
        The ``ServiceId`` from the ``create_service`` response.

    Exceptions raised by the client calls are not caught here.
    """
    logging.info("inside create_flux_service")
    logging.info("url:%s", url)
    logging.info("healthCheckUrl:%s", healthCheckUrl)
    logging.info("path:%s", path)
    logging.info("method:%s", method)

    # Delete the service and its routes if they already exist.
    delete_flux_service(envId, appId, name)

    response = refactor_space_client.create_service(
        ApplicationIdentifier=appId,
        Description='Autogenerated service created by refactor-space-eks-controller',
        EndpointType="URL",
        EnvironmentIdentifier=envId,
        Name=name,
        UrlEndpoint={
            'HealthUrl': healthCheckUrl,
            'Url': url,
        },
        VpcId=vpcId,
    )
    service_id = response['ServiceId']
    logging.info("%s service created: %s", name, service_id)

    # The route type depends on whether the application already has routes.
    existing_routes = _list_all(
        refactor_space_client.list_routes, "RouteSummaryList", envId, appId
    )
    route_type = 'URI_PATH' if existing_routes else 'DEFAULT'

    time.sleep(_SERVICE_CREATION_WAIT_SECONDS)

    banner = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
    logging.info("%s now creating route: for %s with %s", name, path, method)
    logging.info(banner)
    logging.info("appId:%s", appId)
    logging.info("envId:%s", envId)
    logging.info("RouteType:%s", route_type)
    logging.info("ServiceIdentifier:%s", service_id)
    logging.info("Methods:%s", method)
    logging.info("SourcePath:%s", path)
    logging.info(banner)

    route_request = {
        "ApplicationIdentifier": appId,
        "EnvironmentIdentifier": envId,
        "RouteType": route_type,
        "ServiceIdentifier": service_id,
    }
    if route_type != 'DEFAULT':
        # Only URI_PATH routes carry path/method matching details.
        route_request["UriPathRoute"] = {
            'ActivationState': 'ACTIVE',
            'IncludeChildPaths': True,
            'Methods': [
                method,
            ],
            'SourcePath': path,
        }
    refactor_space_client.create_route(**route_request)

    return service_id