QuickSight Data APIs to build data sources and datasets

Author: Ying Wang (Sr.Data Visualization Engineer in ProServe GSP)
Date: April 16 2020

Author: Vamsi Bhadriraju (Data Architect in ProServe) 
Revision Date : March 23 2021
Revision Date : May 27 2021

Author: Ian Liao (Data Visualization Engineer in ProServe GSP)
Date: Dec 3 2021


In [None]:
import boto3
import botocore
import json
import time
import os
from IPython.display import JSON
from IPython.display import IFrame
def display_json(doc, root='root'):
 return JSON(doc)
#qs = boto3.client('quicksight')
import logging
from typing import Any, Dict, List, Optional

def default_botocore_config() -> botocore.config.Config:
 """Botocore configuration."""
 retries_config: Dict[str, Union[str, int]] = {
 "max_attempts": int(os.getenv("AWS_MAX_ATTEMPTS", "5")),
 }
 mode: Optional[str] = os.getenv("AWS_RETRY_MODE")
 if mode:
 retries_config["mode"] = mode
 return botocore.config.Config(
 retries=retries_config,
 connect_timeout=10,
 max_pool_connections=10,
 user_agent_extra=f"qs_sdk_BIOps",
 )

construct target information 

In [None]:
def _assume_role(aws_account_number, role_name, aws_region):
 sts_client = boto3.client('sts', config=default_botocore_config())
 response = sts_client.assume_role(
 RoleArn='arn:aws:iam::' + aws_account_number + ':role/' + role_name,
 RoleSessionName='quicksight'
 )
 # Storing STS credentials
 session = boto3.Session(
 aws_access_key_id=response['Credentials']['AccessKeyId'],
 aws_secret_access_key=response['Credentials']['SecretAccessKey'],
 aws_session_token=response['Credentials']['SessionToken'],
 region_name=aws_region
 )

 #logger.info("Assumed session for " + aws_account_number + " in region " + aws_region + ".")

 return session


In [None]:
def get_user_arn (session, username, region='us-east-1', namespace='default'): 
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 if username=='root':
 arn='arn:aws:iam::'+account_id+':'+username
 else:
 arn="arn:aws:quicksight:"+region+":"+account_id+":user/"+namespace+"/"+username
 
 return arn

In [None]:
def get_target(targetsession, rds,redshift,s3Bucket,s3Key,vpc,tag,targetadmin,rdscredential,redshiftcredential,region='us-east-1', namespace='default',version='1'): 
 sts_client = targetsession.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 target: Dict[str, Any] = {
 "rds": {"rdsinstanceid": ''},
"s3":{"manifestBucket": '',
"manifestkey": ''},
"vpc": '',
"tag": '',
 "credential": {"rdscredential": rdscredential,
 "redshiftcredential": redshiftcredential},
 "datasourcepermission": [
 {'Principal': targetadmin,
 'Actions': ["quicksight:DescribeDataSource",
 "quicksight:DescribeDataSourcePermissions",
 "quicksight:PassDataSource",
 "quicksight:UpdateDataSource",
 "quicksight:DeleteDataSource",
 "quicksight:UpdateDataSourcePermissions"]
 }
 ],
 "datasetpermission": [{'Principal': targetadmin,
 'Actions': ['quicksight:UpdateDataSetPermissions',
 'quicksight:DescribeDataSet',
 'quicksight:DescribeDataSetPermissions',
 'quicksight:PassDataSet',
 'quicksight:DescribeIngestion',
 'quicksight:ListIngestions',
 'quicksight:UpdateDataSet',
 'quicksight:DeleteDataSet',
 'quicksight:CreateIngestion',
 'quicksight:CancelIngestion']}],
 "version": '1', 
 }
 
 if rds:
 target["rds"]["rdsinstanceid"] = rds
 
 if redshift:
 target["redshift"] = redshift
 
 if s3Bucket:
 target["s3"]["manifestBucket"] = s3Bucket
 target["s3"]["manifestkey"]=s3Key
 
 if vpc:
 target["vpc"] = "arn:aws:quicksight:"+region+":"+account_id+":vpcConnection/"+vpc
 
 if tag:
 target["tag"]=tag
 
 if rdscredential:
 target["credential"]["rdscredential"]=rdscredential
 
 if redshiftcredential:
 target["credential"]["redshiftcredential"]=redshiftcredential
 
 return target



In [None]:
#get datasets of dashboards in deployment list
def data_sets_ls_of_dashboard(dashboard, sourcesession):
 dashboardid=get_dashboard_ids(dashboard, sourcesession)
 print(dashboardid)
 sourcedashboard=describe_dashboard(sourcesession, dashboardid[0])
 DataSetArns=sourcedashboard['Dashboard']['Version']['DataSetArns']
 sourcedsref = []
 for i in DataSetArns:
 missing=False
 did = i.split("/")[1]
 #print(did)
 try:
 dname=get_dataset_name(did, sourcesession)
 except Exception as e:
 faillist.append({"Error Type": "Dataset: "+did+" is missing!","DashboardId": dashboardid[0], "Name": dashboard, "Error": str(e)})
 missing=True
 break
 
 sourcedsref.append(dname)
 return sourcedsref

In [None]:
#get datasets of analysis in deployment list
def data_sets_ls_of_analysis(analysis, sourcesession):
 analysisid=get_analysis_ids(analysis, sourcesession)
 sourceanalysis=describe_analysis(sourcesession, analysisid[0])
 DataSetArns=sourceanalysis['Analysis']['DataSetArns']
 sourcedsref = []
 for i in DataSetArns:
 missing=False
 did = i.split("/")[1]
 #print(did)
 try:
 dname=get_dataset_name(did, sourcesession)
 except Exception as e:
 faillist.append({"Error Type": "Dataset: "+did+" is missing!","AnalysisId": analysisid[0], "Name": analysis, "Error": str(e)})
 missing=True
 break
 
 sourcedsref.append(dname)
 return sourcedsref

In [None]:
#get data sources of dashboards in deployment list
def data_sources_ls_of_dashboard(dashboard, sourcesession):
 datasets=data_sets_ls_of_dashboard(dashboard, sourcesession)
 sourcedsref = []
 for dataset in datasets:
 ids = get_dataset_ids(dataset, sourcesession)
 res=describe_dataset(sourcesession, ids[0])

 PT=res['DataSet']['PhysicalTableMap']
 for key, value in PT.items():
 for i,j in value.items():
 dsid = j['DataSourceArn'].split("/")[1]
 dsname=get_datasource_name(dsid, sourcesession)
 if dsname not in sourcedsref:
 sourcedsref.append(dsname)
 return sourcedsref

In [None]:
#get data sources of analysis in deployment list
def data_sources_ls_of_analysis(analysis, sourcesession):
 datasets=data_sets_ls_of_analysis(analysis, sourcesession)
 sourcedsref = []
 for dataset in datasets:
 ids = get_dataset_ids(dataset, sourcesession)
 res=describe_dataset(sourcesession, ids[0])

 PT=res['DataSet']['PhysicalTableMap']
 for key, value in PT.items():
 for i,j in value.items():
 dsid = j['DataSourceArn'].split("/")[1]
 dsname=get_datasource_name(dsid, sourcesession)
 if dsname not in sourcedsref:
 sourcedsref.append(dsname)
 return sourcedsref

In [None]:
def get_data_source_deployment_list(sourcesession,source_deployment_list):
 datasources=data_sources(sourcesession) #get data source details with listdatasource API
 
 deployment_list=[]
 for newsource in source_deployment_list:
 ids = get_datasource_ids(newsource, sourcesession) #Get id of data sources deployment list
 for datasource in datasources:
 if ids[0] == datasource["DataSourceId"]:
 deployment_list.append(datasource) #deployment_list is an array containing data source connection information and etc

 return deployment_list
 

get object ids from name, or get object name from id 

In [None]:
# 
def get_dashboard_ids(name: str, session) -> List[str]:
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 
 ids: List[str] = []
 for dashboard in dashboards(session):
 if dashboard["Name"] == name:
 ids.append(dashboard["DashboardId"])
 return ids

def get_analysis_ids(name: str, session) -> List[str]:
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 
 ids: List[str] = []
 for analysis_list in analysis(session):
 if analysis_list["Name"] == name:
 ids.append(analysis_list["AnalysisId"])
 return ids

def get_dashboard_name(did: str, session) -> List[str]:
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 
 name: str
 for dashboard in dashboards(session):
 if dashboard["DashboardId"] == did:
 name=dashboard["Name"]
 return name



def get_dataset_name(did: str, session) -> List[str]:
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 
 name: str
 for dataset in data_sets(session):
 if dataset["DataSetId"] == did:
 name=dataset["Name"]
 return name

def get_dataset_ids(name: str, session) -> List[str]:
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 
 ids: List[str] = []
 for dataset in data_sets(session):
 if dataset["Name"] == name:
 ids.append(dataset["DataSetId"])
 return ids

def get_datasource_name(did: str, session) -> List[str]:
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 
 name: str
 for datasource in data_sources(session):
 if datasource["DataSourceId"] == did:
 name=datasource["Name"]
 return name

def get_datasource_ids(name: str, session) -> List[str]:
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 
 ids: List[str] = []
 for datasource in data_sources(session):
 if datasource["Name"] == name:
 ids.append(datasource["DataSourceId"])
 return ids

List current Data sources/datasets

In [None]:
#
def data_sources (session):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 datasources = []
 response = qs.list_data_sources(AwsAccountId=account_id)
 next_token: str = response.get("NextToken", None)
 datasources += response["DataSources"]
 while next_token is not None:
 response = qs.list_data_sources(AwsAccountId=account_id, NextToken=next_token)
 next_token = response.get("NextToken", None)
 datasources += response["DataSources"]
 return datasources
 
def data_sets (session):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 datasets = []
 response = qs.list_data_sets(AwsAccountId=account_id)
 next_token: str = response.get("NextToken", None)
 datasets += response["DataSetSummaries"]
 while next_token is not None:
 response = qs.list_data_sets(AwsAccountId=account_id, NextToken=next_token)
 next_token = response.get("NextToken", None)
 datasets += response["DataSetSummaries"]
 return datasets

def templates (session):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 token=None
 
 args: Dict[str, Any] = {
 "AwsAccountId": account_id,
 "MaxResults": 100,
 }
 
 tlist=qs.list_templates(**args)

 templates=tlist['TemplateSummaryList']
 
 if 'NextToken' in tlist:
 token=tlist['NextToken']
 while token is not None:
 args["NextToken"] = token
 tlist=qs.list_templates(**args)
 templates.append(tlist['TemplateSummaryList'])
 token = tlist.get("NextToken", None) 
 else: pass
 #print('token is none. No further action.')

 return templates

def dashboards(session)-> List[Dict[str, Any]]:
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 dashboards = []
 response = qs.list_dashboards(AwsAccountId=account_id)
 next_token: str = response.get("NextToken", None)
 dashboards += response["DashboardSummaryList"]
 while next_token is not None:
 response = qs.list_dashboards(AwsAccountId=account_id, NextToken=next_token)
 next_token = response.get("NextToken", None)
 dashboards += response["DashboardSummaryList"]
 return dashboards

def analysis (session):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 analysis = []
 response = qs.list_analyses(AwsAccountId=account_id)
 next_token: str = response.get("NextToken", None)
 analysis += response["AnalysisSummaryList"]
 while next_token is not None:
 response = qs.list_analyses(AwsAccountId=account_id, NextToken=next_token)
 next_token = response.get("NextToken", None)
 analysis += response["AnalysisSummaryList"]
 return analysis

def themes (session):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 themes = []
 response = qs.list_themes(AwsAccountId=account_id)
 next_token: str = response.get("NextToken", None)
 themes += response["ThemeSummaryList"]
 while next_token is not None:
 response = qs.list_themes(AwsAccountId=account_id, NextToken=next_token)
 next_token = response.get("NextToken", None)
 themes += response["ThemeSummaryList"]
 return themes

def folder_members (session, folderid):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 members = []
 response = qs.list_folder_members(AwsAccountId=account_id, FolderId = folderid)
 next_token: str = response.get("NextToken", None)
 members += response["FolderMemberList"]
 while next_token is not None:
 response = qs.list_folder_members(AwsAccountId=account_id, FolderId = folderid, NextToken=next_token)
 next_token = response.get("NextToken", None)
 members += response["FolderMemberList"]
 return members

Describe a Data source/dataset

In [None]:
#
def describe_source (session, DSID):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 response = qs.describe_data_source(
 AwsAccountId=AccountId,
 DataSourceId=DSID)
 return response


#Describe a Dataset
def describe_dataset (session, DSID):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 response = qs.describe_data_set(
 AwsAccountId=AccountId,
 DataSetId=DSID)
 return response

def describe_dashboard(session, dashboard):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 try:
 response = qs.describe_dashboard(
 AwsAccountId=account_id,
 DashboardId=dashboard)
 except Exception as e:
 return ('Faild to describe dashboard: '+str(e))
 else: return response

def describe_template(session,tid):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.describe_template(
 AwsAccountId=account_id,
 TemplateId=tid)
 return response

def describe_analysis(session, analysis):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 try:
 response = qs.describe_analysis(
 AwsAccountId=account_id,
 AnalysisId=analysis)
 except Exception as e:
 return ('Faild to describe analysis: '+str(e))
 else: return response
 
def describe_theme (session, THEMEID):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 response = qs.describe_theme(
 AwsAccountId=AccountId,
 ThemeId=THEMEID)
 return response

delete object 

In [None]:
def delete_source (session, DataSourceId):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 delsource = qs.delete_data_source(
 AwsAccountId=AccountId,
 DataSourceId=DataSourceId)
 return delsource

def delete_dataset (session, DataSetId):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 response = qs.delete_data_set(
 AwsAccountId=AccountId,
 DataSetId=DataSetId)
 return response

def delete_template (session, tid, version=None):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 args: Dict[str, Any] = {
 "AwsAccountId": AccountId,
 "TemplateId": tid,
 }
 if version:
 args["VersionNumber"] = version
 
 response = qs.delete_template(**args)
 return response

def delete_dashboard(session, did):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.delete_dashboard(
 AwsAccountId=account_id,
 DashboardId=did)
 return response

def delete_analysis(session, did):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.delete_analysis(
 AwsAccountId=account_id,
 AnalysisId=did)
 return response

def delete_theme (session, THEMEID):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 response = qs.delete_theme(
 AwsAccountId=AccountId,
 ThemeId=THEMEID)
 return response

 Create Objects 

In [None]:
def create_data_source(source, session, target):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 credential=None

 # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html#QuickSight.Client.create_data_source
 conn_dict = {
 "aurora": "AuroraParameters",
 "aurora_postgresql": "AuroraPostgreSqlParameters",
 "mariadb": "MariaDbParameters",
 "mysql": "MySqlParameters",
 "postgresql": "PostgreSqlParameters",
 "sqlserver": "SqlServerParameters"
 }

 #rds
 if source['Type'].lower() in [
 'aurora', 'aurora_postgresql', 'mariadb', 'mysql', 'postgresql',
 'sqlserver'] and 'RdsParameters' in source['DataSourceParameters']:
 #Update data source instance name
 instance_id = source['DataSourceParameters']['RdsParameters']
 instance_id['InstanceId'] = target['rds']['rdsinstanceid']
 credential = target['credential']['rdscredential']
 elif source['Type'].lower() in [
 'aurora', 'aurora_postgresql', 'mariadb', 'mysql', 'postgresql',
 'sqlserver'] and conn_dict.get(source['Type'].lower()) in source['DataSourceParameters']:
 #Update data source parameters
 conn_name = conn_dict.get(source['Type'].lower())
 conn_params = source['DataSourceParameters'][conn_name]
 conn_params['Host'] = target['rds']['rdsinstanceid']
 credential = target['credential']['rdscredential']
 
 
 #redshift
 if source['Type']=="REDSHIFT":
 
 #Update data source instance name
 Cluster=source['DataSourceParameters']['RedshiftParameters']
 if 'ClusterId' in Cluster:
 Cluster['ClusterId'] = target['redshift']['ClusterId']
 Cluster['Host'] = target['redshift']['Host']
 if target['redshift']['Database'] is not None and 'Database' in Cluster:
 Cluster['Database'] = target['redshift']['Database']
 credential=target['credential']['redshiftcredential']
 
 if 'VpcConnectionProperties' in source and target['vpc'] is not None:
 source['VpcConnectionProperties']['VpcConnectionArn'] = target['vpc']
 elif 'VpcConnectionProperties' in source and target['vpc'] is None:
 raise Exception("Sorry, you need the targetvpc information")

 args: Dict[str, Any] = {
 "AwsAccountId": account_id,
 "DataSourceId": source['DataSourceId'],
 "Name": source['Name'],
 "Type": source['Type'],
 }
 
 if "SslProperties" in source: 
 args["SslProperties"] = source['SslProperties']
 
 if 'DataSourceParameters' in source:
 args["DataSourceParameters"] = source['DataSourceParameters']
 
 if target['tag'] is not None:
 args["Tags"] = target['tag']
 
 if credential is not None:
 args["Credentials"] = credential
 
 if 'VpcConnectionProperties' in source:
 args["VpcConnectionProperties"] = source['VpcConnectionProperties']
 
 args["Permissions"] = target['datasourcepermission']
 
 try:
 NewSource = qs.create_data_source(**args)
 return NewSource
 except Exception as e:
 error={"DataSource": args, "Error": str(e)}
 return error

In [None]:
def loaddsinput (file, part):
 import json
 with open(file) as f:
 data = json.load(f)
 
 res=data['DataSet'][part]
 
 return res




In [None]:
#AccountId string; DataSetId string; Name string; Physical: json; Logical: json; Mode: string;
#ColumnGroups: json array; Permissions: json array; RLS: json; Tags: json array
def create_dataset (session, DataSetId, Name, Physical, Logical, Mode, Permissions, ColumnGroups=None, RowLevelPermissionDataSet=None):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 args: Dict[str, Any] = {
 "AwsAccountId": AccountId,
 "DataSetId": DataSetId,
 "Name": Name,
 "PhysicalTableMap": Physical,
 "LogicalTableMap": Logical,
 "ImportMode": Mode,
 "Permissions": Permissions,
 }
 if ColumnGroups:
 args["ColumnGroups"] = ColumnGroups
 if RowLevelPermissionDataSet:
 args["RowLevelPermissionDataSet"] = RowLevelPermissionDataSet
 
 response = qs.create_data_set(**args)
 return response

In [None]:
def create_template(session, TemplateId, tname, dsref, sourceanalysis, version):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 try:
 delete_template(session, TemplateId)
 except Exception:
 print(traceback.format_exc())
 #assert isinstance(TemplateId, int), ''
 finally:
 response = qs.create_template(
 AwsAccountId=account_id, 
 TemplateId=TemplateId,
 Name=tname,
 SourceEntity={
 'SourceAnalysis': {
 'Arn': sourceanalysis,
 'DataSetReferences': dsref
 }
 },
 VersionDescription=version
 )
 return response

#res=update_template_permission('810061268089', 'covid19dashboard', 'arn:aws:iam::889399602426:root')
 
def copy_template(session, TemplateId, tname, SourceTemplatearn):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 try:
 delete_template(session, TemplateId)
 except Exception:
 print(traceback.format_exc())
 #assert isinstance(TemplateId, int), ''
 finally:
 response = qs.create_template(
 AwsAccountId=account_id, 
 TemplateId=TemplateId,
 Name=tname,
 SourceEntity={
 'SourceTemplate': {
 'Arn': SourceTemplatearn
 }
 },
 VersionDescription='1'
 )
 return response

def create_dashboard(session, dashboard, name,principal, SourceEntity, version,themearn, filter='ENABLED',csv='ENABLED', sheetcontrol='EXPANDED'):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 if themearn=='':
 response = qs.create_dashboard(
 AwsAccountId=account_id,
 DashboardId=dashboard,
 Name=name,
 Permissions=[
 {
 'Principal': principal,
 'Actions': [
 'quicksight:DescribeDashboard',
 'quicksight:ListDashboardVersions',
 'quicksight:UpdateDashboardPermissions',
 'quicksight:QueryDashboard',
 'quicksight:UpdateDashboard',
 'quicksight:DeleteDashboard',
 'quicksight:DescribeDashboardPermissions',
 'quicksight:UpdateDashboardPublishedVersion' 
 ]
 },
 ],
 SourceEntity=SourceEntity,
 VersionDescription=version,
 DashboardPublishOptions={
 'AdHocFilteringOption': {
 'AvailabilityStatus': filter
 },
 'ExportToCSVOption': {
 'AvailabilityStatus': csv
 },
 'SheetControlsOption': {
 'VisibilityState': sheetcontrol
 }
 }
 )
 else:
 response = qs.create_dashboard(
 AwsAccountId=account_id,
 DashboardId=dashboard,
 Name=name,
 Permissions=[
 {
 'Principal': principal,
 'Actions': [
 'quicksight:DescribeDashboard',
 'quicksight:ListDashboardVersions',
 'quicksight:UpdateDashboardPermissions',
 'quicksight:QueryDashboard',
 'quicksight:UpdateDashboard',
 'quicksight:DeleteDashboard',
 'quicksight:DescribeDashboardPermissions',
 'quicksight:UpdateDashboardPublishedVersion' 
 ]
 },
 ],
 SourceEntity=SourceEntity,
 VersionDescription=version,
 DashboardPublishOptions={
 'AdHocFilteringOption': {
 'AvailabilityStatus': filter
 },
 'ExportToCSVOption': {
 'AvailabilityStatus': csv
 },
 'SheetControlsOption': {
 'VisibilityState': sheetcontrol
 }
 },
 ThemeArn=themearn
 )
 
 return response


def create_analysis(session, analysis, name,principal, SourceEntity,ThemeArn):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 if ThemeArn != '':
 response = qs.create_analysis(
 AwsAccountId=account_id,
 AnalysisId=analysis,
 Name=name,
 Permissions=[
 {
 'Principal': principal,
 'Actions': [
 'quicksight:RestoreAnalysis',
 'quicksight:UpdateAnalysisPermissions',
 'quicksight:DeleteAnalysis',
 'quicksight:QueryAnalysis',
 'quicksight:DescribeAnalysisPermissions',
 'quicksight:DescribeAnalysis',
 'quicksight:UpdateAnalysis'
 
 ]
 }
 ],
 SourceEntity=SourceEntity,
 ThemeArn=ThemeArn
 )
 else:
 response = qs.create_analysis(
 AwsAccountId=account_id,
 AnalysisId=analysis,
 Name=name,
 Permissions=[
 {
 'Principal': principal,
 'Actions': [
 'quicksight:RestoreAnalysis',
 'quicksight:UpdateAnalysisPermissions',
 'quicksight:DeleteAnalysis',
 'quicksight:QueryAnalysis',
 'quicksight:DescribeAnalysisPermissions',
 'quicksight:DescribeAnalysis',
 'quicksight:UpdateAnalysis'
 
 ]
 }
 ],
 SourceEntity=SourceEntity
 )

 return response

def create_theme (session,THEMEID, Name,BaseThemeId,Configuration):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 response = qs.create_theme(
 AwsAccountId=AccountId,
 ThemeId=THEMEID,
 Name=Name,
 BaseThemeId=BaseThemeId,
 Configuration=Configuration
 )
 return response

In [None]:
def create_folder_membership(session, folderid, objectid, objecttype):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 response = qs.create_folder_membership(
 AwsAccountId = AccountId,
 FolderId=folderid,
 MemberId=objectid,
 MemberType=objecttype
 )
 return response

 Update Objects 

In [None]:
#AccountId string; DataSetId string; Name string; Physical: json; Logical: json; Mode: string;
#ColumnGroups: json array; Permissions: json array; RLS: json; Tags: json array
def update_dataset (session, DataSetId, Name, Physical, Logical, Mode, ColumnGroups=None, RowLevelPermissionDataSet=None):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 AccountId = sts_client.get_caller_identity()["Account"]
 args: Dict[str, Any] = {
 "AwsAccountId": AccountId,
 "DataSetId": DataSetId,
 "Name": Name,
 "PhysicalTableMap": Physical,
 "LogicalTableMap": Logical,
 "ImportMode": Mode,
 }
 if ColumnGroups:
 args["ColumnGroups"] = ColumnGroups
 if RowLevelPermissionDataSet:
 args["RowLevelPermissionDataSet"] = RowLevelPermissionDataSet
 
 response = qs.update_data_set(**args)
 return response

In [None]:
def update_template_permission(session, TemplateId, Principal):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.update_template_permissions(
 AwsAccountId=account_id,
 TemplateId=TemplateId,
 GrantPermissions=[
 {
 'Principal': Principal,
 'Actions': [
 'quicksight:DescribeTemplate',
 ]
 }
 ]
 )
 return response

def update_dashboard(session, dashboard, name, SourceEntity, version, filter='ENABLED',csv='ENABLED', sheetcontrol='EXPANDED'):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]

 response = qs.update_dashboard(
 AwsAccountId=account_id,
 DashboardId=dashboard,
 Name=name,
 SourceEntity=SourceEntity,
 VersionDescription=version,
 DashboardPublishOptions={
 'AdHocFilteringOption': {
 'AvailabilityStatus': filter
 },
 'ExportToCSVOption': {
 'AvailabilityStatus': csv
 },
 'SheetControlsOption': {
 'VisibilityState': sheetcontrol
 }
 })
 
 return response

def update_data_source_permissions(session, datasourceid, Principal):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.update_data_source_permissions(
 AwsAccountId=account_id,
 DataSourceId=datasourceid,
 GrantPermissions=[
 {
 'Principal': Principal,
 'Actions':["quicksight:DescribeDataSource",
 "quicksight:DescribeDataSourcePermissions",
 "quicksight:PassDataSource",
 "quicksight:UpdateDataSource",
 "quicksight:DeleteDataSource",
 "quicksight:UpdateDataSourcePermissions"]
 },
 ]
 )
 return response

def update_data_set_permissions(session, datasetid, Principal):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.update_data_set_permissions(
 AwsAccountId=account_id,
 DataSetId=datasetid,
 GrantPermissions=[
 {
 'Principal': Principal,
 'Actions':['quicksight:UpdateDataSetPermissions',
 'quicksight:DescribeDataSet',
 'quicksight:DescribeDataSetPermissions',
 'quicksight:PassDataSet',
 'quicksight:DescribeIngestion',
 'quicksight:ListIngestions',
 'quicksight:UpdateDataSet',
 'quicksight:DeleteDataSet',
 'quicksight:CreateIngestion',
 'quicksight:CancelIngestion']
 },
 ]
 )
 return response


def update_analysis(session, analysis, name, SourceEntity,themearn):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]

 if themearn == '':
 response = qs.update_analysis(
 AwsAccountId=account_id,
 AnalysisId=analysis,
 Name=name,
 SourceEntity=SourceEntity
 )
 else:
 response = qs.update_analysis(
 AwsAccountId=account_id,
 AnalysisId=analysis,
 Name=name,
 SourceEntity=SourceEntity,
 ThemeArn=themearn
 ) 
 return response


def describe_analysis_permissions(session, analysis):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 try:
 response = qs.describe_analysis_permissions(
 AwsAccountId=account_id,
 AnalysisId=analysis)
 except Exception as e:
 return ('Faild to describe analysis: '+str(e))
 else: return response
 
def update_theme(session, THEMEID, name,BaseThemeId):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]

 response = qs.update_theme(
 AwsAccountId=account_id,
 ThemeId=THEMEID,
 Name=name,
 BaseThemeId=BaseThemeId
 )
 return response

def describe_theme_permissions(session, THEMEID):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]

 response = qs.describe_theme_permissions(
 AwsAccountId=account_id,
 ThemeId=THEMEID
 )
 return response

def update_theme_permissions(session, THEMEID, Principal):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.update_theme_permissions(
 AwsAccountId=account_id,
 ThemeId=THEMEID,
 GrantPermissions=[
 {
 'Principal':Principal,
 'Actions':['quicksight:ListThemeVersions',
 'quicksight:UpdateThemeAlias',
 'quicksight:UpdateThemePermissions',
 'quicksight:DescribeThemeAlias',
 'quicksight:DeleteThemeAlias',
 'quicksight:DeleteTheme',
 'quicksight:ListThemeAliases',
 'quicksight:DescribeTheme',
 'quicksight:CreateThemeAlias',
 'quicksight:UpdateTheme',
 'quicksight:DescribeThemePermissions']
 }
 ]
 )
 return response

In [None]:
def update_template_permission(session, TemplateId, Principal):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.update_template_permissions(
 AwsAccountId=account_id,
 TemplateId=TemplateId,
 GrantPermissions=[
 {
 'Principal': Principal,
 'Actions': [
 'quicksight:DescribeTemplate',
 ]
 }
 ]
 )
 return response

def update_dashboard(session, dashboard, name, SourceEntity, version, filter='ENABLED',csv='ENABLED', sheetcontrol='EXPANDED'):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]

 response = qs.update_dashboard(
 AwsAccountId=account_id,
 DashboardId=dashboard,
 Name=name,
 SourceEntity=SourceEntity,
 VersionDescription=version,
 DashboardPublishOptions={
 'AdHocFilteringOption': {
 'AvailabilityStatus': filter
 },
 'ExportToCSVOption': {
 'AvailabilityStatus': csv
 },
 'SheetControlsOption': {
 'VisibilityState': sheetcontrol
 }
 })
 
 return response

def update_data_source_permissions(session, datasourceid, Principal):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.update_data_source_permissions(
 AwsAccountId=account_id,
 DataSourceId=datasourceid,
 GrantPermissions=[
 {
 'Principal': Principal,
 'Actions':["quicksight:DescribeDataSource",
 "quicksight:DescribeDataSourcePermissions",
 "quicksight:PassDataSource",
 "quicksight:UpdateDataSource",
 "quicksight:DeleteDataSource",
 "quicksight:UpdateDataSourcePermissions"]
 },
 ]
 )
 return response

def update_data_set_permissions(session, datasetid, Principal):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.update_data_set_permissions(
 AwsAccountId=account_id,
 DataSetId=datasetid,
 GrantPermissions=[
 {
 'Principal': Principal,
 'Actions':['quicksight:UpdateDataSetPermissions',
 'quicksight:DescribeDataSet',
 'quicksight:DescribeDataSetPermissions',
 'quicksight:PassDataSet',
 'quicksight:DescribeIngestion',
 'quicksight:ListIngestions',
 'quicksight:UpdateDataSet',
 'quicksight:DeleteDataSet',
 'quicksight:CreateIngestion',
 'quicksight:CancelIngestion']
 },
 ]
 )
 return response


def update_analysis(session, analysis, name, SourceEntity,themearn):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]

 if themearn == '':
 response = qs.update_analysis(
 AwsAccountId=account_id,
 AnalysisId=analysis,
 Name=name,
 SourceEntity=SourceEntity
 )
 else:
 response = qs.update_analysis(
 AwsAccountId=account_id,
 AnalysisId=analysis,
 Name=name,
 SourceEntity=SourceEntity,
 ThemeArn=themearn
 ) 
 return response


def describe_analysis_permissions(session, analysis):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 try:
 response = qs.describe_analysis_permissions(
 AwsAccountId=account_id,
 AnalysisId=analysis)
 except Exception as e:
 return ('Faild to describe analysis: '+str(e))
 else: return response
 
def update_theme(session, THEMEID, name,BaseThemeId):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]

 response = qs.update_theme(
 AwsAccountId=account_id,
 ThemeId=THEMEID,
 Name=name,
 BaseThemeId=BaseThemeId
 )
 return response

def describe_theme_permissions(session, THEMEID):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]

 response = qs.describe_theme_permissions(
 AwsAccountId=account_id,
 ThemeId=THEMEID
 )
 return response

def update_theme_permissions(session, THEMEID, Principal):
 qs = session.client('quicksight')
 sts_client = session.client("sts")
 account_id = sts_client.get_caller_identity()["Account"]
 response = qs.update_theme_permissions(
 AwsAccountId=account_id,
 ThemeId=THEMEID,
 GrantPermissions=[
 {
 'Principal':Principal,
 'Actions':['quicksight:ListThemeVersions',
 'quicksight:UpdateThemeAlias',
 'quicksight:UpdateThemePermissions',
 'quicksight:DescribeThemeAlias',
 'quicksight:DeleteThemeAlias',
 'quicksight:DeleteTheme',
 'quicksight:ListThemeAliases',
 'quicksight:DescribeTheme',
 'quicksight:CreateThemeAlias',
 'quicksight:UpdateTheme',
 'quicksight:DescribeThemePermissions']
 }
 ]
 )
 return response

In [None]:
print('done')