#
# All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or
# its licensors.
#
# For complete copyright and license terms please see the LICENSE at the root of this
# distribution (the "License"). All use of this software is governed by the License,
# or, if provided, by the license below or the license accompanying this file. Do not
# remove or modify any license notices. This file is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
import os
import swagger_processor
import resource_manager_common.constant as constant
import resource_manager.util
import re
import random
import string
import json
from cgf_utils.version_utils import Version
from cgf_utils import custom_resource_utils
from resource_manager.uploader import Uploader
from resource_manager.errors import HandledError
from botocore.client import Config
from botocore.exceptions import ClientError
# NOTE(review): presumably the name of the Cloud Gem Portal bucket resource in
# the project stack -- not referenced in the visible code, confirm against callers.
AWS_S3_BUCKET_NAME = 'CloudGemPortal'
# NOTE(review): looks like the bucket folder that holds the portal's web content -- confirm.
BUCKET_ROOT_DIRECTORY_NAME = 'www'
# Text that introduces the bootstrap variable assignment; it is formatted together
# with the JSON-encoded bootstrap configuration when the portal content is rewritten.
BOOTSTRAP_VARIABLE_NAME = "var bootstrap ="
BOOTSTRAP_REGEX_PATTERN = ''.format(BOOTSTRAP_VARIABLE_NAME, json.dumps(cgp_bootstrap_config)))
content = content.replace(get_domain_variable(content),
'var domain = \'{}\''.format(get_domain(get_index_url(context, region))))
result = None
try:
# TODO: write to a unique name and configure the bucket to auto-delete these objects after 1 hour,
# the max allowed --duration-seconds value.
result = s3_client.put_object(Bucket=bucket_id, Key=constant.PROJECT_CGP_ROOT_FILE, Body=content,ContentType='text/html')
except ClientError as e:
if e.response["Error"]["Code"] in ["AccessDenied"]:
credentials = context.aws.load_credentials()
access_key = credentials.get(constant.DEFAULT_SECTION_NAME, constant.ACCESS_KEY_OPTION)
context.view._output_message("The Lumberyard user '{0}' associated with AWS IAM access key identifier '{1}' is missing PUT permissions on the S3 bucket '{2}'. Now attempting to use old Cloud Gem Portal pre-signed urls.\nHave the administrator grant the AWS user account with access key '{1}' S3 PUT permissions for bucket '{2}'".format(context.config.user_default_profile, access_key, bucket_id))
else:
raise HandledError("Could not write to the key '{}' in the S3 bucket '{}'.".format(constant.PROJECT_CGP_ROOT_FILE,bucket_id), e)
if result is None or result['ResponseMetadata']['HTTPStatusCode'] == 200:
context.view._output_message("The Cloud Gem Portal bootstrap information has been written successfully.")
else:
raise HandledError("The index.html cloud not be set in the S3 bucket '{}'. This Cloud Gem Portal site will not load.".format(bucket_id))
updateUserPoolEmailMessage(context, get_index_url(context, region), project_config_bucket_id)
def get_index(s3_client, bucket_id):
    """Download the Cloud Gem Portal root file from S3 and return it as text.

    Args:
        s3_client: S3 client object exposing ``get_object``.
        bucket_id: Name of the bucket that holds ``constant.PROJECT_CGP_ROOT_FILE``.

    Returns:
        The object's body decoded as UTF-8.

    Raises:
        HandledError: If the request raises a ``ClientError`` or the response
            status code is anything other than 200.
    """
    root_key = constant.PROJECT_CGP_ROOT_FILE

    # Request the index file.
    try:
        response = s3_client.get_object(Bucket=bucket_id, Key=root_key)
    except ClientError as e:
        message = "Could not read from the key '{}' in the S3 bucket '{}'.".format(root_key, bucket_id)
        raise HandledError(message, e)

    # Only a 200 response carries a readable body.
    status_code = response['ResponseMetadata']['HTTPStatusCode']
    if status_code == 200:
        return response['Body'].read().decode('utf-8')

    raise HandledError(
        "The user does not have access to the file index.html file. This Cloud Gem Portal site will not load.")
def get_index_url(context, region):
    """Build the URL of the service API's ``open-cloud-gem-portal`` endpoint.

    Args:
        context: Resource manager context; ``context.config.custom_domain_name``
            selects which URL layout is used.
        region: AWS region name inserted into the URL.

    Returns:
        The endpoint URL as a string.
    """
    custom_domain = context.config.custom_domain_name
    if not custom_domain:
        # Default API Gateway host name.
        base_url = 'https://{}.execute-api.{}.amazonaws.com/api'.format(get_service_api_id(context), region)
    else:
        # Custom domain layout: https://<domain>/<region>.api.<rest api id>
        base_url = 'https://{}/{}.api.{}'.format(custom_domain, region, get_service_api_id(context))
    return "{}/open-cloud-gem-portal".format(base_url)
def get_bootstrap(s3_client, bucket_id):
    """Return the text matched by BOOTSTRAP_REGEX_PATTERN in the portal index file.

    The index file is fetched from ``bucket_id`` with ``get_index``.
    """
    index_content = get_index(s3_client, bucket_id)
    bootstrap_text = get_match_group(BOOTSTRAP_REGEX_PATTERN, index_content)
    return bootstrap_text
def get_domain_variable(content):
    """Return the text in ``content`` matched by DOMAIN_REGEX_PATTERN."""
    domain_assignment = get_match_group(DOMAIN_REGEX_PATTERN, content)
    return domain_assignment
def get_domain(presigned_url):
    """Return the third '/'-separated segment of ``presigned_url``.

    For a URL shaped like ``scheme://host/path`` that segment is the host.

    Raises:
        IndexError: If the string has fewer than three '/'-separated segments.
    """
    # Segments past the third are never needed, so stop splitting there.
    return presigned_url.split('/', 3)[2]
def get_service_api_id(context):
    """Return the ``RestApiId`` of the project stack's "ServiceApi" resource.

    The resource's embedded physical id is treated as a prefix (whatever
    SERVICE_API_PREFIX_PATTERN matches) followed by a JSON document. Every
    occurrence of the matched prefix is removed and the remainder is parsed
    as JSON.
    """
    physical_id = context.stack.get_physical_resource_id(context.config.project_stack_id, "ServiceApi")
    service_api_info = custom_resource_utils.get_embedded_physical_id(physical_id)
    prefix = get_match_group(SERVICE_API_PREFIX_PATTERN, service_api_info)
    api_description = json.loads(service_api_info.replace(prefix, ""))
    return api_description['RestApiId']
def get_match_group(pattern, content):
    """Return the first substring of ``content`` that matches ``pattern``.

    The search is case-insensitive and multi-line (``re.I | re.M``).

    Args:
        pattern: Regular expression source string.
        content: Text to search.

    Returns:
        The full text of the first match.

    Raises:
        HandledError: If ``pattern`` does not match anywhere in ``content``.
    """
    match = re.search(pattern, content, re.M | re.I)
    if match is None:
        # re.search returns None when nothing matches; report that explicitly
        # instead of failing with an AttributeError on None.
        raise HandledError("Could not find any text matching the pattern '{}'.".format(pattern))
    return match.group()
def _generate_temporary_password():
    """Build a shuffled 12-character temporary password from the OS random source."""
    # random.SystemRandom draws from os.urandom; the module-level random
    # functions are deterministic and not suitable for generating credentials.
    secure_random = random.SystemRandom()
    alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
    # Guarantee one upper-case letter, one lower-case letter, one digit and one symbol.
    chars = [
        secure_random.choice(string.ascii_uppercase),
        secure_random.choice(string.ascii_lowercase),
        secure_random.choice(string.digits),
        '@',
    ]
    chars.extend(secure_random.choice(alphabet) for _ in range(8))
    # Shuffle so the guaranteed characters do not sit at fixed positions.
    secure_random.shuffle(chars)
    return ''.join(chars)


def create_portal_administrator(context):
    """Ensure the 'administrator' user exists in the project's Cognito user pool.

    Looks the user up with ``admin_get_user``. If the lookup raises a
    ``ClientError`` the user is created with a freshly generated temporary
    password and added to the 'administrator' group.

    Args:
        context: Resource manager context providing ``config.project_resources``
            and ``aws.client``.

    Returns:
        A tuple ``(is_new_user, administrator_name, password)``. ``is_new_user``
        is True when the account was just created or when the existing account
        has the status 'FORCE_CHANGE_PASSWORD'. ``password`` is the generated
        temporary password, or None when no account was created by this call.

    Raises:
        HandledError: If creating the user or adding it to the group fails
            with a ``ClientError``.
    """
    resource = context.config.project_resources[constant.PROJECT_RESOURCE_NAME_USER_POOL]
    region = resource_manager.util.get_region_from_arn(resource['StackId'])
    user_pool_id = custom_resource_utils.get_embedded_physical_id(resource['PhysicalResourceId'])
    client = context.aws.client('cognito-idp', region=region)
    administrator_name = 'administrator'
    is_new_user = False
    password = None

    try:
        response = client.admin_get_user(
            UserPoolId=user_pool_id,
            Username=administrator_name
        )
    except ClientError:
        # Any client error on lookup is treated as "the user does not exist".
        user_exists = False
    else:
        user_exists = True
        if response.get('UserStatus') == 'FORCE_CHANGE_PASSWORD':
            is_new_user = True

    if not user_exists:
        # Create the account if it does not exist.
        password = _generate_temporary_password()
        try:
            client.admin_create_user(
                UserPoolId=user_pool_id,
                Username=administrator_name,
                TemporaryPassword=password
            )
            client.admin_add_user_to_group(
                UserPoolId=user_pool_id,
                Username=administrator_name,
                GroupName='administrator'
            )
        except ClientError as e:
            credentials = context.aws.load_credentials()
            access_key = credentials.get(constant.DEFAULT_SECTION_NAME, constant.ACCESS_KEY_OPTION)
            raise HandledError(
                "Failed to create the administrator account with username '{}' and password '{}'. Have your administrator verify the user account '{}' with access key '{}' has the policies ['cognito-idp:AdminCreateUser', 'cognito-idp:AdminAddUserToGroup'].".format(
                    administrator_name, password, context.config.user_default_profile, access_key), e)
        is_new_user = True

    return is_new_user, administrator_name, password
def updateUserPoolEmailMessage(context, url, project_config_bucket_id):
    """Write the Cloud Gem Portal URL into the Cognito user pool email templates.

    Updates the invitation and verification email text of the project's user
    pool so that both include ``url``. The update is best-effort: a
    ``ClientError`` from Cognito is swallowed and the function returns
    without reporting anything.

    Args:
        context: Resource manager context providing ``config.project_resources``,
            ``aws.client`` and ``view``.
        url: Cloud Gem Portal URL to embed in the emails.
        project_config_bucket_id: Project configuration bucket id; the text
            before its first '-' is used as the project name.
    """
    project_name = project_config_bucket_id.split('-')[0]
    resource = context.config.project_resources[constant.PROJECT_RESOURCE_NAME_USER_POOL]
    region = resource_manager.util.get_region_from_arn(resource['StackId'])
    user_pool_id = custom_resource_utils.get_embedded_physical_id(resource['PhysicalResourceId'])
    client = context.aws.client('cognito-idp', region=region)

    # {username} and {####} are left as literal placeholders in the template
    # text (presumably substituted by Cognito when the email is sent).
    email_invite_subject = "Your Amazon Lumberyard Cloud Gem Portal temporary password"
    email_invite_message = (
        "Your Amazon Lumberyard Administrator has invited you to the project " + project_name + "'s Cloud Gem Portal.\n"
        "Username: {username}\n"
        "Temporary Password: {####}\n"
        "Cloud Gem Portal URL: " + url
    )
    email_verification_message = (
        "You or your Amazon Lumberyard Administrator has reset your password for the Cloud Gem Portal on your project '" + project_name + "'.\n"
        "You will need this code to change your password when you login next.\n"
        "Code: {####}\n"
        "Cloud Gem Portal URL: " + url
    )

    try:
        client.update_user_pool(
            UserPoolId=user_pool_id,
            EmailVerificationMessage=email_verification_message,
            EmailVerificationSubject='Your Amazon Lumberyard Cloud Gem Portal verification code',
            AdminCreateUserConfig={
                'InviteMessageTemplate': {
                    'EmailMessage': email_invite_message,
                    'EmailSubject': email_invite_subject
                },
                "AllowAdminCreateUserOnly": True
            },
            AutoVerifiedAttributes=['email']
        )
        context.view._output_message("The Cloud Gem Portal URL has been written to the Cognito user email template successfully.")
    except ClientError:
        # Deliberately best-effort: failing to update the email templates
        # must not abort the caller.
        return
# version constants
# Framework versions used as upgrade thresholds: before_framework_version_updated
# compares the project's from_version against V_1_1_1 through V_1_1_5 to decide
# which upgrade steps to run.
V_1_0_0 = Version('1.0.0')
V_1_1_0 = Version('1.1.0')
V_1_1_1 = Version('1.1.1')
V_1_1_2 = Version('1.1.2')
V_1_1_3 = Version('1.1.3')
V_1_1_4 = Version('1.1.4')
V_1_1_5 = Version('1.1.5') # Version without CloudGemPortal
def add_framework_version_update_writable_files(hook, from_version, to_version, writable_file_paths, **kwargs):
    """Hook for listing files a framework version update may write; currently a no-op.

    For now only local-project-settings.json needs to be updated, and it is
    always writable when updating the framework version, so nothing is added
    to ``writable_file_paths``.

    Repeat this pattern to add more upgrade processing:

        if from_version < VERSION_CHANGE_WAS_INTRODUCED:
            add files that may be written to
    """
    return None
def before_framework_version_updated(hook, from_version, to_version, **kwargs):
    """Called by the framework before updating the project's framework version.

    Runs, oldest first, every resource group controller upgrade step whose
    version is newer than ``from_version``. Each step receives ``from_version``.
    """
    # (version that introduced the change, controller method that handles it).
    # Add a row here for more upgrade processing; keep the upgrade logic in
    # the controller method, not in this function.
    upgrade_steps = (
        (V_1_1_1, 'before_update_framework_version_to_1_1_1'),
        (V_1_1_2, 'before_update_framework_version_to_1_1_2'),
        (V_1_1_3, 'before_update_framework_version_to_1_1_3'),
        (V_1_1_4, 'before_update_framework_version_to_1_1_4'),
        (V_1_1_5, 'before_update_framework_version_to_1_1_5'),
    )
    for introduced_in, handler_name in upgrade_steps:
        if from_version < introduced_in:
            handler = getattr(hook.context.resource_group_controller, handler_name)
            handler(from_version)
def after_framework_version_updated(hook, from_version, to_version, **kwargs):
    """Called by the framework after updating the project's framework version.

    There is currently nothing to do here.

    Repeat this pattern to add more upgrade processing:

        if from_version < VERSION_CHANGE_WAS_INTRODUCED:
            call an upgrade function (don't put upgrade logic in this function)
    """
    return None
def __get_presigned_url(s3, bucket_id, key, expiration):
    """Return a pre-signed GET URL for object ``key`` in bucket ``bucket_id``.

    ``expiration`` is passed through unchanged as the ``ExpiresIn`` argument
    of ``s3.generate_presigned_url``.
    """
    object_params = {'Bucket': bucket_id, 'Key': key}
    return s3.generate_presigned_url(
        'get_object',
        Params=object_params,
        ExpiresIn=expiration,
        HttpMethod="GET")