import boto3 import botocore from botocore.config import Config as bConfig import json from datetime import datetime, timedelta from import tzlocal from utils.Config import Config from .IamCommon import IamCommon class IamAccount(IamCommon): PASSWORD_POLICY_MIN_SCORE = 4 ROOT_LOGIN_MAX_COUNT = 3 def __init__(self, none, awsClients, users, roles): super().__init__() self.iamClient = awsClients['iamClient'] self.accClient = awsClients['accClient'] self.sppClient = awsClients['sppClient'] self.gdClient = awsClients['gdClient'] self.budgetClient = awsClients['budgetClient'] self.orgClient = awsClients['orgClient'] self.ctClient = boto3.client('cloudtrail', config=bConfig(region_name='us-east-1')) self.noOfUsers = len(users) self.roles = roles # self.__configPrefix = 'iam::settings::' self.init() def passwordPolicyScoring(self, policies): score = 0 for policy, value in policies.items(): ## no score for this: if policy in ['AllowUsersToChangePassword', 'ExpirePasswords']: continue if policy == 'MinimumPasswordLength' and value >= 8: score += 1 continue if policy == 'MaxPasswordAge' and value <= 90: score += 1 continue if policy == 'PasswordReusePrevention' and value >= 3: score += 1 continue if not value is None and value > 0: score += 1 return score def _checkPasswordPolicy(self): try: resp = self.iamClient.get_account_password_policy() policies = resp.get('PasswordPolicy') score = self.passwordPolicyScoring(policies) currVal = [] if score <= self.PASSWORD_POLICY_MIN_SCORE: for policy, num in policies.items(): currVal.append(f"{policy}={num}") output = '
'.join(currVal) self.results['passwordPolicyWeak'] = [-1, output] except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] print(ecode) if ecode == 'NoSuchEntity': self.results['passwordPolicy'] = [-1, ecode] def _checkRootLoginActivity(self): c = 0 LookupAttributes=[ { 'AttributeKey': 'Username', 'AttributeValue': 'root' }, { 'AttributeKey': 'Eventname', 'AttributeValue': 'ConsoleLogin' } ] - timedelta(days=30) + timedelta(days=1) resp = self.ctClient.lookup_events( LookupAttributes=LookupAttributes, StartTime=StartTime, EndTime=EndTime, MaxResults=50, ) ee = resp.get('Events') if len(ee) == 0: return self.results['rootConsoleLogin30days'] = [-1, ''] for e in ee: o = e.get('CloudTrailEvent') o = json.loads(o) if 'errorMessage' in o: c += 1 if c >= self.ROOT_LOGIN_MAX_COUNT: self.results['rootConsoleLoginFail3x'] = [-1, ''] return while resp.get('NextToken') != None: resp = self.ctClient.lookup_events( LookupAttributes=LookupAttributes, StartTime=StartTime, EndTime=EndTime, MaxResults=50, NextToken = resp.get('NextToken') ) ee = resp.get('Events') for e in ee: o = e.get('CloudTrailEvent') o = json.loads(o) if 'errorMessage' in o: c += 1 if c >= self.ROOT_LOGIN_MAX_COUNT: self.results['rootConsoleLoginFail3x'] = [-1, ''] return def _checkHasRole_AWSReservedSSO(self): hasReservedRole = False for role in self.roles: if role['RoleName'].startswith('AWSReservedSSO_'): hasReservedRole = True break if hasReservedRole == False: self.results['hasSSORoles'] = [-1, ''] def _checkHasExternalProvider(self): hasOpID = False hasSaml = False resp = self.iamClient.list_open_id_connect_providers() if 'OpenIDConnectProviderList' in resp: if len(resp['OpenIDConnectProviderList']) > 0: hasOpID = True resp = self.iamClient.list_saml_providers() if 'SAMLProviderList' in resp: if len(resp['SAMLProviderList']) > 0: hasSaml = True if hasOpID == False and hasSaml == False: self.results['hasExternalIdentityProvider'] = [-1, ''] def _checkHasGuardDuty(self): resp = self.gdClient.list_detectors() if 'DetectorIds' in resp: ids = resp.get('DetectorIds') if len(ids) > 0: return self.results["enableGuardDuty"] = [-1, ""] def _checkHasCostBudget(self): stsInfo = Config.get('stsInfo') budgetClient = self.budgetClient try: resp = budgetClient.describe_budgets(AccountId=stsInfo['Account']) if 'Budgets' in resp: return self.results['enableCostBudget'] = [-1, ""] except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] emsg = e.response['Error']['Message'] print(ecode, emsg) def _checkSupportPlan(self): sppClient = self.sppClient try: resp = sppClient.describe_severity_levels() except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] if ecode == 'SubscriptionRequiredException': self.results['supportPlanLowTier'] = [-1, ''] def _checkHasUsers(self): # has at least 1 for all account (root) if self.noOfUsers < 2: self.results['noUsersFound'] = [-1, 'No IAM User found'] def _checkHasAlternateContact(self): CONTACT_TYP = ['BILLING', 'SECURITY', 'OPERATIONS'] cnt = 0 for typ in CONTACT_TYP: res = self.getAlternateContactByType(typ) if res == None: res = 0 cnt += res if cnt == 0: self.results['hasAlternateContact'] = [-1, 'No alternate contacts'] def getAlternateContactByType(self, typ): try: resp = self.accClient.get_alternate_contact( AlternateContactType = typ ) return 1 except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] if ecode == 'ResourceNotFoundException': return 0 def _checkHasOrganization(self): try: resp = self.orgClient.describe_organization() except botocore.exceptions.ClientError as e: ecode = e.response['Error']['Code'] if ecode == 'AWSOrganizationsNotInUseException': self.results['hasOrganization'] = [-1, ''] return 0