U Dx`08 @sUdZddlZddlZddlZddlmZddlmZmZm Z m Z m Z ddl Z ddl ZddlmZddlmZmZeeZejed<eje e edd d Zeee eee eeje eeefd d d ZeeedddZe eeefeeefe edddZeeefe eeefdddZ d*ejeeeee ee!e e j"e eeefd ddZ#d+ee e j"ejdddZ$d,ee ee eeefe e j"eddd Z%d-ee e j"dd!d"d#Z&d.eee!e!e eeefe e j"ed$d%d&Z'd/eee e j"dd'd(d)Z(dS)0zAmazon Timestream Module.N)datetime)AnyDictListOptionalcast)Config) _data_types_utils_logger)dfreturncCsd|j}t|D]L\}}t|D]:\}}t|rBd|||<q"t|dr"||||<q"q|S)zExtract Parameters.N to_pydatetime)valuestolist enumeratepdZisnahasattrr)r parametersirowjvaluer=/tmp/pip-target-zr53vnty/lib/python/awswrangler/timestream.py_df2lists   r)databasetable cols_names measure_typebatchboto3_primitivesr c stj|d}tjd|tddddidd}z8tj|j|jj|jjfd ||fd d |Dd WnF|jj k r}z$t t t t t f|jd WYSd}~XYnXgS)N) primitivestimestream-writei max_attempts ) read_timeoutZmax_pool_connectionsretries service_namesessionZbotocore_configc s^g|]V}ddtdd|ddDdt|dtt|ddddqS) cSs g|]\}}|dt|dqS)VARCHAR)NameZDimensionValueTypeValue)str).0namerrrr 5sz+_write_batch...NriZ MILLISECONDS)Z DimensionsZ MeasureNameZMeasureValueTypeZ MeasureValueZTimeZTimeUnit)zipr0round timestamp)r1Zrecrrrrr33s  z _write_batch..)fexZ max_num_tries DatabaseName TableNameZRecordsZRejectedRecords)r Zboto3_from_primitivesclientrZtry_itZ write_records exceptionsZThrottlingExceptionZInternalServerExceptionZRejectedRecordsExceptionrrrr0response) rrrrr r! boto3_sessionr>r;rr9r _write_batchs(   2rB)rdtyper cCs|dkr |S|dkrt|S|dkr,t|S|dkr@|dkS|dkr\t|dddS|d krtt|d S|d krt|ddd Std |dS)Nr-)ZINTEGERZBIGINTZDOUBLEZBOOLEANtrue TIMESTAMPz%Y-%m-%d %H:%M:%S.%fZDATEz%Y-%m-%dZTIMEz %H:%M:%S.%fz&Not supported Amazon Timestream type: )intfloatlowerrstrptimedatetime ValueErrorrrCrrr _cast_valueGs rO)schemarr cCs~g}t||dD]f\}}|ddr2|dqd|krV|t|d|ddqtd|dd |dd |q|S) NDataZ NullValueFZ ScalarValuetyperNz/Query with non ScalarType/NullValue for column r2z . Expected z instead of )r6getappendrOrM)rPrZ row_processedZ col_schemacolrrr _process_rowYs  rV)pager cCsZg}|dD]H}d|dkr8td|dd|d||d|dddq |S)NZ ColumnInfoZ ScalarTypeTypez%Query with non ScalarType for column r.z: )r2rR)rMrT)rWrPrUrrr_process_schemahs   rY ) r rrtime_col measure_coldimensions_cols num_threadsrAr c Cst||g}td|||g|} td| tjt|| ddd} tdt| tj j |d\} t | t t|t|t| t|| ttj|d} d d | DW5QRSQRXd S) aNStore a Pandas DataFrame into a Amazon Timestream table. Parameters ---------- df: pandas.DataFrame Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html database : str Amazon Timestream database name. table : str Amazon Timestream table name. time_col : str DataFrame column name to be used as time. MUST be a timestamp column. measure_col : str DataFrame column name to be used as measure. dimensions_cols : List[str] List of DataFrame column names to be used as dimensions. num_threads : str Number of thread to be used for concurrent writing. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. Returns ------- List[Dict[str, str]] Rejected records. Examples -------- Store a Pandas DataFrame into a Amazon Timestream table. >>> import awswrangler as wr >>> import pandas as pd >>> df = pd.DataFrame( >>> { >>> "time": [datetime.now(), datetime.now(), datetime.now()], >>> "dim0": ["foo", "boo", "bar"], >>> "dim1": [1, 2, 3], >>> "measure": [1.0, 1.1, 1.2], >>> } >>> ) >>> rejected_records = wr.timestream.write( >>> df=df, >>> database="sampleDB", >>> table="sampleTable", >>> time_col="time", >>> measure_col="measure", >>> dimensions_cols=["dim0", "dim1"], >>> ) >>> assert len(rejected_records) == 0 zmeasure_type: %szcols_names: %s)r d)lst max_lengthzlen(batches): %s) max_workers)rAcSsg|]}|D]}|q qSrr)r1Zsublistitemrrrr3szwrite..N)r Ztimestream_type_from_pandasr debugr Zchunkifyrlen concurrentfuturesZThreadPoolExecutorlistmaprB itertoolsrepeatZboto3_to_primitives) r rrr[r\r]r^rArrZbatchesexecutorresrrrwriteqs&=    rn)sqlrAr c Cstjd|tdddidd}|d}g}g}|j|dD]2}|sLt|d }|d D]}|t||d qTq:t d |t j |d d|Dd}|D]*} | ddkr|| d d|| d<q|S)aRun a query and retrieve the result as a Pandas DataFrame. Parameters ---------- sql: str SQL query. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. Returns ------- pd.DataFrame Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html Examples -------- Running a query and storing the result as a Pandas DataFrame >>> import awswrangler as wr >>> df = wr.timestream.query('SELECT * FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 10') ztimestream-query<r%r&)r'r(r)query)Z QueryString)rWZRows)rPrz schema: %scSsg|] }|dqS)r2r)r1crrrr3szquery..)datacolumnsrRr-r2string) r r>rZ get_paginatorZpaginaterYrTrVr rdr DataFrameZastype) rorAr>Z paginatorrowsrPrWrr rUrrrrqs&     rq)r kms_key_idtagsrAr cCsbtjd|d}d|i}|dk r&||d<|dk rDdd|D|d<|jf|}tt|d d S) aKCreate a new Timestream database. Note ---- If the KMS key is not specified, the database will be encrypted with a Timestream managed KMS key located in your account. Parameters ---------- database: str Database name. kms_key_id: Optional[str] The KMS key for the database. If the KMS key is not specified, the database will be encrypted with a Timestream managed KMS key located in your account. tags: Optional[Dict[str, str]] Key/Value dict to put on the database. Tags enable you to categorize databases and/or tables, for example, by purpose, owner, or environment. e.g. {"foo": "boo", "bar": "xoo"}) boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. Returns ------- str The Amazon Resource Name that uniquely identifies this database. (ARN) Examples -------- Creating a database. >>> import awswrangler as wr >>> arn = wr.timestream.create_database("MyDatabase") r#r*r+r<NZKmsKeyIdcSsg|]\}}||dqS)ZKeyr/rr1kvrrrr3sz#create_database..TagsZDatabaseArn)r r>itemscreate_databaserr0)rrxryrAr>argsr@rrrrs) r)rrAr cCstjd|d}|j|ddS)aHDelete a given Timestream database. This is an irreversible operation. After a database is deleted, the time series data from its tables cannot be recovered. All tables in the database must be deleted first, or a ValidationException error will be thrown. Due to the nature of distributed retries, the operation can return either success or a ResourceNotFoundException. Clients should consider them equivalent. Parameters ---------- database: str Database name. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. Returns ------- None None. Examples -------- Deleting a database >>> import awswrangler as wr >>> arn = wr.timestream.delete_database("MyDatabase") r#rz)r<N)r r>delete_database)rrAr>rrrr"s"r)rrmemory_retention_hoursmagnetic_retention_daysryrAr c Cs\tjd|d}||||dd}|dk r>dd|D|d<|jf|}tt|d d S) aCreate a new Timestream database. Note ---- If the KMS key is not specified, the database will be encrypted with a Timestream managed KMS key located in your account. Parameters ---------- database: str Database name. table: str Table name. memory_retention_hours: int The duration for which data must be stored in the memory store. magnetic_retention_days: int The duration for which data must be stored in the magnetic store. tags: Optional[Dict[str, str]] Key/Value dict to put on the table. Tags enable you to categorize databases and/or tables, for example, by purpose, owner, or environment. e.g. {"foo": "boo", "bar": "xoo"}) boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. Returns ------- str The Amazon Resource Name that uniquely identifies this database. (ARN) Examples -------- Creating a table. >>> import awswrangler as wr >>> arn = wr.timestream.create_table( ... database="MyDatabase", ... table="MyTable", ... memory_retention_hours=3, ... magnetic_retention_days=7 ... ) r#rz)Z!MemoryStoreRetentionPeriodInHoursZ"MagneticStoreRetentionPeriodInDays)r<r=ZRetentionPropertiesNcSsg|]\}}||dqSr{rr|rrrr3sz create_table..rZTabler)r r>r create_tablerr0) rrrrryrAr>rr@rrrrHs3 r)rrrAr cCs tjd|d}|j||ddS)a!Delete a given Timestream table. This is an irreversible operation. After a Timestream database table is deleted, the time series data stored in the table cannot be recovered. Due to the nature of distributed retries, the operation can return either success or a ResourceNotFoundException. Clients should consider them equivalent. Parameters ---------- database: str Database name. table: str Table name. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 Session will be used if boto3_session receive None. Returns ------- None None. Examples -------- Deleting a table >>> import awswrangler as wr >>> arn = wr.timestream.delete_table("MyDatabase", "MyTable") r#rz)r<r=N)r r> delete_table)rrrAr>rrrrs%r)rZN)N)NNN)N)NN)N))__doc__concurrent.futuresrfrjloggingrtypingrrrrrZboto3ZpandasrZbotocore.configrZ awswranglerr r getLogger__name__r Logger__annotations__rvrr0ZBoto3PrimitivesTyperBrOrVrYrGSessionrnrqrrrrrrrrs    )*$ R. 5 + E