import boto3
import botocore
import json
import time
from utils.Config import Config
from utils.Tools import _pr
from services.Service import Service
from services.iam.drivers.IamRole import IamRole
from services.iam.drivers.IamGroup import IamGroup
from services.iam.drivers.IamUser import IamUser
from services.iam.drivers.IamAccount import IamAccount
class Iam(Service):
    """Collect IAM users, roles and groups and run the per-resource drivers.

    ``advise()`` is the entry point: it gathers the resources through the
    ``get*`` methods, runs ``IamUser`` / ``IamRole`` / ``IamGroup`` /
    ``IamAccount`` on them and returns their ``getInfo()`` results keyed by
    ``'<Type>::<name>'``.
    """

    # Role names containing any of these substrings are skipped by getRoles().
    _EXCLUDED_ROLE_KEYWORDS = (
        'AmazonSSMRole',
        'DO-NOT-DELETE',
        'Isengard',
        'AwsSecurityNacundaAudit',
        'AwsSecurityAudit',
        'GatedGarden',
        'PVRE-SSMOnboarding',
        'PVRE-Maintenance',
        'InternalAuditInternal',
    )

    # get_credential_report error codes that mean "no usable report yet".
    _REPORT_RETRYABLE_CODES = ('ReportNotPresent', 'ReportInProgress', 'ReportExpired')
    # Seconds to wait between polls while the credential report is generated.
    _REPORT_POLL_SECONDS = 5
    # Upper bound on generate/get cycles so a stuck report cannot hang the scan.
    _REPORT_MAX_ATTEMPTS = 6

    def __init__(self, region):
        """Create the boto3 clients used by the IAM checks.

        Args:
            region: Passed straight through to ``Service.__init__``.
        """
        super().__init__(region)

        # Defined up front so the attribute exists even before getUsers() runs.
        self.getUserFlag = True

        self.iamClient = boto3.client('iam', config=self.bConfig)
        self.awsClients = {
            'iamClient': self.iamClient,
            # NOTE(review): this is the only client created without
            # config=self.bConfig; kept as in the original - confirm intent.
            'orgClient': boto3.client('organizations'),
            'accClient': boto3.client('account', config=self.bConfig),
            'sppClient': boto3.client('support', config=self.bConfig),
            'gdClient': boto3.client('guardduty', config=self.bConfig),
            'budgetClient': boto3.client('budgets', config=self.bConfig)
        }

    @staticmethod
    def _listAll(operation, resultKey):
        """Call a Marker-paginated IAM list operation until it is exhausted.

        Args:
            operation: Bound client method such as ``iamClient.list_groups``.
            resultKey: Response key holding the page items (e.g. ``'Groups'``).

        Returns:
            A single list with the items of every page, in response order.
        """
        items = []
        kwargs = {}
        while True:
            page = operation(**kwargs)
            items.extend(page.get(resultKey) or [])
            marker = page.get('Marker')
            if marker is None:
                return items
            kwargs = {'Marker': marker}

    def getGroups(self):
        """Return every IAM group as returned by ``list_groups`` (all pages)."""
        return self._listAll(self.iamClient.list_groups, 'Groups')

    def _isCustomerRole(self, role):
        """Return True when ``role`` should be inspected.

        A role is skipped when its path is exactly ``/service-role/``, when its
        path starts with ``/aws-service-role/``, or when its name is rejected
        by ``_roleFilterByName``.
        """
        path = role['Path']
        if path == '/service-role/' or path.startswith('/aws-service-role/'):
            return False
        return self._roleFilterByName(role['RoleName'])

    def getRoles(self):
        """Return the IAM roles from ``list_roles`` that pass the role filter."""
        allRoles = self._listAll(self.iamClient.list_roles, 'Roles')
        return [role for role in allRoles if self._isCustomerRole(role)]

    def _fetchCredentialReport(self):
        """Generate the credential report if needed and return its raw content.

        Each attempt asks IAM to generate the report, waits when the reported
        state is not ``COMPLETE``, then tries to download it. "Report not
        ready" style errors are retried up to ``_REPORT_MAX_ATTEMPTS`` times.

        Returns:
            The ``Content`` value of ``get_credential_report`` (CSV bytes).

        Raises:
            botocore.exceptions.ClientError: For any non-retryable error, or
                when the report is still unavailable after the last attempt.
        """
        lastAttempt = self._REPORT_MAX_ATTEMPTS - 1
        for attempt in range(self._REPORT_MAX_ATTEMPTS):
            state = self.iamClient.generate_credential_report().get('State')
            if state != 'COMPLETE':
                print('Generating IAM Credential Report...')
                time.sleep(self._REPORT_POLL_SECONDS)

            try:
                return self.iamClient.get_credential_report().get('Content')
            except botocore.exceptions.ClientError as e:
                code = e.response['Error']['Code']
                if code not in self._REPORT_RETRYABLE_CODES or attempt == lastAttempt:
                    raise

    @staticmethod
    def _parseCredentialReport(content):
        """Parse credential report CSV bytes into a list of row dicts.

        The first line is the header; every following non-blank line becomes a
        dict mapping header names to that row's values.
        """
        # Local imports: keeps this block self-contained without touching the
        # file-level import list.
        import csv
        import io

        reader = csv.reader(io.StringIO(content.decode('UTF-8')))
        header = next(reader, None)
        if header is None:
            return []
        # csv.reader yields [] for blank lines; skipping them avoids phantom
        # rows (e.g. from a trailing newline) that would look like a user
        # with an empty name.
        return [dict(zip(header, row)) for row in reader if row]

    def getUsers(self):
        """Return the rows of the IAM credential report, one dict per user.

        Sets ``self.getUserFlag`` to False (and returns an empty list) when the
        credentials are rejected with ``InvalidClientTokenId``; ``advise()``
        uses that flag to skip all IAM checks.

        Raises:
            botocore.exceptions.ClientError: For any other API failure.
        """
        self.getUserFlag = True
        try:
            content = self._fetchCredentialReport()
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'InvalidClientTokenId':
                self.getUserFlag = False
                print(" !!! UNABLE TO GET USERS INFORMATION, SKIP ALL IAM !!!")
                return []
            raise

        return self._parseCredentialReport(content)

    def advise(self):
        """Run every IAM driver and return their results.

        Returns:
            Dict keyed by ``'User::<name>'``, ``'Role::<name>'``,
            ``'Group::<name>'`` and ``'Account::Config'`` holding each driver's
            ``getInfo()`` value. Empty when users could not be retrieved.
        """
        objs = {}

        users = self.getUsers()
        if not self.getUserFlag:
            return objs

        for user in users:
            print('... (IAM::User) inspecting ' + user['user'])
            obj = IamUser(user, self.iamClient)
            obj.run()

            # Rows with an empty user name are reported under "root_id".
            identifier = "root_id" if user['user'] == "" else user['user']
            objs['User::' + identifier] = obj.getInfo()

        roles = self.getRoles()
        for role in roles:
            print('... (IAM::Role) inspecting ' + role['RoleName'])
            obj = IamRole(role, self.iamClient)
            obj.run()
            objs['Role::' + role['RoleName']] = obj.getInfo()

        groups = self.getGroups()
        for group in groups:
            print('... (IAM::Group) inspecting ' + group['GroupName'])
            obj = IamGroup(group, self.iamClient)
            obj.run()
            objs['Group::' + group['GroupName']] = obj.getInfo()

        # The account-level driver receives every client plus the user and
        # role lists gathered above.
        print('... (IAM:Account) inspecting')
        obj = IamAccount(None, self.awsClients, users, roles)
        obj.run()
        objs['Account::Config'] = obj.getInfo()

        return objs

    def _roleFilterByName(self, rn):
        """Return False when role name ``rn`` contains an excluded keyword."""
        return not any(kw in rn for kw in self._EXCLUDED_ROLE_KEYWORDS)
# Script entry point: initialise Config, run the IAM checks for us-east-1
# and hand the result to _pr.
if __name__ == "__main__":
    Config.init()
    iamService = Iam('us-east-1')
    _pr(iamService.advise())