B 9cb@sddlZddlmZddlmZddlmZddlZddlZddlmZddl Z ddl Z ddlm Z ddl Z ddlZddlZeZe ZejZe ddZe jd ed Zejd ed Zejd ed ZeeeedZeZeZddZddZddZddZddZ dWddZ!ddZ"dXd d!Z#d"d#Z$d$d%Z%d&d'Z&d(d)Z'dYd*d+Z(dZd.d/Z)d0d1Z*d2d3Z+d4d5Z,d[d6d7Z-d\d8d9Z.d]dd?Z0d_d@dAZ1dBdCZ2d`dDdEZ3dadFdGZ4dbdHdIZ5dJdKZ6dLdMZ7dNdOZ8dPdQZ9dcdUdVZ:dS)dN)Session)get_execution_role) FeatureGroup)datetime)sleepstsAccounts3) region_name sagemaker) service_namer zsagemaker-featurestore-runtime) boto_sessionsagemaker_client%sagemaker_featurestore_runtime_clientcCsntj}tj|d}|jd|d}t|td}x8y$|d}tdt dWq2PYq2Xq2WdS)N)r r )r r )namesagemaker_sessionFeatureGroupStatusz"Waiting for Feature Group Deletion) boto3rr clientrfeature_store_sessiondescribegetprinttimer)feature_group_nameregionr r feature_groupstatusrF/root/Sagemaker-FeatureStore/Sagemaker-FeatureStore/utilities/Utils.py)_wait_for_feature_group_deletion_complete)s    r!cCstj}tj|d}|jd|d}t|td}|d}x,|dkrjtdt d|d}q@W|d krt d |j td |j d dS) N)r r )r r )rrrCreatingz"Waiting for Feature Group CreationrCreatedzFailed to create feature group z FeatureGroup z successfully created.) rrr rrrrrrrr RuntimeErrorr)rrr rrrrrr )_wait_for_feature_group_creation_complete9s     r%cCsbg}xXt|j|jD]F\}}d|i}|dkr6d|d<n|dkrHd|d<nd|d<||qW|S)N FeatureNamefloat64 Fractional FeatureTypeint64IntegralString)zipcolumnsdtypesappend)dffeature_definitionscolcol_typefeaturerrr _df_to_feature_defsIs  r6cCs4|dd}|dd}|dd}|dd}|S) N$_D_?_Q_&_A_#_H_)replace)in_str escaped_strrrr _escape_tag_charsYs     rBcCs4|dd}|dd}|dd}|dd}|S) Nr8r7r:r9r<r;r>r=)r?)r@ unescaped_strrrr _unescape_tag_chars`s     rDId UpdateTimeTc Cs||jkrtd|dS||jkr8td|dS|dkrLdtd}i}|dk rhdd|ii|d<|dk rg}x,|D] \} } | t| d} || q~W||d <tjf|||t|d |it d |} t |dS) Nzinvalid id column name: z invalid event time column name: zs3://z/offline-storeS3StorageConfigS3UriOfflineStoreConfig)KeyValueTagsEnableOnlineStore)FeatureGroupNameRecordIdentifierFeatureNameEventTimeFeatureNameFeatureDefinitionsOnlineStoreConfigRoleArn) r.rdefault_bucketitemsrBr0rcreate_feature_groupr6roler%) fg_namer1id_nameevent_time_nametagsonlines3_uri other_argstags_as_kv_arraykvcurr_kvresprrr create_fg_from_dfgs6   rdcCsRt|d}tj|d}|d}i}x(|D] }|d}|d}t|||<q*W|S)NFeatureGroupArn) ResourceArnrLrJrK)describe_feature_groupr list_tagsrD)rXfg_arnrc tags_kv_arrayr[kvr`rarrr get_tagss   rlcCs&|dkrt}n tj|d}|dS)N) NameContainsFeatureGroupSummaries)rlist_feature_groups) name_containsrcrrr ros  rocCs tj|dS)N)rN)rrg)rXrrr rgsrgc Csd}d}i}yt|d}Wnd}|S|dd}|dd}d |d d d}|d td td |}|d d }d|dtd|dS)NTrIFDataCatalogConfig TableNamerGrH/z /sagemaker/z/offline-store/z-https://s3.console.aws.amazon.com/s3/buckets/z?region=z&prefix=z/data/)rgjoinsplit account_idr) rX fg_s3_urihas_offline_storeoffline_store_configtable base_s3_uribase_offline_prefixoffline_prefixs3_bucket_namerrr get_offline_store_urls  rcCs<t|dd}|d}|d}dtdtd|d|S) NrIrrrsDatabasez0https://console.aws.amazon.com/glue/home?region=z#table:catalog=z;name=z ;namespace=)rgrry)rX_data_catalog_config_table _databaserrr get_glue_table_urlsrc Csd}d}i}yt|d}Wnd}|S|dd}|ddd d }|d|d d}td }|j||d }d} x&|d D]} | d} d| kr| } PqW| d d} |t| | | S)NrqTrIFrGrHzs3://rtrr )BucketPrefixContentsrJz.parquet)rgrxr?rrlist_objects_v2 download_filerT) rXrzr{r|r~bucketprefix s3_clientrckey_to_downloadobj_keycurr_keyfilenamerrr download_sample_offline_files,  rcCs*d}yt|dWnd}YnX|rbt|d}|dsb|dd}|dd}|dd}|r|rt|dd d }d |d d d}|d td td|} |d d} td} | | } | jj | d} t d| d| | }d}yt j |d}Wn YnXt|dS)NTrIFDisableGlueTableCreationrrrsCatalogrrGrHrtruz /sagemaker/z/offline-store/rvr )rz#Deleting all s3 objects in prefix: z in bucket )rN)rgrwrxryrrresourcerobjectsfilterrdeleterdelete_feature_groupr!)rX delete_s3r{r| table_name catalog_id database_namer]rrrr rcollrcrrr rs:      rrvcCs"t|td}|j||||ddS)N)rr) data_frame max_processes max_workerswait)rringest)rXr1rrrfgrrr ingest_from_dfs rcCsni}xd|D]\}|d}|d}||}|dkr@t|||d<q |dkrZt|||d<q |||d<q W|S)Nr& ValueAsStringr+r()intfloat)rec feature_typestmp_dictf feature_namestring_feature_val feature_typerrr _record_to_dicts rcCsg}tj|t||d}dS)N)rNRecordIdentifierValueAsString EventTime)featurestore_runtime delete_recordstr)rX record_id event_timeresultsrcrrr r"s rc Cst|d}i}x|D]}|d||d<qWg}xL|D]D}t|}tj||d}yt|d|} || Wq8Yq8Xq8W|S)NrQr)r&)rNrRecord)rgrr get_recordrr0) rXZ id_value_list feature_defsrfdrcurr_idrecord_identifier_valuerc curr_recordrrr get_latest_feature_values*s    rc CsFtd}|r"td|d||j|d|id|id}|d}|j|dd d d }x4|d kr|d krtd|j|dd d d }q\W|d krt|j|d|j|dd d d}t|t} | Sd|d} d} |dddd} t | | | t d} t dt j| | dt j| | dd| SdS)NathenazRunning query : z On database: rOutputLocation) QueryStringQueryExecutionContextResultConfigurationQueryExecutionId)rQueryExecutionStatusState SUCCEEDEDFAILEDrvStateChangeReasonzoffline-store/query_results/z.csvzquery_results.csvz//rrtr)rrJz .metadata)rrrstart_query_executionget_query_executionrpd DataFramerxrrread_csvosremove delete_object) query_stringtmp_uridatabaseverboserquery_executionquery_execution_id query_statefailure_reasonr1results_file_prefixrresults_bucketrrr _run_query=s6     rcCsHt|dd}|d}|d}|dkr4dtd}|d}|||fS)NrIrrrsrzs3://z/offline-storez/query_results/)rgrT)rXr]rrr_tmp_urirrr _get_offline_detailses  r cCs8t||\}}}d|dd|d|}t|||S)NzSELECT * FROM ""z tablesample bernoulli(z) limit )rr)rXn sample_pctr]rrr _query_stringrrr sampleosrcCs:|}x(|D] }t||\}}}|||}q Wt|||S)N)rr?r)fg_namesqueryr]r_fg_namerrrrrr rts  rcCs6t||\}}}d|d}t|||dd}|jdS)NzSELECT COUNT(*) FROM "rF)r)rr)rriat)rXr]rrrr_tmp_dfrrr get_historical_record_count|s rcCsH|dkrddd|D}nddd|D}d|d|d}|S) Nr,,css|]}dt|dVqdS)'N)r).0xrrr sz%_make_where_clause..css|]}t|VqdS)N)r)rrrrr rs z IN ())rw)id_feature_nameid_feature_type record_idsZ_id_list_stringZ _where_clauserrr _make_where_clauses rcCst||\}}}t|}|d}|d} |d} i} x| D]} | d| | d<q:W| |} |dkrhd}n d|}|dkrd}nd t|| |}d |d |d |}t|||S) NrOrPrQr)r&*rrqz WHERE zSELECT z FROM "z" )rrgrwrr)rXr feature_namesr]rrrfg_resprtime_feature_namerrrrfeature_name_string where_clauserrrr %get_historical_offline_feature_valuess"  rcCst||\}}}t|}|d}|d} |d} i} x| D]} | d| | d<q:W| |} |dkrhd}n d|}|dkrd}nd t|| |}d |d d | d d|d|}d|d|d}t|||S)NrOrPrQr)r&rrrqz WHERE z*SELECT *, dense_rank() OVER (PARTITION BY rz ORDER BY z: DESC, Api_Invocation_Time DESC, write_time DESC) AS rank zFROM "z" zSELECT z FROM (z#) WHERE rank = 1 AND NOT is_deleted)rrgrwrr)rXrrr]rrrrrrrrrrrr _subqueryrrrr !get_latest_offline_feature_valuess$  *rcCs(t|ddd}t|ddd}|dkrst      $  7  (     *%