"""Amazon Redshift Module."""

import logging
import uuid
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import boto3
import botocore
import pandas as pd
import pyarrow as pa
import redshift_connector

from awswrangler import _data_types
from awswrangler import _databases as _db_utils
from awswrangler import _utils, exceptions, s3
from awswrangler._config import apply_configs

_logger: logging.Logger = logging.getLogger(__name__)

_RS_DISTSTYLES: List[str] = ["AUTO", "EVEN", "ALL", "KEY"]
_RS_SORTSTYLES: List[str] = ["COMPOUND", "INTERLEAVED"]


def _validate_connection(con: redshift_connector.Connection) -> None:
    if not isinstance(con, redshift_connector.Connection):
        raise exceptions.InvalidConnection(
            "Invalid 'con' argument, please pass a redshift_connector.Connection object. "
            "Use redshift_connector.connect() to use credentials directly or "
            "wr.redshift.connect() to fetch it from the Glue Catalog."
        )


def _drop_table(cursor: redshift_connector.Cursor, schema: Optional[str], table: str) -> None:
    schema_str = f'"{schema}".' if schema else ""
    sql = f'DROP TABLE IF EXISTS {schema_str}"{table}"'
    _logger.debug("Drop table query: %s", sql)
    cursor.execute(sql)


def _get_primary_keys(cursor: redshift_connector.Cursor, schema: str, table: str) -> List[str]:
    cursor.execute(f"SELECT indexdef FROM pg_indexes WHERE schemaname = '{schema}' AND tablename = '{table}'")
    result: str = cursor.fetchall()[0][0]
    rfields: List[str] = result.split("(")[1].strip(")").split(",")
    return [field.strip().strip('"') for field in rfields]


def _does_table_exist(cursor: redshift_connector.Cursor, schema: Optional[str], table: str) -> bool:
    schema_str = f"TABLE_SCHEMA = '{schema}' AND" if schema else ""
    cursor.execute(
        "SELECT true WHERE EXISTS ("
        "SELECT * FROM INFORMATION_SCHEMA.TABLES "
        f"WHERE {schema_str} TABLE_NAME = '{table}'"
        ");"
    )
    return len(cursor.fetchall()) > 0


def _make_s3_auth_string(
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    iam_role: Optional[str] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> str:
    if aws_access_key_id is not None and aws_secret_access_key is not None:
        auth_str: str = f"ACCESS_KEY_ID '{aws_access_key_id}'\nSECRET_ACCESS_KEY '{aws_secret_access_key}'\n"
        if aws_session_token is not None:
            auth_str += f"SESSION_TOKEN '{aws_session_token}'\n"
    elif iam_role is not None:
        auth_str = f"IAM_ROLE '{iam_role}'\n"
    else:
        credentials: botocore.credentials.ReadOnlyCredentials = _utils.get_credentials_from_session(
            boto3_session=boto3_session
        )
        if credentials.access_key is None or credentials.secret_key is None:
            raise exceptions.InvalidArgument(
                "One of IAM Role or AWS ACCESS_KEY_ID and SECRET_ACCESS_KEY must be given. "
                "Unable to find ACCESS_KEY_ID and SECRET_ACCESS_KEY in boto3 session."
            )
        auth_str = f"ACCESS_KEY_ID '{credentials.access_key}'\nSECRET_ACCESS_KEY '{credentials.secret_key}'\n"
        if credentials.token is not None:
            auth_str += f"SESSION_TOKEN '{credentials.token}'\n"
    return auth_str


def _copy(
    cursor: redshift_connector.Cursor,
    path: str,
    table: str,
    serialize_to_json: bool,
    iam_role: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    boto3_session: Optional[boto3.Session] = None,
    schema: Optional[str] = None,
) -> None:
    if schema is None:
        table_name: str = f'"{table}"'
    else:
        table_name = f'"{schema}"."{table}"'
    auth_str: str = _make_s3_auth_string(
        iam_role=iam_role,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
        boto3_session=boto3_session,
    )
    ser_json_str: str = " SERIALIZETOJSON" if serialize_to_json else ""
    sql: str = f"COPY {table_name} FROM '{path}' {auth_str}FORMAT AS PARQUET{ser_json_str}"
    _logger.debug("copy query: %s", sql)
    cursor.execute(sql)


def _upsert(
    cursor: redshift_connector.Cursor,
    table: str,
    temp_table: str,
    schema: str,
    primary_keys: Optional[List[str]] = None,
) -> None:
    if not primary_keys:
        primary_keys = _get_primary_keys(cursor=cursor, schema=schema, table=table)
    _logger.debug("primary_keys: %s", primary_keys)
    if not primary_keys:
        raise exceptions.InvalidRedshiftPrimaryKeys()
    equals_clause: str = f"{table}.%s = {temp_table}.%s"
    join_clause: str = " AND ".join([equals_clause % (pk, pk) for pk in primary_keys])
    sql: str = f'DELETE FROM "{schema}"."{table}" USING {temp_table} WHERE {join_clause}'
    _logger.debug(sql)
    cursor.execute(sql)
    sql = f'INSERT INTO "{schema}"."{table}" SELECT * FROM {temp_table}'
    _logger.debug(sql)
    cursor.execute(sql)
    _drop_table(cursor=cursor, schema=schema, table=temp_table)


def _validate_parameters(
    redshift_types: Dict[str, str],
    diststyle: str,
    distkey: Optional[str],
    sortstyle: str,
    sortkey: Optional[List[str]],
) -> None:
    if diststyle not in _RS_DISTSTYLES:
        raise exceptions.InvalidRedshiftDiststyle(f"diststyle must be in {_RS_DISTSTYLES}")
    cols = list(redshift_types.keys())
    _logger.debug("Redshift columns: %s", cols)
    if (diststyle == "KEY") and (not distkey):
        raise exceptions.InvalidRedshiftDistkey("You must pass a distkey if you intend to use KEY diststyle")
    if distkey and distkey not in cols:
        raise exceptions.InvalidRedshiftDistkey(f"distkey ({distkey}) must be in the columns list: {cols}")
    if sortstyle and sortstyle not in _RS_SORTSTYLES:
        raise exceptions.InvalidRedshiftSortstyle(f"sortstyle must be in {_RS_SORTSTYLES}")
    if sortkey:
        if not isinstance(sortkey, list):
            raise exceptions.InvalidRedshiftSortkey(
                f"sortkey must be a List of items in the columns list: {cols}. Currently value: {sortkey}"
            )
        for key in sortkey:
            if key not in cols:
                raise exceptions.InvalidRedshiftSortkey(
                    f"sortkey must be a List of items in the columns list: {cols}. Currently value: {key}"
                )


def _redshift_types_from_path(
    path: Optional[Union[str, List[str]]],
    varchar_lengths_default: int,
    varchar_lengths: Optional[Dict[str, int]],
    parquet_infer_sampling: float,
    path_suffix: Optional[str],
    path_ignore_suffix: Optional[str],
    use_threads: bool,
    boto3_session: Optional[boto3.Session],
    s3_additional_kwargs: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Extract Redshift data types from Parquet files on S3."""
    _varchar_lengths: Dict[str, int] = {} if varchar_lengths is None else varchar_lengths
    session: boto3.Session = _utils.ensure_session(session=boto3_session)
    _logger.debug("Scanning parquet schemas on s3...")
    athena_types, _ = s3.read_parquet_metadata(
        path=path,
        sampling=parquet_infer_sampling,
        path_suffix=path_suffix,
        path_ignore_suffix=path_ignore_suffix,
        dataset=False,
        use_threads=use_threads,
        boto3_session=session,
        s3_additional_kwargs=s3_additional_kwargs,
    )
    _logger.debug("athena_types: %s", athena_types)
    redshift_types: Dict[str, str] = {}
    for col_name, col_type in athena_types.items():
        length: int = _varchar_lengths[col_name] if col_name in _varchar_lengths else varchar_lengths_default
        redshift_types[col_name] = _data_types.athena2redshift(dtype=col_type, varchar_length=length)
    return redshift_types


def _create_table(
    df: Optional[pd.DataFrame],
    path: Optional[Union[str, List[str]]],
    cursor: redshift_connector.Cursor,
    table: str,
    schema: str,
    mode: str,
    index: bool,
    dtype: Optional[Dict[str, str]],
    diststyle: str,
    sortstyle: str,
    distkey: Optional[str],
    sortkey: Optional[List[str]],
    primary_keys: Optional[List[str]],
    varchar_lengths_default: int,
    varchar_lengths: Optional[Dict[str, int]],
    parquet_infer_sampling: float = 1.0,
    path_suffix: Optional[str] = None,
    path_ignore_suffix: Optional[str] = None,
    use_threads: bool = True,
    boto3_session: Optional[boto3.Session] = None,
    s3_additional_kwargs: Optional[Dict[str, str]] = None,
) -> Tuple[str, Optional[str]]:
    if mode == "overwrite":
        _drop_table(cursor=cursor, schema=schema, table=table)
    elif _does_table_exist(cursor=cursor, schema=schema, table=table) is True:
        if mode == "upsert":
            guid: str = uuid.uuid4().hex
            temp_table: str = f"temp_redshift_{guid}"
            sql: str = f'CREATE TEMPORARY TABLE {temp_table} (LIKE "{schema}"."{table}")'
            _logger.debug(sql)
            cursor.execute(sql)
            return temp_table, None
        return table, schema
    diststyle = diststyle.upper() if diststyle else "AUTO"
    sortstyle = sortstyle.upper() if sortstyle else "COMPOUND"
    if df is not None:
        redshift_types: Dict[str, str] = _data_types.database_types_from_pandas(
            df=df,
            index=index,
            dtype=dtype,
            varchar_lengths_default=varchar_lengths_default,
            varchar_lengths=varchar_lengths,
            converter_func=_data_types.pyarrow2redshift,
        )
    elif path is not None:
        redshift_types = _redshift_types_from_path(
            path=path,
            varchar_lengths_default=varchar_lengths_default,
            varchar_lengths=varchar_lengths,
            parquet_infer_sampling=parquet_infer_sampling,
            path_suffix=path_suffix,
            path_ignore_suffix=path_ignore_suffix,
            use_threads=use_threads,
            boto3_session=boto3_session,
            s3_additional_kwargs=s3_additional_kwargs,
        )
    else:
        raise ValueError("df and path are None. You MUST pass at least one.")
    _validate_parameters(
        redshift_types=redshift_types,
        diststyle=diststyle,
        distkey=distkey,
        sortstyle=sortstyle,
        sortkey=sortkey,
    )
    cols_str: str = "".join([f"{k} {v},\n" for k, v in redshift_types.items()])[:-2]
    primary_keys_str: str = f",\nPRIMARY KEY ({', '.join(primary_keys)})" if primary_keys else ""
    distkey_str: str = f"\nDISTKEY({distkey})" if distkey and diststyle == "KEY" else ""
    sortkey_str: str = f"\nSORTKEY({', '.join(sortkey)})" if sortkey else ""
    sql = (
        f'CREATE TABLE IF NOT EXISTS "{schema}"."{table}" (\n'
        f"{cols_str}"
        f"{primary_keys_str}"
        f")\nDISTSTYLE {diststyle}"
        f"{distkey_str}"
        f"{sortkey_str}"
    )
    _logger.debug("Create table query: %s", sql)
    cursor.execute(sql)
    return table, schema


def _read_parquet_iterator(
    path: str,
    keep_files: bool,
    use_threads: bool,
    categories: Optional[List[str]],
    chunked: Union[bool, int],
    boto3_session: Optional[boto3.Session],
    s3_additional_kwargs: Optional[Dict[str, str]],
) -> Iterator[pd.DataFrame]:
    dfs: Iterator[pd.DataFrame] = s3.read_parquet(
        path=path,
        categories=categories,
        chunked=chunked,
        dataset=False,
        use_threads=use_threads,
        boto3_session=boto3_session,
        s3_additional_kwargs=s3_additional_kwargs,
    )
    yield from dfs
    if keep_files is False:
        s3.delete_objects(
            path=path, use_threads=use_threads, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
        )
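
# Illustrative sketch (not part of the original module): the helpers above only
# assemble SQL strings, so their behaviour can be previewed without a cluster.
# The role ARN, bucket and table names below are made-up placeholders.
def _demo_auth_string() -> None:
    auth = _make_s3_auth_string(iam_role="arn:aws:iam::123456789012:role/redshift-copy-unload")
    # Expected shape: "IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-unload'\n"
    print(f"COPY \"public\".\"my_table\" FROM 's3://my-bucket/stage/' {auth}FORMAT AS PARQUET")
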
@apply_configs
def connect(
    connection: Optional[str] = None,
    secret_id: Optional[str] = None,
    catalog_id: Optional[str] = None,
    dbname: Optional[str] = None,
    boto3_session: Optional[boto3.Session] = None,
    ssl: bool = True,
    timeout: Optional[int] = None,
    max_prepared_statements: int = 1,
    tcp_keepalive: bool = True,
) -> redshift_connector.Connection:
    """Return a redshift_connector connection from a Glue Catalog Connection or Secrets Manager.

    Note
    ----
    You MUST pass a `connection` OR `secret_id`.
    https://github.com/aws/amazon-redshift-python-driver

    Parameters
    ----------
    connection : str, optional
        Glue Catalog Connection name.
    secret_id : str, optional
        Specifies the secret containing the version that you want to retrieve.
        You can specify either the Amazon Resource Name (ARN) or the friendly name of the secret.
    catalog_id : str, optional
        The ID of the Data Catalog. If none is provided, the AWS account ID is used by default.
    dbname : str, optional
        Optional database name to overwrite the stored one.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
    ssl : bool
        This governs SSL encryption for TCP/IP sockets.
        This parameter is forwarded to redshift_connector.
    timeout : int, optional
        The time in seconds before the connection to the server times out.
        The default is None, which means no timeout.
        This parameter is forwarded to redshift_connector.
    max_prepared_statements : int
        This parameter is forwarded to redshift_connector.
    tcp_keepalive : bool
        If True then use TCP keepalive. The default is True.
        This parameter is forwarded to redshift_connector.

    Returns
    -------
    redshift_connector.Connection
        redshift_connector connection.

    Examples
    --------
    Fetching Redshift connection from Glue Catalog

    >>> import awswrangler as wr
    >>> con = wr.redshift.connect("MY_GLUE_CONNECTION")
    >>> with con.cursor() as cursor:
    >>>     cursor.execute("SELECT 1")
    >>>     print(cursor.fetchall())
    >>> con.close()

    Fetching Redshift connection from Secrets Manager

    >>> import awswrangler as wr
    >>> con = wr.redshift.connect(secret_id="MY_SECRET")
    >>> with con.cursor() as cursor:
    >>>     cursor.execute("SELECT 1")
    >>>     print(cursor.fetchall())
    >>> con.close()

    """
    attrs = _db_utils.get_connection_attributes(
        connection=connection, secret_id=secret_id, catalog_id=catalog_id, dbname=dbname, boto3_session=boto3_session
    )
    if attrs.kind != "redshift":
        raise exceptions.InvalidDatabaseType(
            f"Invalid connection type ({attrs.kind}). It must be a redshift connection."
        )
    return redshift_connector.connect(
        user=attrs.user,
        database=attrs.database,
        password=attrs.password,
        port=int(attrs.port),
        host=attrs.host,
        ssl=ssl,
        timeout=timeout,
        max_prepared_statements=max_prepared_statements,
        tcp_keepalive=tcp_keepalive,
    )


@apply_configs
def connect_temp(
    cluster_identifier: str,
    user: str,
    database: Optional[str] = None,
    duration: int = 900,
    auto_create: bool = True,
    db_groups: Optional[List[str]] = None,
    boto3_session: Optional[boto3.Session] = None,
    ssl: bool = True,
    timeout: Optional[int] = None,
    max_prepared_statements: int = 1,
    tcp_keepalive: bool = True,
) -> redshift_connector.Connection:
    """Return a redshift_connector temporary connection (no password required).

    https://github.com/aws/amazon-redshift-python-driver

    Parameters
    ----------
    cluster_identifier : str
        The unique identifier of a cluster. This parameter is case sensitive.
    user : str
        The name of a database user.
    database : str, optional
        Database name. If None, the default database is used.
    duration : int, optional
        The number of seconds until the returned temporary password expires.
        Constraint: minimum 900, maximum 3600. Default: 900.
    auto_create : bool
        Create a database user with the name specified for the user if one does not exist.
    db_groups : List[str], optional
        A list of the names of existing database groups that the user will join for the current
        session, in addition to any group memberships for an existing user.
        If not specified, a new user is added only to PUBLIC.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
    ssl : bool
        This governs SSL encryption for TCP/IP sockets.
        This parameter is forwarded to redshift_connector.
    timeout : int, optional
        The time in seconds before the connection to the server times out.
        The default is None, which means no timeout.
        This parameter is forwarded to redshift_connector.
    max_prepared_statements : int
        This parameter is forwarded to redshift_connector.
    tcp_keepalive : bool
        If True then use TCP keepalive. The default is True.
        This parameter is forwarded to redshift_connector.

    Returns
    -------
    redshift_connector.Connection
        redshift_connector connection.

    Examples
    --------
    >>> import awswrangler as wr
    >>> con = wr.redshift.connect_temp(cluster_identifier="my-cluster", user="test")
    >>> with con.cursor() as cursor:
    >>>     cursor.execute("SELECT 1")
    >>>     print(cursor.fetchall())
    >>> con.close()

    """
    client_redshift = _utils.client(service_name="redshift", session=boto3_session)
    args: Dict[str, Any] = {
        "DbUser": user,
        "ClusterIdentifier": cluster_identifier,
        "DurationSeconds": duration,
        "AutoCreate": auto_create,
    }
    if db_groups is not None:
        args["DbGroups"] = db_groups
    res = client_redshift.get_cluster_credentials(**args)
    cluster = client_redshift.describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"][0]
    return redshift_connector.connect(
        user=res["DbUser"],
        database=database if database else cluster["DBName"],
        password=res["DbPassword"],
        port=cluster["Endpoint"]["Port"],
        host=cluster["Endpoint"]["Address"],
        ssl=ssl,
        timeout=timeout,
        max_prepared_statements=max_prepared_statements,
        tcp_keepalive=tcp_keepalive,
    )
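
# Illustrative sketch (not part of the original module): temporary, IAM-based
# credentials via connect_temp(). Cluster name, user and group are placeholders.
def _demo_connect_temp() -> None:
    con = connect_temp(
        cluster_identifier="my-cluster",
        user="analyst",
        auto_create=True,
        db_groups=["readonly"],
    )
    try:
        with con.cursor() as cursor:
            cursor.execute("SELECT current_user")
            print(cursor.fetchall())
    finally:
        con.close()
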
@apply_configs
def read_sql_query(
    sql: str,
    con: redshift_connector.Connection,
    index_col: Optional[Union[str, List[str]]] = None,
    params: Optional[Union[List[Any], Tuple[Any, ...], Dict[Any, Any]]] = None,
    chunksize: Optional[int] = None,
    dtype: Optional[Dict[str, pa.DataType]] = None,
    safe: bool = True,
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
    """Return a DataFrame corresponding to the result set of the query string.

    Note
    ----
    For large extractions (1K+ rows) consider the function **wr.redshift.unload()**.

    Parameters
    ----------
    sql : str
        SQL query.
    con : redshift_connector.Connection
        Use redshift_connector.connect() to use credentials directly or
        wr.redshift.connect() to fetch it from the Glue Catalog.
    index_col : Union[str, List[str]], optional
        Column(s) to set as index (MultiIndex).
    params : Union[List, Tuple, Dict], optional
        List of parameters to pass to the execute method. The syntax used to pass parameters is
        database driver dependent. Check your database driver documentation for which of the five
        syntax styles, described in PEP 249's paramstyle, is supported.
    chunksize : int, optional
        If specified, return an iterator where chunksize is the number of rows to include in each chunk.
    dtype : Dict[str, pyarrow.DataType], optional
        Specifying the datatype for columns.
        The keys should be the column names and the values should be the PyArrow types.
    safe : bool
        Check for overflows or other unsafe data type conversions.

    Returns
    -------
    Union[pandas.DataFrame, Iterator[pandas.DataFrame]]
        Result as Pandas DataFrame(s).

    Examples
    --------
    Reading from Redshift using a Glue Catalog Connection

    >>> import awswrangler as wr
    >>> con = wr.redshift.connect("MY_GLUE_CONNECTION")
    >>> df = wr.redshift.read_sql_query(
    ...     sql="SELECT * FROM public.my_table",
    ...     con=con
    ... )
    >>> con.close()

    """
    _validate_connection(con=con)
    return _db_utils.read_sql_query(
        sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
    )


@apply_configs
def read_sql_table(
    table: str,
    con: redshift_connector.Connection,
    schema: Optional[str] = None,
    index_col: Optional[Union[str, List[str]]] = None,
    params: Optional[Union[List[Any], Tuple[Any, ...], Dict[Any, Any]]] = None,
    chunksize: Optional[int] = None,
    dtype: Optional[Dict[str, pa.DataType]] = None,
    safe: bool = True,
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
    """Return a DataFrame corresponding to the given table.

    Note
    ----
    For large extractions (1K+ rows) consider the function **wr.redshift.unload()**.

    Parameters
    ----------
    table : str
        Table name.
    con : redshift_connector.Connection
        Use redshift_connector.connect() to use credentials directly or
        wr.redshift.connect() to fetch it from the Glue Catalog.
    schema : str, optional
        Name of the SQL schema in the database to query (if the database flavor supports this).
        Uses the default schema if None (default).
    index_col : Union[str, List[str]], optional
        Column(s) to set as index (MultiIndex).
    params : Union[List, Tuple, Dict], optional
        List of parameters to pass to the execute method. The syntax used to pass parameters is
        database driver dependent. Check your database driver documentation for which of the five
        syntax styles, described in PEP 249's paramstyle, is supported.
    chunksize : int, optional
        If specified, return an iterator where chunksize is the number of rows to include in each chunk.
    dtype : Dict[str, pyarrow.DataType], optional
        Specifying the datatype for columns.
        The keys should be the column names and the values should be the PyArrow types.
    safe : bool
        Check for overflows or other unsafe data type conversions.

    Returns
    -------
    Union[pandas.DataFrame, Iterator[pandas.DataFrame]]
        Result as Pandas DataFrame(s).

    Examples
    --------
    Reading from Redshift using a Glue Catalog Connection

    >>> import awswrangler as wr
    >>> con = wr.redshift.connect("MY_GLUE_CONNECTION")
    >>> df = wr.redshift.read_sql_table(
    ...     table="my_table",
    ...     schema="public",
    ...     con=con
    ... )
    >>> con.close()

    """
    sql: str = f'SELECT * FROM "{table}"' if schema is None else f'SELECT * FROM "{schema}"."{table}"'
    return read_sql_query(
        sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
    )
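
# Illustrative sketch (not part of the original module): memory-friendly reads by
# passing chunksize, so the result is an iterator of DataFrames. The connection
# and table names are placeholders.
def _demo_chunked_read() -> None:
    con = connect("MY_GLUE_CONNECTION")
    try:
        for chunk in read_sql_table(table="my_table", schema="public", con=con, chunksize=10_000):
            print(len(chunk.index))
    finally:
        con.close()
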
@apply_configs
def to_sql(
    df: pd.DataFrame,
    con: redshift_connector.Connection,
    table: str,
    schema: str,
    mode: str = "append",
    index: bool = False,
    dtype: Optional[Dict[str, str]] = None,
    diststyle: str = "AUTO",
    distkey: Optional[str] = None,
    sortstyle: str = "COMPOUND",
    sortkey: Optional[List[str]] = None,
    primary_keys: Optional[List[str]] = None,
    varchar_lengths_default: int = 256,
    varchar_lengths: Optional[Dict[str, int]] = None,
    use_column_names: bool = False,
    chunksize: int = 200,
) -> None:
    """Write records stored in a DataFrame into Redshift.

    Note
    ----
    For large DataFrames (1K+ rows) consider the function **wr.redshift.copy()**.

    Parameters
    ----------
    df : pandas.DataFrame
        Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
    con : redshift_connector.Connection
        Use redshift_connector.connect() to use credentials directly or
        wr.redshift.connect() to fetch it from the Glue Catalog.
    table : str
        Table name.
    schema : str
        Schema name.
    mode : str
        Append, overwrite or upsert.
    index : bool
        True to store the DataFrame index as a column in the table, otherwise False to ignore it.
    dtype : Dict[str, str], optional
        Dictionary of column names and Redshift types to be casted.
        Useful when you have columns with undetermined or mixed data types.
        (e.g. {'col name': 'VARCHAR(10)', 'col2 name': 'FLOAT'})
    diststyle : str
        Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"].
        https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
    distkey : str, optional
        Specifies a column name or positional number for the distribution key.
    sortstyle : str
        Sorting can be "COMPOUND" or "INTERLEAVED".
        https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
    sortkey : List[str], optional
        List of columns to be sorted.
    primary_keys : List[str], optional
        Primary keys.
    varchar_lengths_default : int
        The size that will be set for all VARCHAR columns not specified with varchar_lengths.
    varchar_lengths : Dict[str, int], optional
        Dict of VARCHAR length by column. (e.g. {"col1": 10, "col5": 200}).
    use_column_names : bool
        If set to True, will use the column names of the DataFrame for generating the INSERT SQL query.
        E.g. if the DataFrame has two columns `col1` and `col3` and `use_column_names` is True,
        data will only be inserted into the database columns `col1` and `col3`.
    chunksize : int
        Number of rows which are inserted with each SQL query. Defaults to inserting 200 rows per query.

    Returns
    -------
    None
        None.

    Examples
    --------
    Writing to Redshift using a Glue Catalog Connection

    >>> import awswrangler as wr
    >>> con = wr.redshift.connect("MY_GLUE_CONNECTION")
    >>> wr.redshift.to_sql(
    ...     df=df,
    ...     table="my_table",
    ...     schema="public",
    ...     con=con
    ... )
    >>> con.close()

    """
    if df.empty is True:
        raise exceptions.EmptyDataFrame()
    _validate_connection(con=con)
    autocommit_temp: bool = con.autocommit
    con.autocommit = False
    try:
        with con.cursor() as cursor:
            created_table, created_schema = _create_table(
                df=df,
                path=None,
                cursor=cursor,
                table=table,
                schema=schema,
                mode=mode,
                index=index,
                dtype=dtype,
                diststyle=diststyle,
                sortstyle=sortstyle,
                distkey=distkey,
                sortkey=sortkey,
                primary_keys=primary_keys,
                varchar_lengths_default=varchar_lengths_default,
                varchar_lengths=varchar_lengths,
            )
            if index:
                df.reset_index(level=df.index.names, inplace=True)
            column_placeholders: str = ", ".join(["%s"] * len(df.columns))
            schema_str = f'"{created_schema}".' if created_schema else ""
            insertion_columns = ""
            if use_column_names:
                insertion_columns = f"({', '.join(df.columns)})"
            placeholder_parameter_pair_generator = _db_utils.generate_placeholder_parameter_pairs(
                df=df, column_placeholders=column_placeholders, chunksize=chunksize
            )
            for placeholders, parameters in placeholder_parameter_pair_generator:
                sql: str = f'INSERT INTO {schema_str}"{created_table}" {insertion_columns} VALUES {placeholders}'
                _logger.debug("sql: %s", sql)
                cursor.executemany(sql, (parameters,))
            if table != created_table:  # upsert through a temporary table
                _upsert(cursor=cursor, schema=schema, table=table, temp_table=created_table, primary_keys=primary_keys)
            con.commit()
    except Exception as ex:
        con.rollback()
        _logger.error(ex)
        raise
    finally:
        con.autocommit = autocommit_temp
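
# Illustrative sketch (not part of the original module): upsert mode relies on the
# temporary-table + DELETE/INSERT flow implemented by _create_table()/_upsert().
# The connection, table, schema and key names are placeholders.
def _demo_to_sql_upsert(df: pd.DataFrame) -> None:
    con = connect("MY_GLUE_CONNECTION")
    try:
        to_sql(
            df=df,
            con=con,
            table="my_table",
            schema="public",
            mode="upsert",
            primary_keys=["id"],
            use_column_names=True,
        )
    finally:
        con.close()
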
@apply_configs
def unload_to_files(
    sql: str,
    path: str,
    con: redshift_connector.Connection,
    iam_role: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    region: Optional[str] = None,
    max_file_size: Optional[float] = None,
    kms_key_id: Optional[str] = None,
    manifest: bool = False,
    use_threads: bool = True,
    partition_cols: Optional[List[str]] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> None:
    """Unload Parquet files on S3 from a Redshift query result (through the UNLOAD command).

    https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html

    Note
    ----
    In case of `use_threads=True` the number of threads that will be spawned will be gotten from os.cpu_count().

    Parameters
    ----------
    sql : str
        SQL query.
    path : Union[str, List[str]]
        S3 path to write stage files (e.g. s3://bucket_name/any_name/).
    con : redshift_connector.Connection
        Use redshift_connector.connect() to use credentials directly or
        wr.redshift.connect() to fetch it from the Glue Catalog.
    iam_role : str, optional
        AWS IAM role with the related permissions.
    aws_access_key_id : str, optional
        The access key for your AWS account.
    aws_secret_access_key : str, optional
        The secret key for your AWS account.
    aws_session_token : str, optional
        The session key for your AWS account. This is only needed when you are using temporary credentials.
    region : str, optional
        Specifies the AWS Region where the target Amazon S3 bucket is located.
        REGION is required for UNLOAD to an Amazon S3 bucket that isn't in the same AWS Region
        as the Amazon Redshift cluster. By default, UNLOAD assumes that the target Amazon S3
        bucket is located in the same AWS Region as the Amazon Redshift cluster.
    max_file_size : float, optional
        Specifies the maximum size (MB) of files that UNLOAD creates in Amazon S3.
        Specify a decimal value between 5.0 MB and 6200.0 MB.
        If None, the default maximum file size is 6200.0 MB.
    kms_key_id : str, optional
        Specifies the key ID for an AWS Key Management Service (AWS KMS) key to be used to
        encrypt data files on Amazon S3.
    manifest : bool
        Unload a manifest file on S3.
    use_threads : bool
        True to enable concurrent requests, False to disable multiple threads.
        If enabled os.cpu_count() will be used as the max number of threads.
    partition_cols : List[str], optional
        Specifies the partition keys for the unload operation.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.

    Returns
    -------
    None

    Examples
    --------
    >>> import awswrangler as wr
    >>> con = wr.redshift.connect("MY_GLUE_CONNECTION")
    >>> wr.redshift.unload_to_files(
    ...     sql="SELECT * FROM public.mytable",
    ...     path="s3://bucket/extracted_parquet_files/",
    ...     con=con,
    ...     iam_role="arn:aws:iam::XXX:role/XXX"
    ... )
    >>> con.close()

    """
    path = path if path.endswith("/") else f"{path}/"
    session: boto3.Session = _utils.ensure_session(session=boto3_session)
    s3.delete_objects(path=path, use_threads=use_threads, boto3_session=session)
    with con.cursor() as cursor:
        partition_str: str = f"\nPARTITION BY ({', '.join(partition_cols)})" if partition_cols else ""
        manifest_str: str = "\nmanifest" if manifest is True else ""
        region_str: str = f"\nREGION AS '{region}'" if region is not None else ""
        max_file_size_str: str = f"\nMAXFILESIZE AS {max_file_size} MB" if max_file_size is not None else ""
        kms_key_id_str: str = f"\nKMS_KEY_ID '{kms_key_id}'" if kms_key_id is not None else ""
        auth_str: str = _make_s3_auth_string(
            iam_role=iam_role,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            boto3_session=session,
        )
        unload_sql = (
            f"UNLOAD ('{sql}')\n"
            f"TO '{path}'\n"
            f"{auth_str}"
            "ALLOWOVERWRITE\n"
            "PARALLEL ON\n"
            "FORMAT PARQUET\n"
            "ENCRYPTED"
            f"{kms_key_id_str}"
            f"{partition_str}"
            f"{region_str}"
            f"{max_file_size_str}"
            f"{manifest_str};"
        )
        _logger.debug("sql: %s", unload_sql)
        cursor.execute(unload_sql)
@apply_configs
def unload(
    sql: str,
    path: str,
    con: redshift_connector.Connection,
    iam_role: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    region: Optional[str] = None,
    max_file_size: Optional[float] = None,
    kms_key_id: Optional[str] = None,
    categories: Optional[List[str]] = None,
    keep_files: bool = False,
    chunked: Union[bool, int] = False,
    use_threads: bool = True,
    boto3_session: Optional[boto3.Session] = None,
    s3_additional_kwargs: Optional[Dict[str, str]] = None,
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
    """Load a Pandas DataFrame from an Amazon Redshift query result using Parquet files on S3 as stage.

    This is a **HIGH** latency and **HIGH** throughput alternative to
    `wr.redshift.read_sql_query()`/`wr.redshift.read_sql_table()` to extract large
    Amazon Redshift data into a Pandas DataFrame through the **UNLOAD command**.

    This strategy has more overhead and requires more IAM privileges
    than the regular `wr.redshift.read_sql_query()`/`wr.redshift.read_sql_table()` functions,
    so it is only recommended to fetch 1K+ rows at once.

    https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html

    Note
    ----
    ``Batching`` (`chunked` argument) (Memory Friendly):

    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.

    There are two batching strategies in Wrangler:

    - If **chunked=True**, a new DataFrame will be returned for each file in your path/dataset.
    - If **chunked=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.

    `P.S.` `chunked=True` is faster and uses less memory while `chunked=INTEGER` is more
    precise in the number of rows for each DataFrame.

    Note
    ----
    In case of `use_threads=True` the number of threads that will be spawned will be gotten from os.cpu_count().

    Parameters
    ----------
    sql : str
        SQL query.
    path : Union[str, List[str]]
        S3 path to write stage files (e.g. s3://bucket_name/any_name/).
    con : redshift_connector.Connection
        Use redshift_connector.connect() to use credentials directly or
        wr.redshift.connect() to fetch it from the Glue Catalog.
    iam_role : str, optional
        AWS IAM role with the related permissions.
    aws_access_key_id : str, optional
        The access key for your AWS account.
    aws_secret_access_key : str, optional
        The secret key for your AWS account.
    aws_session_token : str, optional
        The session key for your AWS account. This is only needed when you are using temporary credentials.
    region : str, optional
        Specifies the AWS Region where the target Amazon S3 bucket is located.
        REGION is required for UNLOAD to an Amazon S3 bucket that isn't in the same AWS Region
        as the Amazon Redshift cluster. By default, UNLOAD assumes that the target Amazon S3
        bucket is located in the same AWS Region as the Amazon Redshift cluster.
    max_file_size : float, optional
        Specifies the maximum size (MB) of files that UNLOAD creates in Amazon S3.
        Specify a decimal value between 5.0 MB and 6200.0 MB.
        If None, the default maximum file size is 6200.0 MB.
    kms_key_id : str, optional
        Specifies the key ID for an AWS Key Management Service (AWS KMS) key to be used to
        encrypt data files on Amazon S3.
    categories : List[str], optional
        List of columns names that should be returned as pandas.Categorical.
        Recommended for memory restricted environments.
    keep_files : bool
        Should keep stage files?
    chunked : Union[int, bool]
        If passed will split the data in an Iterable of DataFrames (memory friendly).
        If `True` Wrangler will iterate on the data by files in the most efficient way
        without guarantee of chunksize.
        If an `INTEGER` is passed Wrangler will iterate on the data by number of rows
        equal to the received INTEGER.
    use_threads : bool
        True to enable concurrent requests, False to disable multiple threads.
        If enabled os.cpu_count() will be used as the max number of threads.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
    s3_additional_kwargs:
        Forwarded to botocore requests; only "SSECustomerAlgorithm" and "SSECustomerKey"
        arguments will be considered.

    Returns
    -------
    Union[pandas.DataFrame, Iterator[pandas.DataFrame]]
        Result as Pandas DataFrame(s).

    Examples
    --------
    >>> import awswrangler as wr
    >>> con = wr.redshift.connect("MY_GLUE_CONNECTION")
    >>> df = wr.redshift.unload(
    ...     sql="SELECT * FROM public.mytable",
    ...     path="s3://bucket/extracted_parquet_files/",
    ...     con=con,
    ...     iam_role="arn:aws:iam::XXX:role/XXX"
    ... )
    >>> con.close()

    """
    session: boto3.Session = _utils.ensure_session(session=boto3_session)
    unload_to_files(
        sql=sql,
        path=path,
        con=con,
        iam_role=iam_role,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
        region=region,
        max_file_size=max_file_size,
        kms_key_id=kms_key_id,
        manifest=False,
        use_threads=use_threads,
        boto3_session=session,
    )
    if chunked is False:
        df: pd.DataFrame = s3.read_parquet(
            path=path,
            categories=categories,
            chunked=False,
            dataset=False,
            use_threads=use_threads,
            boto3_session=session,
            s3_additional_kwargs=s3_additional_kwargs,
        )
        if keep_files is False:
            s3.delete_objects(
                path=path, use_threads=use_threads, boto3_session=session, s3_additional_kwargs=s3_additional_kwargs
            )
        return df
    return _read_parquet_iterator(
        path=path,
        categories=categories,
        chunked=chunked,
        use_threads=use_threads,
        boto3_session=session,
        s3_additional_kwargs=s3_additional_kwargs,
        keep_files=keep_files,
    )
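
# Illustrative sketch (not part of the original module): a chunked unload keeps
# memory bounded by iterating over the staged Parquet files. The bucket, role
# and query below are placeholders.
def _demo_chunked_unload() -> None:
    con = connect("MY_GLUE_CONNECTION")
    try:
        chunks = unload(
            sql="SELECT * FROM public.my_big_table",
            path="s3://my-bucket/stage/",
            con=con,
            iam_role="arn:aws:iam::123456789012:role/redshift-unload",
            chunked=True,
            keep_files=False,
        )
        for chunk in chunks:
            print(len(chunk.index))
    finally:
        con.close()
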
@apply_configs
def copy_from_files(
    path: str,
    con: redshift_connector.Connection,
    table: str,
    schema: str,
    iam_role: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    parquet_infer_sampling: float = 1.0,
    mode: str = "append",
    diststyle: str = "AUTO",
    distkey: Optional[str] = None,
    sortstyle: str = "COMPOUND",
    sortkey: Optional[List[str]] = None,
    primary_keys: Optional[List[str]] = None,
    varchar_lengths_default: int = 256,
    varchar_lengths: Optional[Dict[str, int]] = None,
    serialize_to_json: bool = False,
    path_suffix: Optional[str] = None,
    path_ignore_suffix: Optional[str] = None,
    use_threads: bool = True,
    boto3_session: Optional[boto3.Session] = None,
    s3_additional_kwargs: Optional[Dict[str, str]] = None,
) -> None:
    """Load Parquet files from S3 to a table on Amazon Redshift (through the COPY command).

    https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html

    Note
    ----
    If the table does not exist yet, it will be automatically created for you
    using the Parquet metadata to infer the columns data types.

    Note
    ----
    In case of `use_threads=True` the number of threads that will be spawned will be gotten from os.cpu_count().

    Parameters
    ----------
    path : str
        S3 prefix (e.g. s3://bucket/prefix/).
    con : redshift_connector.Connection
        Use redshift_connector.connect() to use credentials directly or
        wr.redshift.connect() to fetch it from the Glue Catalog.
    table : str
        Table name.
    schema : str
        Schema name.
    iam_role : str, optional
        AWS IAM role with the related permissions.
    aws_access_key_id : str, optional
        The access key for your AWS account.
    aws_secret_access_key : str, optional
        The secret key for your AWS account.
    aws_session_token : str, optional
        The session key for your AWS account. This is only needed when you are using temporary credentials.
    parquet_infer_sampling : float
        Random sample ratio of files that will have the metadata inspected.
        Must be `0.0 < sampling <= 1.0`.
        The higher, the more accurate. The lower, the faster.
    mode : str
        Append, overwrite or upsert.
    diststyle : str
        Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"].
        https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
    distkey : str, optional
        Specifies a column name or positional number for the distribution key.
    sortstyle : str
        Sorting can be "COMPOUND" or "INTERLEAVED".
        https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
    sortkey : List[str], optional
        List of columns to be sorted.
    primary_keys : List[str], optional
        Primary keys.
    varchar_lengths_default : int
        The size that will be set for all VARCHAR columns not specified with varchar_lengths.
    varchar_lengths : Dict[str, int], optional
        Dict of VARCHAR length by column. (e.g. {"col1": 10, "col5": 200}).
    serialize_to_json : bool
        Should Wrangler add the SERIALIZETOJSON parameter to the COPY command?
        SERIALIZETOJSON is necessary to load nested data.
        https://docs.aws.amazon.com/redshift/latest/dg/ingest-super.html#copy_json
    path_suffix : Union[str, List[str], None]
        Suffix or list of suffixes to be scanned on S3 for the schema extraction
        (e.g. [".gz.parquet", ".snappy.parquet"]).
        Only has effect during the table creation.
        If None, will try to read all files. (default)
    path_ignore_suffix : Union[str, List[str], None]
        Suffix or list of suffixes for S3 keys to be ignored during the schema extraction
        (e.g. [".csv", "_SUCCESS"]).
        Only has effect during the table creation.
        If None, will try to read all files. (default)
    use_threads : bool
        True to enable concurrent requests, False to disable multiple threads.
        If enabled os.cpu_count() will be used as the max number of threads.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
    s3_additional_kwargs:
        Forwarded to botocore requests.
        Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
        "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext",
        "Tagging", "RequestPayer", "ExpectedBucketOwner".
        e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}

    Returns
    -------
    None
        None.

    Examples
    --------
    >>> import awswrangler as wr
    >>> con = wr.redshift.connect("MY_GLUE_CONNECTION")
    >>> wr.redshift.copy_from_files(
    ...     path="s3://bucket/my_parquet_files/",
    ...     con=con,
    ...     table="my_table",
    ...     schema="public",
    ...     iam_role="arn:aws:iam::XXX:role/XXX"
    ... )
    >>> con.close()

    """
    autocommit_temp: bool = con.autocommit
    con.autocommit = False
    try:
        with con.cursor() as cursor:
            created_table, created_schema = _create_table(
                df=None,
                path=path,
                parquet_infer_sampling=parquet_infer_sampling,
                path_suffix=path_suffix,
                path_ignore_suffix=path_ignore_suffix,
                cursor=cursor,
                table=table,
                schema=schema,
                mode=mode,
                index=False,
                dtype=None,
                diststyle=diststyle,
                sortstyle=sortstyle,
                distkey=distkey,
                sortkey=sortkey,
                primary_keys=primary_keys,
                varchar_lengths_default=varchar_lengths_default,
                varchar_lengths=varchar_lengths,
                use_threads=use_threads,
                boto3_session=boto3_session,
                s3_additional_kwargs=s3_additional_kwargs,
            )
            _copy(
                cursor=cursor,
                path=path,
                table=created_table,
                schema=created_schema,
                iam_role=iam_role,
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                aws_session_token=aws_session_token,
                boto3_session=boto3_session,
                serialize_to_json=serialize_to_json,
            )
            if table != created_table:  # upsert through a temporary table
                _upsert(cursor=cursor, schema=schema, table=table, temp_table=created_table, primary_keys=primary_keys)
            con.commit()
    except Exception as ex:
        con.rollback()
        _logger.error(ex)
        raise
    finally:
        con.autocommit = autocommit_temp
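
# Illustrative sketch (not part of the original module): SERIALIZETOJSON is needed
# when the staged Parquet files carry nested data destined for SUPER columns.
# The bucket, role, table and schema names below are placeholders.
def _demo_copy_nested_from_files() -> None:
    con = connect("MY_GLUE_CONNECTION")
    try:
        copy_from_files(
            path="s3://my-bucket/nested_parquet/",
            con=con,
            table="my_nested_table",
            schema="public",
            iam_role="arn:aws:iam::123456789012:role/redshift-copy",
            serialize_to_json=True,
        )
    finally:
        con.close()
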
@apply_configs
def copy(
    df: pd.DataFrame,
    path: str,
    con: redshift_connector.Connection,
    table: str,
    schema: str,
    iam_role: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    index: bool = False,
    dtype: Optional[Dict[str, str]] = None,
    mode: str = "append",
    diststyle: str = "AUTO",
    distkey: Optional[str] = None,
    sortstyle: str = "COMPOUND",
    sortkey: Optional[List[str]] = None,
    primary_keys: Optional[List[str]] = None,
    varchar_lengths_default: int = 256,
    varchar_lengths: Optional[Dict[str, int]] = None,
    serialize_to_json: bool = False,
    keep_files: bool = False,
    use_threads: bool = True,
    boto3_session: Optional[boto3.Session] = None,
    s3_additional_kwargs: Optional[Dict[str, str]] = None,
    max_rows_by_file: Optional[int] = 10_000_000,
) -> None:
    """Load a Pandas DataFrame as a table on Amazon Redshift using Parquet files on S3 as stage.

    This is a **HIGH** latency and **HIGH** throughput alternative to `wr.redshift.to_sql()` to load large
    DataFrames into Amazon Redshift through the **SQL COPY command**.

    This strategy has more overhead and requires more IAM privileges
    than the regular `wr.redshift.to_sql()` function, so it is only recommended
    to insert 1K+ rows at once.

    https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html

    Note
    ----
    If the table does not exist yet, it will be automatically created for you
    using the Parquet metadata to infer the columns data types.

    Note
    ----
    In case of `use_threads=True` the number of threads that will be spawned will be gotten from os.cpu_count().

    Parameters
    ----------
    df : pandas.DataFrame
        Pandas DataFrame.
    path : str
        S3 path to write stage files (e.g. s3://bucket_name/any_name/).
        Note: This path must be empty.
    con : redshift_connector.Connection
        Use redshift_connector.connect() to use credentials directly or
        wr.redshift.connect() to fetch it from the Glue Catalog.
    table : str
        Table name.
    schema : str
        Schema name.
    iam_role : str, optional
        AWS IAM role with the related permissions.
    aws_access_key_id : str, optional
        The access key for your AWS account.
    aws_secret_access_key : str, optional
        The secret key for your AWS account.
    aws_session_token : str, optional
        The session key for your AWS account. This is only needed when you are using temporary credentials.
    index : bool
        True to store the DataFrame index in the files, otherwise False to ignore it.
    dtype : Dict[str, str], optional
        Dictionary of column names and Athena/Glue types to be casted.
        Useful when you have columns with undetermined or mixed data types.
        Only takes effect if dataset=True. (e.g. {'col name': 'bigint', 'col2 name': 'int'})
    mode : str
        Append, overwrite or upsert.
    diststyle : str
        Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"].
        https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
    distkey : str, optional
        Specifies a column name or positional number for the distribution key.
    sortstyle : str
        Sorting can be "COMPOUND" or "INTERLEAVED".
        https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
    sortkey : List[str], optional
        List of columns to be sorted.
    primary_keys : List[str], optional
        Primary keys.
    varchar_lengths_default : int
        The size that will be set for all VARCHAR columns not specified with varchar_lengths.
    varchar_lengths : Dict[str, int], optional
        Dict of VARCHAR length by column. (e.g. {"col1": 10, "col5": 200}).
    serialize_to_json : bool
        Should Wrangler add the SERIALIZETOJSON parameter to the COPY command?
        SERIALIZETOJSON is necessary to load nested data.
        https://docs.aws.amazon.com/redshift/latest/dg/ingest-super.html#copy_json
    keep_files : bool
        Should keep stage files?
    use_threads : bool
        True to enable concurrent requests, False to disable multiple threads.
        If enabled os.cpu_count() will be used as the max number of threads.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
    s3_additional_kwargs:
        Forwarded to botocore requests.
        Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
        "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext",
        "Tagging", "RequestPayer", "ExpectedBucketOwner".
        e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}
    max_rows_by_file : int, optional
        Max number of rows in each staged file. (e.g. 33554432, 268435456)

    Returns
    -------
    None
        None.

    Examples
    --------
    >>> import awswrangler as wr
    >>> import pandas as pd
    >>> con = wr.redshift.connect("MY_GLUE_CONNECTION")
    >>> wr.redshift.copy(
    ...     df=pd.DataFrame({'col': [1, 2, 3]}),
    ...     path="s3://bucket/my_parquet_files/",
    ...     con=con,
    ...     table="my_table",
    ...     schema="public",
    ...     iam_role="arn:aws:iam::XXX:role/XXX"
    ... )
    >>> con.close()

    """
    path = path[:-1] if path.endswith("*") else path
    path = path if path.endswith("/") else f"{path}/"
    session: boto3.Session = _utils.ensure_session(session=boto3_session)
    if s3.list_objects(path=path, boto3_session=session, s3_additional_kwargs=s3_additional_kwargs):
        raise exceptions.InvalidArgument(
            f"The received S3 path ({path}) is not empty. "
            "Please, provide a different path or use wr.s3.delete_objects() to clean up the current one."
        )
    try:
        s3.to_parquet(
            df=df,
            path=path,
            index=index,
            dataset=True,
            mode="append",
            dtype=dtype,
            use_threads=use_threads,
            boto3_session=session,
            s3_additional_kwargs=s3_additional_kwargs,
            max_rows_by_file=max_rows_by_file,
        )
        copy_from_files(
            path=path,
            con=con,
            table=table,
            schema=schema,
            iam_role=iam_role,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            mode=mode,
            diststyle=diststyle,
            distkey=distkey,
            sortstyle=sortstyle,
            sortkey=sortkey,
            primary_keys=primary_keys,
            varchar_lengths_default=varchar_lengths_default,
            varchar_lengths=varchar_lengths,
            serialize_to_json=serialize_to_json,
            use_threads=use_threads,
            boto3_session=session,
            s3_additional_kwargs=s3_additional_kwargs,
        )
    finally:
        if keep_files is False:
            s3.delete_objects(
                path=path, use_threads=use_threads, boto3_session=session, s3_additional_kwargs=s3_additional_kwargs
            )
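
# Illustrative sketch (not part of the original module): a typical high-throughput
# round trip, staging a DataFrame with copy() and pulling it back with unload().
# The bucket, role, table and schema names below are placeholders.
def _demo_copy_unload_roundtrip() -> None:
    con = connect("MY_GLUE_CONNECTION")
    try:
        copy(
            df=pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]}),
            path="s3://my-bucket/stage-in/",
            con=con,
            table="my_table",
            schema="public",
            iam_role="arn:aws:iam::123456789012:role/redshift-copy-unload",
            mode="overwrite",
        )
        df = unload(
            sql="SELECT * FROM public.my_table",
            path="s3://my-bucket/stage-out/",
            con=con,
            iam_role="arn:aws:iam::123456789012:role/redshift-copy-unload",
        )
        print(df)
    finally:
        con.close()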