import boto3 import botocore from botocore.config import Config as bConfig import json from datetime import datetime, timedelta from dateutil.tz import tzlocal from utils.Config import Config from .IamCommon import IamCommon class IamAccount(IamCommon): PASSWORD_POLICY_MIN_SCORE = 4 ROOT_LOGIN_MAX_COUNT = 3 def __init__(self, none, awsClients, users, roles): super().__init__() self.iamClient = awsClients['iamClient'] self.accClient = awsClients['accClient'] self.sppClient = awsClients['sppClient'] self.gdClient = awsClients['gdClient'] self.budgetClient = awsClients['budgetClient'] self.orgClient = awsClients['orgClient'] self.ctClient = boto3.client('cloudtrail', config=bConfig(region_name='us-east-1')) self.noOfUsers = len(users) self.roles = roles # self.__configPrefix = 'iam::settings::' self.init() def passwordPolicyScoring(self, policies): score = 0 for policy, value in policies.items(): ## no score for this: if policy in ['AllowUsersToChangePassword', 'ExpirePasswords']: continue if policy == 'MinimumPasswordLength' and value >= 8: score += 1 continue if policy == 'MaxPasswordAge' and value <= 90: score += 1 continue if policy == 'PasswordReusePrevention' and value >= 3: score += 1 continue if not value is None and value > 0: score += 1 return score def _checkPasswordPolicy(self): try: resp = self.iamClient.get_account_password_policy() policies = resp.get('PasswordPolicy') score = self.passwordPolicyScoring(policies) currVal = [] if score <= self.PASSWORD_POLICY_MIN_SCORE: for policy, num in policies.items(): currVal.append(f"{policy}={num}") output = '
'.join(currVal) self.results['passwordPolicyWeak'] = [-1, output] except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] print(ecode) if ecode == 'NoSuchEntity': self.results['passwordPolicy'] = [-1, ecode] def _checkRootLoginActivity(self): c = 0 LookupAttributes=[ { 'AttributeKey': 'Username', 'AttributeValue': 'root' }, { 'AttributeKey': 'Eventname', 'AttributeValue': 'ConsoleLogin' } ] StartTime=datetime.today() - timedelta(days=30) EndTime=datetime.today() + timedelta(days=1) resp = self.ctClient.lookup_events( LookupAttributes=LookupAttributes, StartTime=StartTime, EndTime=EndTime, MaxResults=50, ) ee = resp.get('Events') if len(ee) == 0: return self.results['rootConsoleLogin30days'] = [-1, ''] for e in ee: o = e.get('CloudTrailEvent') o = json.loads(o) if 'errorMessage' in o: c += 1 if c >= self.ROOT_LOGIN_MAX_COUNT: self.results['rootConsoleLoginFail3x'] = [-1, ''] return while resp.get('NextToken') != None: resp = self.ctClient.lookup_events( LookupAttributes=LookupAttributes, StartTime=StartTime, EndTime=EndTime, MaxResults=50, NextToken = resp.get('NextToken') ) ee = resp.get('Events') for e in ee: o = e.get('CloudTrailEvent') o = json.loads(o) if 'errorMessage' in o: c += 1 if c >= self.ROOT_LOGIN_MAX_COUNT: self.results['rootConsoleLoginFail3x'] = [-1, ''] return def _checkHasRole_AWSReservedSSO(self): hasReservedRole = False for role in self.roles: if role['RoleName'].startswith('AWSReservedSSO_'): hasReservedRole = True break if hasReservedRole == False: self.results['hasSSORoles'] = [-1, ''] def _checkHasExternalProvider(self): hasOpID = False hasSaml = False resp = self.iamClient.list_open_id_connect_providers() if 'OpenIDConnectProviderList' in resp: if len(resp['OpenIDConnectProviderList']) > 0: hasOpID = True resp = self.iamClient.list_saml_providers() if 'SAMLProviderList' in resp: if len(resp['SAMLProviderList']) > 0: hasSaml = True if hasOpID == False and hasSaml == False: self.results['hasExternalIdentityProvider'] = [-1, ''] def _checkHasGuardDuty(self): resp = self.gdClient.list_detectors() if 'DetectorIds' in resp: ids = resp.get('DetectorIds') if len(ids) > 0: return self.results["enableGuardDuty"] = [-1, ""] def _checkHasCostBudget(self): stsInfo = Config.get('stsInfo') budgetClient = self.budgetClient try: resp = budgetClient.describe_budgets(AccountId=stsInfo['Account']) if 'Budgets' in resp: return self.results['enableCostBudget'] = [-1, ""] except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] emsg = e.response['Error']['Message'] print(ecode, emsg) def _checkSupportPlan(self): sppClient = self.sppClient try: resp = sppClient.describe_severity_levels() except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] if ecode == 'SubscriptionRequiredException': self.results['supportPlanLowTier'] = [-1, ''] def _checkHasUsers(self): # has at least 1 for all account (root) if self.noOfUsers < 2: self.results['noUsersFound'] = [-1, 'No IAM User found'] def _checkHasAlternateContact(self): CONTACT_TYP = ['BILLING', 'SECURITY', 'OPERATIONS'] cnt = 0 for typ in CONTACT_TYP: res = self.getAlternateContactByType(typ) if res == None: res = 0 cnt += res if cnt == 0: self.results['hasAlternateContact'] = [-1, 'No alternate contacts'] def getAlternateContactByType(self, typ): try: resp = self.accClient.get_alternate_contact( AlternateContactType = typ ) return 1 except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] if ecode == 'ResourceNotFoundException': return 0 def _checkHasOrganization(self): try: resp = self.orgClient.describe_organization() except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] if ecode == 'AWSOrganizationsNotInUseException': self.results['hasOrganization'] = [-1, ''] return 0