# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. import datetime import fileinput import os import re import subprocess import sys from cement.utils.misc import minimal_logger from cement.utils.shell import exec_cmd from ebcli.lib import utils from ebcli.core import fileoperations, io from ebcli.objects.exceptions import ( CommandError, NotInitializedError, NoSourceControlError ) from ebcli.resources.strings import git_ignore, strings LOG = minimal_logger(__name__) class SourceControl(object): name = 'base' def __init__(self): self.name = '' def get_name(self): return None def get_current_branch(self): pass def do_zip(self, location, staged=False): pass def set_up_ignore_file(self): pass def get_version_label(self): pass def untracked_changes_exist(self): pass @staticmethod def get_source_control(): try: git_installed = fileoperations.get_config_setting('global', 'sc') except NotInitializedError: git_installed = False if not git_installed: if Git().is_setup(): return Git() else: return NoSC() return Git() class NoSC(SourceControl): """ No source control installed """ DEFAULT_MESSAGE = 'EB-CLI deploy' def get_name(self): return None def get_version_label(self): suffix = datetime.datetime.now().strftime("%y%m%d_%H%M%S%f") return 'app-' + suffix def get_current_branch(self): return 'default' def do_zip(self, location, staged=False): io.log_info('Creating zip using systems zip') fileoperations.zip_up_project(location) def get_message(self): return NoSC.DEFAULT_MESSAGE def is_setup(self): pass def set_up_ignore_file(self): Git().set_up_ignore_file() def clean_up_ignore_file(self): pass def untracked_changes_exist(self): return False class Git(SourceControl): """ The user has git installed """ codecommit_remote_name = 'codecommit-origin' def get_name(self): return 'git' def _handle_exitcode(self, exitcode, stderr): if exitcode == 0: return if exitcode == 127: # 127 = git not installed raise NoSourceControlError if exitcode == 128: # 128 = No HEAD if "HEAD" in stderr: LOG.debug( 'An error occurred while handling git command.\nError code: ' + str(exitcode) + ' Error: ' + stderr ) raise CommandError( 'git could not find the HEAD; ' 'most likely because there are no commits present' ) raise CommandError('An error occurred while handling git command.' '\nError code: ' + str(exitcode) + ' Error: ' + stderr) def get_version_label(self): io.log_info('Getting version label from git with git-describe') stdout, stderr, exitcode = \ self._run_cmd(['git', 'describe', '--always', '--abbrev=4']) version_label = 'app-{}-{:%y%m%d_%H%M%S%f}'.format(stdout, datetime.datetime.now()) return version_label.replace('.', '_') def untracked_changes_exist(self): try: result = subprocess.check_output(['git', 'diff', '--numstat']) if isinstance(result, bytes): result = result.decode() LOG.debug('Result of `git diff --numstat`: ' + result) except subprocess.CalledProcessError as e: LOG.debug('`git diff --numstat` resulted in an error: ' + str(e)) def get_current_repository(self): current_branch = self.get_current_branch() get_remote_name_command = ['git', 'config', '--get', 'branch.{0}.remote'.format(current_branch)] LOG.debug( 'Getting current repository name based on the current branch name:' '{0}'.format(' '.join(get_remote_name_command)) ) stdout, stderr, exitcode = self._run_cmd( get_remote_name_command, handle_exitcode=False ) current_remote = stdout if exitcode != 0: LOG.debug("No remote found for the current working directory.") current_remote = None LOG.debug('Found remote: {}'.format(current_remote)) return current_remote def get_current_branch(self): revparse_command = ['git', 'rev-parse', '--abbrev-ref', 'HEAD'] LOG.debug('Getting current branch name by performing `{0}`'.format(' '.join(revparse_command))) stdout, stderr, exitcode = self._run_cmd(revparse_command, handle_exitcode=False) if stdout.strip() == 'HEAD': io.log_warning('Git is in a detached head state. Using branch "default".') return 'default' else: self._handle_exitcode(exitcode, stderr) LOG.debug(stdout) return stdout def get_current_commit(self): latest_commit_command = ['git', 'rev-parse', '--verify', 'HEAD'] LOG.debug('Getting current commit by performing `{0}`'.format(' '.join(latest_commit_command))) stdout, stderr, exitcode = self._run_cmd( latest_commit_command, handle_exitcode=False ) if exitcode != 0: LOG.debug('No current commit found') return else: self._handle_exitcode(exitcode, stderr) LOG.debug(stdout) return stdout def do_zip_submodule(self, main_location, sub_location, staged=False, submodule_dir=None): if staged: commit_id, stderr, exitcode = self._run_cmd(['git', 'write-tree']) else: commit_id = 'HEAD' io.log_info('creating zip using git submodule archive {0}'.format(commit_id)) # individually zip submodules if there are any stdout, stderr, exitcode = self._run_cmd(['git', 'archive', '-v', '--format=zip', '--prefix', os.path.join(submodule_dir, ''), '-o', sub_location, commit_id]) io.log_info('git archive output: {0}'.format(stderr)) fileoperations.zip_append_archive(main_location, sub_location) fileoperations.delete_file(sub_location) def do_zip(self, location, staged=False): cwd = os.getcwd() try: fileoperations.ProjectRoot.traverse() if staged: commit_id, stderr, exitcode = self._run_cmd(['git', 'write-tree']) else: commit_id = 'HEAD' io.log_info('creating zip using git archive {0}'.format(commit_id)) stdout, stderr, exitcode = self._run_cmd( ['git', 'archive', '-v', '--format=zip', '-o', location, commit_id]) io.log_info('git archive output: {0}'.format(stderr)) project_root = os.getcwd() must_zip_submodules = fileoperations.get_config_setting('global', 'include_git_submodules') if must_zip_submodules: stdout, stderr, exitcode = self._run_cmd(['git', 'submodule', 'foreach', '--recursive']) for index, line in enumerate(stdout.splitlines()): submodule_dir = line.split(' ')[1].strip('\'') os.chdir(os.path.join(project_root, submodule_dir)) self.do_zip_submodule( location, "{0}_{1}".format( location, str(index) ), staged=staged, submodule_dir=submodule_dir ) finally: os.chdir(cwd) def get_message(self): stdout, stderr, exitcode = self._run_cmd( ['git', 'log', '--oneline', '-1']) return stdout.split(' ', 1)[1] def is_setup(self): if fileoperations.is_git_directory_present(): if not fileoperations.program_is_installed('git'): raise CommandError(strings['sc.gitnotinstalled']) else: return True return False def set_up_ignore_file(self): if not os.path.exists('.gitignore'): open('.gitignore', 'w') else: with open('.gitignore', 'r') as f: for line in f: if line.strip() == git_ignore[0]: return with open('.gitignore', 'a') as f: f.write('\n') for line in git_ignore: f.write('{}\n'.format(line)) def clean_up_ignore_file(self): cwd = os.getcwd() try: fileoperations.ProjectRoot.traverse() in_section = False for line in fileinput.input('.gitignore', inplace=True): if line.startswith(git_ignore[0]): in_section = True if not line.strip(): in_section = False if not in_section: print(line, end='') finally: os.chdir(cwd) def push_codecommit_code(self): io.log_info('Pushing local code to codecommit with git-push') stdout, stderr, exitcode = self._run_cmd( [ 'git', 'push', self.get_current_repository(), self.get_current_branch() ] ) if exitcode != 0: io.log_warning('Git is not able to push code: {0}'.format(exitcode)) io.log_warning(stderr) else: LOG.debug('git push result: {0}'.format(stdout)) self._handle_exitcode(exitcode, stderr) def setup_codecommit_remote_repo(self, remote_url): self.verify_url_is_a_codecommit_url(remote_url) remote_add_command = ['git', 'remote', 'add', self.codecommit_remote_name, remote_url] LOG.debug('Adding remote: {0}'.format(' '.join(remote_add_command))) stdout, stderr, exitcode = self._run_cmd(remote_add_command, handle_exitcode=False) if exitcode != 0: if exitcode == 128: remote_set_url_command = [ 'git', 'remote', 'set-url', self.codecommit_remote_name, remote_url ] LOG.debug( 'Remote already exists, performing: {0}'.format( ' '.join(remote_set_url_command) ) ) self._run_cmd(remote_set_url_command) remote_set_url_with_push_command = [ 'git', 'remote', 'set-url', '--push', self.codecommit_remote_name, remote_url ] LOG.debug( ' {0}'.format( ' '.join(remote_set_url_with_push_command) ) ) self._run_cmd(remote_set_url_with_push_command) else: LOG.debug("Error setting up git config for CodeCommit: {0}".format(stderr)) return else: remote_set_url_with_add_push_command = [ 'git', 'remote', 'set-url', '--add', '--push', self.codecommit_remote_name, remote_url ] LOG.debug( 'Setting remote URL and pushing to it: {0}'.format( ' '.join(remote_set_url_with_add_push_command) ) ) self._run_cmd(remote_set_url_with_add_push_command) self._handle_exitcode(exitcode, stderr) LOG.debug('git remote result: ' + stdout) def setup_new_codecommit_branch(self, branch_name): LOG.debug("Setting up CodeCommit branch") self.fetch_remote_branches(self.codecommit_remote_name) self.checkout_branch(branch_name, create_branch=True) stdout, stderr, exitcode = self._run_cmd( ['git', 'push', '-u', self.codecommit_remote_name, branch_name], handle_exitcode=False ) if exitcode == 1: io.log_warning('Git is not able to push code: {0}'.format(exitcode)) io.log_warning(stderr) if stderr: LOG.debug('git push error: ' + stderr) LOG.debug('git push result: ' + stdout) self.fetch_remote_branches(self.codecommit_remote_name) stdout, stderr, exitcode = self._run_cmd( [ 'git', 'branch', '--set-upstream-to', '{0}/{1}'.format(self.codecommit_remote_name, branch_name) ], handle_exitcode=False ) if stderr: LOG.debug('git branch --set-upstream-to error: ' + stderr) LOG.debug('git branch result: ' + stdout) def setup_existing_codecommit_branch(self, branch_name): self.fetch_remote_branches(self.codecommit_remote_name) self.checkout_branch(branch_name, create_branch=True) stdout, stderr, exitcode = self._run_cmd( [ 'git', 'branch', '--set-upstream-to', "{0}/{1}".format( self.codecommit_remote_name, branch_name ) ], handle_exitcode=False ) if exitcode != 0: LOG.debug('git branch --set-upstream-to error: ' + stderr) return False LOG.debug('git branch result: ' + stdout) return True def checkout_branch(self, branch_name, create_branch=False): stdout, stderr, exitcode = self._run_cmd(['git', 'checkout', branch_name], handle_exitcode=False) if exitcode != 0: LOG.debug('Git is not able to checkout code: {0}'.format(exitcode)) LOG.debug(stderr) if exitcode == 1: if create_branch: LOG.debug( "Could not checkout branch '{0}', creating the branch " "locally with current HEAD".format(branch_name) ) self._run_cmd(['git', 'checkout', '-b', branch_name]) else: return False return True def get_list_of_staged_files(self): stdout, stderr, exitcode = self._run_cmd(['git', 'diff', '--name-only', '--cached']) LOG.debug('git diff result: {0}'.format(stdout)) return stdout def create_initial_commit(self): with open('README', 'w') as readme: readme.write('') self._run_cmd(['git', 'add', 'README']) stdout, stderr, exitcode = self._run_cmd( ['git', 'commit', '--allow-empty', '-m', 'EB CLI initial commit'], handle_exitcode=False ) if exitcode != 0: LOG.debug('git was not able to initialize an empty commit: {0}'.format(stderr)) LOG.debug('git commit result: {0}'.format(stdout)) return stdout def fetch_remote_branches(self, remote_name): fetch_command = [ 'git', 'fetch', remote_name ] LOG.debug('Fetching remote branches using remote name: {0}'.format(' '.join(fetch_command))) stdout, stderr, exitcode = self._run_cmd(fetch_command, handle_exitcode=False) if exitcode != 0: LOG.debug('git fetch error: ' + stderr) return False LOG.debug('git fetch result: {0}'.format(stdout)) return True def setup_codecommit_cred_config(self): LOG.debug('Setup git config settings for code commit credentials') self._run_cmd( ['git', 'config', '--local', '--replace-all', 'credential.UseHttpPath', 'true']) self._run_cmd( ['git', 'config', '--local', '--replace-all', 'credential.helper', credential_helper_command()]) def _run_cmd(self, cmd, handle_exitcode=True): stdout, stderr, exitcode = exec_cmd(cmd) stdout = utils.decode_bytes(stdout).strip() stderr = utils.decode_bytes(stderr).strip() if handle_exitcode: self._handle_exitcode(exitcode, stderr) return stdout, stderr, exitcode def verify_url_is_a_codecommit_url(self, remote_url): codecommit_url_regex = re.compile(r'.*git-codecommit\..*\.amazonaws.com.*') if not codecommit_url_regex.search(remote_url): # Prevent communications with non-CodeCommit repositories because of unknown security implications # Integration with non-CodeCommit repositories is not something Beanstalk presently supports raise NoSourceControlError('Could not connect to repository located at {}'.format(remote_url)) def credential_helper_command(): return '!aws codecommit credential-helper $@'