import sys
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import lit, col, expr, split
from pyspark.sql.utils import AnalysisException
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
from awsglue.job import Job
import ast
import boto3
import json
import datetime
import time


def _optional_arg(name, default):
    """Return the Glue job argument ``--<name>`` if it was supplied, else ``default``.

    ``getResolvedOptions`` raises when a requested argument is missing, so it is
    only asked for arguments that actually appear on the command line (either as
    ``--name value`` or ``--name=value``).
    """
    flag = '--' + name
    if any(arg == flag or arg.startswith(flag + '=') for arg in sys.argv):
        return getResolvedOptions(sys.argv, [name])[name]
    return default


# getOrCreate() reuses an already-running SparkContext (e.g. in a Glue
# interactive session or notebook) instead of failing with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# CloudTrail partition coordinates. The defaults reproduce the original demo
# values; each one can be overridden per run with a job argument
# (--accountid, --region, --logfolder, --year, --month, --day).
accountid = _optional_arg('accountid', '44026XXXXXX89')  # Input your accountId
region = _optional_arg('region', 'us-west-2')
logfolder = _optional_arg('logfolder', 'CloudTrail')
# For the purpose of this demonstration, we are limiting the logs to a specific date.
year = _optional_arg('year', '2020')
month = _optional_arg('month', '10')
day = _optional_arg('day', '17')

# Push-down predicate so only the matching partitions are read from S3.
# NOTE(review): partition_0..partition_5 are presumably the crawler-generated
# names for the account/folder/region/year/month/day prefix levels — confirm
# against the catalog table.
pdp = "(partition_0 = '{}' and partition_1 = '{}' and partition_2 = '{}' and partition_3 = '{}' and partition_4 = '{}' and partition_5 = '{}')".format(accountid, logfolder, region, year, month, day)

rawcloudtraildata = glueContext.create_dynamic_frame.from_catalog(
    database="cloudtrailanalysis",
    table_name='cloudtrail_runinstancesawslogs',
    push_down_predicate=pdp,
)

# Get QuickSight Metadata E.g. Users, Analysis, Dashboards, Datasets etc.
qsmetadata = glueContext.create_dynamic_frame.from_catalog(
    database="quicksightbionbi",
    table_name='metadata',
    region='us-west-2',
)

# Convert to Spark DataFrames for SQL-style filtering/projection downstream.
maindataset = rawcloudtraildata.toDF()
qsmetadatadf = qsmetadata.toDF()
# Select only QuickSight API events, and only the CloudTrail fields required
# for the analysis (nested struct fields surface as their leaf column names).
finalQScloudtrail = maindataset.where("eventSource = 'quicksight.amazonaws.com'").select(
    "eventTime",
    "eventName",
    "awsRegion",
    "userIdentity.sessionContext.sessionIssuer.accountId",
    "userIdentity.sessionContext.sessionIssuer.userName",
    "serviceEventDetails.eventResponseDetails.analysisDetails.analysisName",
    "serviceEventDetails.eventResponseDetails.dashboardDetails.dashboardName",
)

# Derive a yyyyMMdd partition key from the first 10 characters of eventTime.
# Assumes eventTime starts with a YYYY-MM-DD date (CloudTrail ISO-8601 format).
finalQScloudtrail = finalQScloudtrail.withColumn("date", expr("replace(substr(eventTime,1,10),'-','')"))

QScloudtrailfinals3 = "s3://quicksight-bionbi/aggregatedoutput"

# With Spark's default *static* partition-overwrite mode, mode("overwrite")
# combined with partitionBy() deletes every existing date=... partition under
# the output path, so each single-day run would wipe the history of earlier
# days. Dynamic mode replaces only the partitions present in this write
# (supported since Spark 2.3).
glueContext.spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# The CSV-only 'header' option was dropped: the parquet writer does not use it.
finalQScloudtrail.write.mode("overwrite").format("parquet").partitionBy("date").save(QScloudtrailfinals3)