import re import typing from collections import deque from itertools import count, islice from typing import TYPE_CHECKING from warnings import warn import redshift_connector from redshift_connector.config import table_type_clauses from redshift_connector.error import ( MISSING_MODULE_ERROR_MSG, InterfaceError, ProgrammingError, ) if TYPE_CHECKING: from redshift_connector.core import Connection try: import numpy # type: ignore import pandas # type: ignore except: pass class Cursor: """A cursor object is returned by the :meth:`~Connection.cursor` method of a connection. It has the following attributes and methods: .. attribute:: arraysize This read/write attribute specifies the number of rows to fetch at a time with :meth:`fetchmany`. It defaults to 1. .. attribute:: connection This read-only attribute contains a reference to the connection object (an instance of :class:`Connection`) on which the cursor was created. This attribute is part of a DBAPI 2.0 extension. Accessing this attribute will generate the following warning: ``DB-API extension cursor.connection used``. .. attribute:: rowcount This read-only attribute contains the number of rows that the last ``execute()`` or ``executemany()`` method produced (for query statements like ``SELECT``) or affected (for modification statements like ``UPDATE``). The value is -1 if: - No ``execute()`` or ``executemany()`` method has been performed yet on the cursor. - There was no rowcount associated with the last ``execute()``. - At least one of the statements executed as part of an ``executemany()`` had no row count associated with it. - Using a ``SELECT`` query statement on PostgreSQL server older than version 9. - Using a ``COPY`` query statement on PostgreSQL server version 8.1 or older. This attribute is part of the `DBAPI 2.0 specification `_. .. attribute:: description This read-only attribute is a sequence of 7-item sequences. Each value contains information describing one result column. The 7 items returned for each column are (name, type_code, display_size, internal_size, precision, scale, null_ok). Only the first two values are provided by the current implementation. This attribute is part of the `DBAPI 2.0 specification `_. """ def __init__(self: "Cursor", connection: "Connection", paramstyle=None) -> None: """ A cursor object is returned by the :meth:`~Connection.cursor` method of a connection. Parameters ---------- connection : :class:`Connection` The :class:`Connection` object to associate with this :class:`Cursor` paramstyle : Optional[str] The DB-API paramstyle to use with this :class:`Cursor` """ self._c: typing.Optional["Connection"] = connection self.arraysize: int = 1 self.ps: typing.Optional[typing.Dict[str, typing.Any]] = None self._row_count: int = -1 self._cached_rows: deque = deque() if paramstyle is None: self.paramstyle: str = redshift_connector.paramstyle else: self.paramstyle = paramstyle def __enter__(self: "Cursor") -> "Cursor": return self def __exit__(self: "Cursor", exc_type, exc_value, traceback) -> None: self.close() @property def connection(self: "Cursor") -> typing.Optional["Connection"]: warn("DB-API extension cursor.connection used", stacklevel=3) return self._c @property def rowcount(self: "Cursor") -> int: return self._row_count description = property(lambda self: self._getDescription()) def _getDescription(self: "Cursor") -> typing.Optional[typing.List[typing.Optional[typing.Tuple]]]: if self.ps is None: return None row_desc: typing.List[typing.Dict[str, typing.Union[bytes, int, typing.Callable]]] = self.ps["row_desc"] if len(row_desc) == 0: return None columns: typing.List[typing.Optional[typing.Tuple]] = [] for col in row_desc: columns.append((col["label"], col["type_oid"], None, None, None, None, None)) return columns ## # Executes a database operation. Parameters may be provided as a sequence # or mapping and will be bound to variables in the operation. #

# Stability: Part of the DBAPI 2.0 specification. def execute(self: "Cursor", operation, args=None, stream=None, merge_socket_read=False) -> "Cursor": """Executes a database operation. Parameters may be provided as a sequence, or as a mapping, depending upon the value of :data:`paramstyle`. This method is part of the `DBAPI 2.0 specification `_. :param operation: str The SQL statement to execute. :param args: typing.Union[typing.Mapping, typing.Dict, list] If :data:`paramstyle` is ``qmark``, ``numeric``, or ``format``, this argument should be an array of parameters to bind into the statement. If :data:`paramstyle` is ``named``, the argument should be a dict mapping of parameters. If the :data:`paramstyle` is ``pyformat``, the argument value may be either an array or a mapping. :param stream: This is a extension for use with the PostgreSQL `COPY `_ command. For a COPY FROM the parameter must be a readable file-like object, and for COPY TO it must be writable. .. versionadded:: 1.9.11 Returns ------- The Cursor object used for executing the specified database operation: :class:`Cursor` """ if self._c is None: raise InterfaceError("Cursor closed") if self._c._sock is None: raise InterfaceError("connection is closed") try: self.stream = stream # For Redshift, we need to begin transaction and then to process query # In the end we can use commit or rollback to end the transaction if not self._c.in_transaction and not self._c.autocommit: self._c.execute(self, "begin transaction", None) self._c.merge_socket_read = merge_socket_read self._c.execute(self, operation, args) except AttributeError as e: raise e return self def executemany(self: "Cursor", operation, param_sets) -> "Cursor": """Prepare a database operation, and then execute it against all parameter sequences or mappings provided. This method is part of the `DBAPI 2.0 specification `_. :param operation: str The SQL statement to execute :param parameter_sets: A sequence of parameters to execute the statement with. The values in the sequence should be sequences or mappings of parameters, the same as the args argument of the :meth:`execute` method. Returns ------- The Cursor object used for executing the specified database operation: :class:`Cursor` """ rowcounts: typing.List[int] = [] for parameters in param_sets: self.execute(operation, parameters) rowcounts.append(self._row_count) self._row_count = -1 if -1 in rowcounts else sum(rowcounts) return self def fetchone(self: "Cursor") -> typing.Optional[typing.List]: """Fetch the next row of a query result set. This method is part of the `DBAPI 2.0 specification `_. Returns ------- A row as a sequence of field values, or ``None`` if no more rows are available:typing.Optional[typing.List] """ try: return next(self) except StopIteration: return None except TypeError: raise ProgrammingError("attempting to use unexecuted cursor") except AttributeError: raise ProgrammingError("attempting to use unexecuted cursor") def fetchmany(self: "Cursor", num: typing.Optional[int] = None) -> typing.Tuple: """Fetches the next set of rows of a query result. This method is part of the `DBAPI 2.0 specification `_. :param num: The number of rows to fetch when called. If not provided, the :attr:`arraysize` attribute value is used instead. :returns: A sequence, each entry of which is a sequence of field values making up a row. If no more rows are available, an empty sequence will be returned.:typing.Tuple """ try: return tuple(islice(self, self.arraysize if num is None else num)) except TypeError: raise ProgrammingError("attempting to use unexecuted cursor") def fetchall(self: "Cursor") -> typing.Tuple: """Fetches all remaining rows of a query result. This method is part of the `DBAPI 2.0 specification `_. :returns: A sequence, each entry of which is a sequence of field values making up a row.:tuple """ try: return tuple(self) except TypeError: raise ProgrammingError("attempting to use unexecuted cursor") def close(self: "Cursor") -> None: """Closes the cursor. This method is part of the `DBAPI 2.0 specification `_. A row as a sequence of field values, or ``None`` if no more rows are available. Returns ------- None:None """ self._c = None def __iter__(self: "Cursor") -> "Cursor": """A cursor object is iterable to retrieve the rows from a query. This is a DBAPI 2.0 extension. """ return self def setinputsizes(self: "Cursor", sizes): """This method is part of the `DBAPI 2.0 specification `_, however, it is not implemented. """ pass def setoutputsize(self: "Cursor", size, column=None): """This method is part of the `DBAPI 2.0 specification `_, however, it is not implemented. """ pass def __next__(self: "Cursor") -> typing.List: try: return self._cached_rows.popleft() except IndexError: if self.ps is None: raise ProgrammingError("A query hasn't been issued.") elif len(self.ps["row_desc"]) == 0: raise ProgrammingError("no result set") else: raise StopIteration() def fetch_dataframe(self: "Cursor", num: typing.Optional[int] = None) -> typing.Optional["pandas.DataFrame"]: """ Fetches a user defined number of rows of a query result as a :class:`pandas.DataFrame`. Parameters ---------- num : Optional[int] The number of rows to retrieve. If unspecified, all rows will be retrieved Returns ------- A `pandas.DataFrame` containing field values making up a row. A column label, derived from the row description of the table, is provided.:typing.Optional["pandas.Dataframe"] """ try: import pandas except ModuleNotFoundError: raise ModuleNotFoundError(MISSING_MODULE_ERROR_MSG.format(module="pandas")) columns: typing.Optional[typing.List[typing.Union[str, bytes]]] = None try: columns = [column[0].decode().lower() for column in self.description] except UnicodeError as e: warn( "Unable to decode column names. Byte values will be used for pandas dataframe column labels.", stacklevel=2, ) columns = [column[0].lower() for column in self.description] except: warn("No row description was found. pandas dataframe will be missing column labels.", stacklevel=2) if num: fetcheddata: tuple = self.fetchmany(num) else: fetcheddata = self.fetchall() result: typing.List = [tuple(column for column in rows) for rows in fetcheddata] if len(result) == 0: return None return pandas.DataFrame(result, columns=columns) def __is_valid_table(self: "Cursor", table: str) -> bool: split_table_name: typing.List[str] = table.split(".") if len(split_table_name) > 2: return False q: str = "select 1 from information_schema.tables where table_name = ?" temp = self.paramstyle self.paramstyle = "qmark" try: if len(split_table_name) == 2: q += " and table_schema = ?" self.execute(q, (split_table_name[1], split_table_name[0])) else: self.execute(q, (split_table_name[0],)) except: raise finally: # reset paramstyle to it's original value self.paramstyle = temp result = self.fetchone() return result[0] == 1 if result is not None else False def write_dataframe(self: "Cursor", df: "pandas.DataFrame", table: str) -> None: """ Inserts a :class:`pandas.DataFrame` into an table within the current database. Parameters ---------- df : :class:`pandas.DataFrame` Contains row values to insert into `table` table : str Name of an existing table in the current Amazon Redshift database to insert the values in `df` Returns ------- None: None """ try: import pandas except ModuleNotFoundError: raise ModuleNotFoundError(MISSING_MODULE_ERROR_MSG.format(module="pandas")) if not self.__is_valid_table(table): raise InterfaceError("Invalid table name passed to write_dataframe: {}".format(table)) sanitized_table_name: str = self.__sanitize_str(table) arrays: "numpy.ndarray" = df.values placeholder: str = ", ".join(["%s"] * len(arrays[0])) sql: str = "insert into {table} values ({placeholder})".format( table=sanitized_table_name, placeholder=placeholder ) if len(arrays) == 1: self.execute(sql, arrays[0]) elif len(arrays) > 1: self.executemany(sql, arrays) def fetch_numpy_array(self: "Cursor", num: typing.Optional[int] = None) -> "numpy.ndarray": """ Fetches a user defined number of rows of a query result as a :class:`numpy.ndarray`. Parameters ---------- num : int The number of rows to retrieve from the result set. Returns ------- A `numpy.ndarray` containing the results of a query executed::class:`numpy.ndarray` """ try: import numpy except ModuleNotFoundError: raise ModuleNotFoundError(MISSING_MODULE_ERROR_MSG.format(module="numpy")) if num: fetched: typing.Tuple = self.fetchmany(num) else: fetched = self.fetchall() return numpy.array(fetched) def get_procedures( self: "Cursor", catalog: typing.Optional[str] = None, schema_pattern: typing.Optional[str] = None, procedure_name_pattern: typing.Optional[str] = None, ) -> tuple: sql: str = ( "SELECT current_database() AS PROCEDURE_CAT, n.nspname AS PROCEDURE_SCHEM, p.proname AS PROCEDURE_NAME, " "NULL, NULL, NULL, d.description AS REMARKS, " " CASE p.prokind " " WHEN 'f' THEN 2 " " WHEN 'p' THEN 1 " " ELSE 0 " " END AS PROCEDURE_TYPE, " " p.proname || '_' || p.prooid AS SPECIFIC_NAME " " FROM pg_catalog.pg_namespace n, pg_catalog.pg_proc_info p " " LEFT JOIN pg_catalog.pg_description d ON (p.prooid=d.objoid) " " LEFT JOIN pg_catalog.pg_class c ON (d.classoid=c.oid AND c.relname='pg_proc') " " LEFT JOIN pg_catalog.pg_namespace pn ON (c.relnamespace=pn.oid AND pn.nspname='pg_catalog') " " WHERE p.pronamespace=n.oid " ) query_args: typing.List[str] = [] if schema_pattern is not None and schema_pattern != "": sql += " AND n.nspname LIKE ?" query_args.append(self.__sanitize_str(schema_pattern)) else: sql += "and pg_function_is_visible(p.prooid)" if procedure_name_pattern is not None and procedure_name_pattern != "": sql += " AND p.proname LIKE ?" query_args.append(self.__sanitize_str(procedure_name_pattern)) sql += " ORDER BY PROCEDURE_SCHEM, PROCEDURE_NAME, p.prooid::text " if len(query_args) > 0: # temporarily use qmark paramstyle temp = self.paramstyle self.paramstyle = "qmark" try: self.execute(sql, tuple(query_args)) except: raise finally: # reset the original value of paramstyle self.paramstyle = temp else: self.execute(sql) procedures: tuple = self.fetchall() return procedures def _get_catalog_filter_conditions( self: "Cursor", catalog: typing.Optional[str], api_supported_only_for_connected_database: bool, database_col_name: typing.Optional[str], ) -> str: if self._c is None: raise InterfaceError("connection is closed") catalog_filter: str = "" if catalog is not None and catalog != "": if self._c.is_single_database_metadata is True or api_supported_only_for_connected_database is True: catalog_filter += " AND current_database() = {catalog}".format(catalog=self.__escape_quotes(catalog)) else: if database_col_name is None or database_col_name == "": database_col_name = "database_name" catalog_filter += " AND {col_name} = {catalog}".format( col_name=self.__sanitize_str(database_col_name), catalog=self.__escape_quotes(catalog) ) return catalog_filter def get_schemas( self: "Cursor", catalog: typing.Optional[str] = None, schema_pattern: typing.Optional[str] = None ) -> tuple: if self._c is None: raise InterfaceError("connection is closed") query_args: typing.List[str] = [] sql: str = "" if self._c.is_single_database_metadata is True: sql = ( "SELECT nspname AS TABLE_SCHEM, NULL AS TABLE_CATALOG FROM pg_catalog.pg_namespace " " WHERE nspname <> 'pg_toast' AND (nspname !~ '^pg_temp_' " " OR nspname = (pg_catalog.current_schemas(true))[1]) AND (nspname !~ '^pg_toast_temp_' " " OR nspname = replace((pg_catalog.current_schemas(true))[1], 'pg_temp_', 'pg_toast_temp_')) " ) sql += self._get_catalog_filter_conditions(catalog, True, None) if schema_pattern is not None and schema_pattern != "": sql += " AND nspname LIKE ?" query_args.append(self.__sanitize_str(schema_pattern)) # if self._c.get_hide_unprivileged_objects(): # TODO: not implemented # sql += " AND has_schema_privilege(nspname, 'USAGE, CREATE')" sql += " ORDER BY TABLE_SCHEM" else: sql = ( "SELECT CAST(schema_name AS varchar(124)) AS TABLE_SCHEM, " " CAST(database_name AS varchar(124)) AS TABLE_CATALOG " " FROM PG_CATALOG.SVV_ALL_SCHEMAS " " WHERE TRUE " ) sql += self._get_catalog_filter_conditions(catalog, False, None) if schema_pattern is not None and schema_pattern != "": sql += " AND schema_name LIKE ?" query_args.append(self.__sanitize_str(schema_pattern)) sql += " ORDER BY TABLE_CATALOG, TABLE_SCHEM" if len(query_args) == 1: # temporarily use qmark paramstyle temp = self.paramstyle self.paramstyle = "qmark" try: self.execute(sql, tuple(query_args)) except: raise finally: self.paramstyle = temp else: self.execute(sql) schemas: tuple = self.fetchall() return schemas def get_primary_keys( self: "Cursor", catalog: typing.Optional[str] = None, schema: typing.Optional[str] = None, table: typing.Optional[str] = None, ) -> tuple: sql: str = ( "SELECT " "current_database() AS TABLE_CAT, " "n.nspname AS TABLE_SCHEM, " "ct.relname AS TABLE_NAME, " "a.attname AS COLUMN_NAME, " "a.attnum AS KEY_SEQ, " "ci.relname AS PK_NAME " "FROM " "pg_catalog.pg_namespace n, " "pg_catalog.pg_class ct, " "pg_catalog.pg_class ci, " "pg_catalog.pg_attribute a, " "pg_catalog.pg_index i " "WHERE " "ct.oid=i.indrelid AND " "ci.oid=i.indexrelid AND " "a.attrelid=ci.oid AND " "i.indisprimary AND " "ct.relnamespace = n.oid " ) query_args: typing.List[str] = [] if schema is not None and schema != "": sql += " AND n.nspname = ?" query_args.append(self.__sanitize_str(schema)) if table is not None and table != "": sql += " AND ct.relname = ?" query_args.append(self.__sanitize_str(table)) sql += " ORDER BY table_name, pk_name, key_seq" if len(query_args) > 0: # temporarily use qmark paramstyle temp = self.paramstyle self.paramstyle = "qmark" try: self.execute(sql, tuple(query_args)) except: raise finally: self.paramstyle = temp else: self.execute(sql) keys: tuple = self.fetchall() return keys def get_catalogs(self: "Cursor") -> typing.Tuple: """ Redshift does not support multiple catalogs from a single connection, so to reduce confusion we only return the current catalog. Returns ------- A tuple containing the name of the current catalog: tuple """ if self._c is None: raise InterfaceError("connection is closed") sql: str = "" if self._c.is_single_database_metadata is True: sql = "select current_database as TABLE_CAT FROM current_database()" else: # Datasharing/federation support enable, so get databases using the new view. sql = "SELECT CAST(database_name AS varchar(124)) AS TABLE_CAT FROM PG_CATALOG.SVV_REDSHIFT_DATABASES " sql += " ORDER BY TABLE_CAT" self.execute(sql) catalogs: typing.Tuple = self.fetchall() return catalogs def get_tables( self: "Cursor", catalog: typing.Optional[str] = None, schema_pattern: typing.Optional[str] = None, table_name_pattern: typing.Optional[str] = None, types: list = [], ) -> tuple: """ Returns the unique public tables which are user-defined within the system. Parameters ---------- catalog : Optional[str] The name of the catalog schema_pattern : Optional[str] A valid pattern for desired schemas table_name_pattern : Optional[str] A valid pattern for desired table names types : Optional[list[str]] A list of `str` containing table types. By default table types is not used as a filter. Returns ------- A tuple containing unique public tables which are user-defined within the system: tuple """ if self._c is None: raise InterfaceError("connection is closed") sql: str = "" sql_args: typing.Tuple[str, ...] = tuple() schema_pattern_type: str = self.__schema_pattern_match(schema_pattern) if schema_pattern_type == "LOCAL_SCHEMA_QUERY": sql, sql_args = self.__build_local_schema_tables_query(catalog, schema_pattern, table_name_pattern, types) elif schema_pattern_type == "NO_SCHEMA_UNIVERSAL_QUERY": if self._c.is_single_database_metadata is True: sql, sql_args = self.__build_universal_schema_tables_query( catalog, schema_pattern, table_name_pattern, types ) else: sql, sql_args = self.__build_universal_all_schema_tables_query( catalog, schema_pattern, table_name_pattern, types ) elif schema_pattern_type == "EXTERNAL_SCHEMA_QUERY": sql, sql_args = self.__build_external_schema_tables_query( catalog, schema_pattern, table_name_pattern, types ) if len(sql_args) > 0: temp = self.paramstyle self.paramstyle = "qmark" try: self.execute(sql, sql_args) except: raise finally: self.paramstyle = temp else: self.execute(sql) tables: tuple = self.fetchall() return tables def __build_local_schema_tables_query( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], table_name_pattern: typing.Optional[str], types: list, ) -> typing.Tuple[str, typing.Tuple[str, ...]]: sql: str = ( "SELECT CAST(current_database() AS VARCHAR(124)) AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, " " CASE n.nspname ~ '^pg_' OR n.nspname = 'information_schema' " " WHEN true THEN CASE " " WHEN n.nspname = 'pg_catalog' OR n.nspname = 'information_schema' THEN CASE c.relkind " " WHEN 'r' THEN 'SYSTEM TABLE' " " WHEN 'v' THEN 'SYSTEM VIEW' " " WHEN 'i' THEN 'SYSTEM INDEX' " " ELSE NULL " " END " " WHEN n.nspname = 'pg_toast' THEN CASE c.relkind " " WHEN 'r' THEN 'SYSTEM TOAST TABLE' " " WHEN 'i' THEN 'SYSTEM TOAST INDEX' " " ELSE NULL " " END " " ELSE CASE c.relkind " " WHEN 'r' THEN 'TEMPORARY TABLE' " " WHEN 'p' THEN 'TEMPORARY TABLE' " " WHEN 'i' THEN 'TEMPORARY INDEX' " " WHEN 'S' THEN 'TEMPORARY SEQUENCE' " " WHEN 'v' THEN 'TEMPORARY VIEW' " " ELSE NULL " " END " " END " " WHEN false THEN CASE c.relkind " " WHEN 'r' THEN 'TABLE' " " WHEN 'p' THEN 'PARTITIONED TABLE' " " WHEN 'i' THEN 'INDEX' " " WHEN 'S' THEN 'SEQUENCE' " " WHEN 'v' THEN 'VIEW' " " WHEN 'c' THEN 'TYPE' " " WHEN 'f' THEN 'FOREIGN TABLE' " " WHEN 'm' THEN 'MATERIALIZED VIEW' " " ELSE NULL " " END " " ELSE NULL " " END " " AS TABLE_TYPE, d.description AS REMARKS, " " '' as TYPE_CAT, '' as TYPE_SCHEM, '' as TYPE_NAME, " "'' AS SELF_REFERENCING_COL_NAME, '' AS REF_GENERATION " " FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c " " LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = 0) " " LEFT JOIN pg_catalog.pg_class dc ON (d.classoid=dc.oid AND dc.relname='pg_class') " " LEFT JOIN pg_catalog.pg_namespace dn ON (dn.oid=dc.relnamespace AND dn.nspname='pg_catalog') " " WHERE c.relnamespace = n.oid " ) filter_clause, filter_args = self.__get_table_filter_clause( catalog, schema_pattern, table_name_pattern, types, "LOCAL_SCHEMA_QUERY", True, None ) orderby: str = " ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME " return sql + filter_clause + orderby, filter_args def __get_table_filter_clause( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], table_name_pattern: typing.Optional[str], types: typing.List[str], schema_pattern_type: str, api_supported_only_for_connected_database: bool, database_col_name: typing.Optional[str], ) -> typing.Tuple[str, typing.Tuple[str, ...]]: filter_clause: str = "" use_schemas: str = "SCHEMAS" filter_clause += self._get_catalog_filter_conditions( catalog, api_supported_only_for_connected_database, database_col_name ) query_args: typing.List[str] = [] if schema_pattern is not None and schema_pattern != "": filter_clause += " AND TABLE_SCHEM LIKE ?" query_args.append(self.__sanitize_str(schema_pattern)) if table_name_pattern is not None and table_name_pattern != "": filter_clause += " AND TABLE_NAME LIKE ?" query_args.append(self.__sanitize_str(table_name_pattern)) if len(types) > 0: if schema_pattern_type == "LOCAL_SCHEMA_QUERY": filter_clause += " AND (false " orclause: str = "" for type in types: if type not in table_type_clauses.keys(): raise InterfaceError( "Invalid type: {} provided. types may only contain: {}".format( type, table_type_clauses.keys() ) ) clauses: typing.Optional[typing.Dict[str, str]] = table_type_clauses[type] if clauses is not None: cluase: str = clauses[use_schemas] orclause += " OR ( {cluase} ) ".format(cluase=cluase) filter_clause += orclause + ") " elif schema_pattern_type == "NO_SCHEMA_UNIVERSAL_QUERY" or schema_pattern_type == "EXTERNAL_SCHEMA_QUERY": filter_clause += " AND TABLE_TYPE IN ( " length = len(types) for type in types: if type not in table_type_clauses.keys(): raise InterfaceError( "Invalid type: {} provided. types may only contain: {}".format( type, table_type_clauses.keys() ) ) filter_clause += "?" query_args.append(type) length -= 1 if length > 0: filter_clause += ", " filter_clause += ") " return filter_clause, tuple(query_args) def __build_universal_schema_tables_query( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], table_name_pattern: typing.Optional[str], types: list, ) -> typing.Tuple[str, typing.Tuple[str, ...]]: sql: str = ( "SELECT * FROM (SELECT CAST(current_database() AS VARCHAR(124)) AS TABLE_CAT," " table_schema AS TABLE_SCHEM," " table_name AS TABLE_NAME," " CAST(" " CASE table_type" " WHEN 'BASE TABLE' THEN CASE" " WHEN table_schema = 'pg_catalog' OR table_schema = 'information_schema' THEN 'SYSTEM TABLE'" " WHEN table_schema = 'pg_toast' THEN 'SYSTEM TOAST TABLE'" " WHEN table_schema ~ '^pg_' AND table_schema != 'pg_toast' THEN 'TEMPORARY TABLE'" " ELSE 'TABLE'" " END" " WHEN 'VIEW' THEN CASE" " WHEN table_schema = 'pg_catalog' OR table_schema = 'information_schema' THEN 'SYSTEM VIEW'" " WHEN table_schema = 'pg_toast' THEN NULL" " WHEN table_schema ~ '^pg_' AND table_schema != 'pg_toast' THEN 'TEMPORARY VIEW'" " ELSE 'VIEW'" " END" " WHEN 'EXTERNAL TABLE' THEN 'EXTERNAL TABLE'" " END" " AS VARCHAR(124)) AS TABLE_TYPE," " REMARKS," " '' as TYPE_CAT," " '' as TYPE_SCHEM," " '' as TYPE_NAME, " " '' AS SELF_REFERENCING_COL_NAME," " '' AS REF_GENERATION " " FROM svv_tables)" " WHERE true " ) filter_clause, filter_args = self.__get_table_filter_clause( catalog, schema_pattern, table_name_pattern, types, "NO_SCHEMA_UNIVERSAL_QUERY", True, None ) orderby: str = " ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME " sql += filter_clause + orderby return sql, filter_args def __build_universal_all_schema_tables_query( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], table_name_pattern: typing.Optional[str], types: list, ) -> typing.Tuple[str, typing.Tuple[str, ...]]: sql: str = ( "SELECT * FROM (SELECT CAST(DATABASE_NAME AS VARCHAR(124)) AS TABLE_CAT," " SCHEMA_NAME AS TABLE_SCHEM," " TABLE_NAME AS TABLE_NAME," " CAST(" " CASE " " WHEN SCHEMA_NAME='information_schema' " " AND TABLE_TYPE='TABLE' THEN 'SYSTEM TABLE' " " WHEN SCHEMA_NAME='information_schema' " " AND TABLE_TYPE='VIEW' THEN 'SYSTEM VIEW' " " ELSE TABLE_TYPE " " END " " AS VARCHAR(124)) AS TABLE_TYPE," " REMARKS," " '' as TYPE_CAT," " '' as TYPE_SCHEM," " '' as TYPE_NAME, " " '' AS SELF_REFERENCING_COL_NAME," " '' AS REF_GENERATION " " FROM PG_CATALOG.SVV_ALL_TABLES)" " WHERE true " ) filter_clause, filter_args = self.__get_table_filter_clause( catalog, schema_pattern, table_name_pattern, types, "NO_SCHEMA_UNIVERSAL_QUERY", False, "TABLE_CAT" ) orderby: str = " ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME " sql += filter_clause sql += orderby return sql, filter_args def __build_external_schema_tables_query( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], table_name_pattern: typing.Optional[str], types: list, ) -> typing.Tuple[str, typing.Tuple[str, ...]]: sql: str = ( "SELECT * FROM (SELECT CAST(current_database() AS VARCHAR(124)) AS TABLE_CAT," " schemaname AS table_schem," " tablename AS TABLE_NAME," " 'EXTERNAL TABLE' AS TABLE_TYPE," " NULL AS REMARKS," " '' as TYPE_CAT," " '' as TYPE_SCHEM," " '' as TYPE_NAME, " " '' AS SELF_REFERENCING_COL_NAME," " '' AS REF_GENERATION " " FROM svv_external_tables)" " WHERE true " ) filter_clause, filter_args = self.__get_table_filter_clause( catalog, schema_pattern, table_name_pattern, types, "EXTERNAL_SCHEMA_QUERY", True, None ) orderby: str = " ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME " sql += filter_clause + orderby return sql, filter_args def get_columns( self: "Cursor", catalog: typing.Optional[str] = None, schema_pattern: typing.Optional[str] = None, tablename_pattern: typing.Optional[str] = None, columnname_pattern: typing.Optional[str] = None, ) -> tuple: """ Returns a list of all columns in a specific table in Amazon Redshift database. Parameters ---------- catalog : Optional[str] The name of the catalog schema_pattern : Optional[str] A valid pattern for desired schemas table_name_pattern : Optional[str] A valid pattern for desired table names column_name_pattern : Optional[str] A valid pattern for desired column names Returns ------- A tuple containing all columns in a specific table in Amazon Redshift database: tuple """ if self._c is None: raise InterfaceError("connection is closed") sql: str = "" schema_pattern_type: str = self.__schema_pattern_match(schema_pattern) if schema_pattern_type == "LOCAL_SCHEMA_QUERY": sql = self.__build_local_schema_columns_query( catalog, schema_pattern, tablename_pattern, columnname_pattern ) elif schema_pattern_type == "NO_SCHEMA_UNIVERSAL_QUERY": if self._c.is_single_database_metadata is True: sql = self.__build_universal_schema_columns_query( catalog, schema_pattern, tablename_pattern, columnname_pattern ) else: sql = self.__build_universal_all_schema_columns_query( catalog, schema_pattern, tablename_pattern, columnname_pattern ) elif schema_pattern_type == "EXTERNAL_SCHEMA_QUERY": sql = self.__build_external_schema_columns_query( catalog, schema_pattern, tablename_pattern, columnname_pattern ) self.execute(sql) columns: tuple = self.fetchall() return columns def __build_local_schema_columns_query( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], tablename_pattern: typing.Optional[str], columnname_pattern: typing.Optional[str], ) -> str: sql: str = ( "SELECT * FROM ( " "SELECT current_database() AS TABLE_CAT, " "n.nspname AS TABLE_SCHEM, " "c.relname as TABLE_NAME , " "a.attname as COLUMN_NAME, " "CAST(case typname " "when 'text' THEN 12 " "when 'bit' THEN -7 " "when 'bool' THEN -7 " "when 'boolean' THEN -7 " "when 'varchar' THEN 12 " "when 'character varying' THEN 12 " "when 'char' THEN 1 " "when '\"char\"' THEN 1 " "when 'character' THEN 1 " "when 'nchar' THEN 12 " "when 'bpchar' THEN 1 " "when 'nvarchar' THEN 12 " "when 'date' THEN 91 " "when 'timestamp' THEN 93 " "when 'timestamp without time zone' THEN 93 " "when 'smallint' THEN 5 " "when 'int2' THEN 5 " "when 'integer' THEN 4 " "when 'int' THEN 4 " "when 'int4' THEN 4 " "when 'bigint' THEN -5 " "when 'int8' THEN -5 " "when 'decimal' THEN 3 " "when 'real' THEN 7 " "when 'float4' THEN 7 " "when 'double precision' THEN 8 " "when 'float8' THEN 8 " "when 'float' THEN 6 " "when 'numeric' THEN 2 " "when '_float4' THEN 2003 " "when 'timestamptz' THEN 2014 " "when 'timestamp with time zone' THEN 2014 " "when '_aclitem' THEN 2003 " "when '_text' THEN 2003 " "when 'bytea' THEN -2 " "when 'oid' THEN -5 " "when 'name' THEN 12 " "when '_int4' THEN 2003 " "when '_int2' THEN 2003 " "when 'ARRAY' THEN 2003 " "when 'geometry' THEN -4 " "when 'omni' THEN -16 " "else 1111 END as SMALLINT) AS DATA_TYPE, " "t.typname as TYPE_NAME, " "case typname " "when 'int4' THEN 10 " "when 'bit' THEN 1 " "when 'bool' THEN 1 " "when 'varchar' THEN atttypmod -4 " "when 'character varying' THEN atttypmod -4 " "when 'char' THEN atttypmod -4 " "when 'character' THEN atttypmod -4 " "when 'nchar' THEN atttypmod -4 " "when 'bpchar' THEN atttypmod -4 " "when 'nvarchar' THEN atttypmod -4 " "when 'date' THEN 13 " "when 'timestamp' THEN 29 " "when 'smallint' THEN 5 " "when 'int2' THEN 5 " "when 'integer' THEN 10 " "when 'int' THEN 10 " "when 'int4' THEN 10 " "when 'bigint' THEN 19 " "when 'int8' THEN 19 " "when 'decimal' then (atttypmod - 4) >> 16 " "when 'real' THEN 8 " "when 'float4' THEN 8 " "when 'double precision' THEN 17 " "when 'float8' THEN 17 " "when 'float' THEN 17 " "when 'numeric' THEN (atttypmod - 4) >> 16 " "when '_float4' THEN 8 " "when 'timestamptz' THEN 35 " "when 'oid' THEN 10 " "when '_int4' THEN 10 " "when '_int2' THEN 5 " "when 'geometry' THEN NULL " "when 'omni' THEN NULL " "else 2147483647 end as COLUMN_SIZE , " "null as BUFFER_LENGTH , " "case typname " "when 'float4' then 8 " "when 'float8' then 17 " "when 'numeric' then (atttypmod - 4) & 65535 " "when 'timestamp' then 6 " "when 'geometry' then NULL " "when 'omni' then NULL " "else 0 end as DECIMAL_DIGITS, " "10 AS NUM_PREC_RADIX , " "case a.attnotnull OR (t.typtype = 'd' AND t.typnotnull) " "when 'false' then 1 " "when NULL then 2 " "else 0 end AS NULLABLE , " "dsc.description as REMARKS , " "pg_catalog.pg_get_expr(def.adbin, def.adrelid) AS COLUMN_DEF, " "CAST(case typname " "when 'text' THEN 12 " "when 'bit' THEN -7 " "when 'bool' THEN -7 " "when 'boolean' THEN -7 " "when 'varchar' THEN 12 " "when 'character varying' THEN 12 " "when '\"char\"' THEN 1 " "when 'char' THEN 1 " "when 'character' THEN 1 " "when 'nchar' THEN 1 " "when 'bpchar' THEN 1 " "when 'nvarchar' THEN 12 " "when 'date' THEN 91 " "when 'timestamp' THEN 93 " "when 'timestamp without time zone' THEN 93 " "when 'smallint' THEN 5 " "when 'int2' THEN 5 " "when 'integer' THEN 4 " "when 'int' THEN 4 " "when 'int4' THEN 4 " "when 'bigint' THEN -5 " "when 'int8' THEN -5 " "when 'decimal' THEN 3 " "when 'real' THEN 7 " "when 'float4' THEN 7 " "when 'double precision' THEN 8 " "when 'float8' THEN 8 " "when 'float' THEN 6 " "when 'numeric' THEN 2 " "when '_float4' THEN 2003 " "when 'timestamptz' THEN 2014 " "when 'timestamp with time zone' THEN 2014 " "when '_aclitem' THEN 2003 " "when '_text' THEN 2003 " "when 'bytea' THEN -2 " "when 'oid' THEN -5 " "when 'name' THEN 12 " "when '_int4' THEN 2003 " "when '_int2' THEN 2003 " "when 'ARRAY' THEN 2003 " "when 'geometry' THEN -4 " "when 'omni' THEN -16 " "else 1111 END as SMALLINT) AS SQL_DATA_TYPE, " "CAST(NULL AS SMALLINT) as SQL_DATETIME_SUB , " "case typname " "when 'int4' THEN 10 " "when 'bit' THEN 1 " "when 'bool' THEN 1 " "when 'varchar' THEN atttypmod -4 " "when 'character varying' THEN atttypmod -4 " "when 'char' THEN atttypmod -4 " "when 'character' THEN atttypmod -4 " "when 'nchar' THEN atttypmod -4 " "when 'bpchar' THEN atttypmod -4 " "when 'nvarchar' THEN atttypmod -4 " "when 'date' THEN 13 " "when 'timestamp' THEN 29 " "when 'smallint' THEN 5 " "when 'int2' THEN 5 " "when 'integer' THEN 10 " "when 'int' THEN 10 " "when 'int4' THEN 10 " "when 'bigint' THEN 19 " "when 'int8' THEN 19 " "when 'decimal' then ((atttypmod - 4) >> 16) & 65535 " "when 'real' THEN 8 " "when 'float4' THEN 8 " "when 'double precision' THEN 17 " "when 'float8' THEN 17 " "when 'float' THEN 17 " "when 'numeric' THEN ((atttypmod - 4) >> 16) & 65535 " "when '_float4' THEN 8 " "when 'timestamptz' THEN 35 " "when 'oid' THEN 10 " "when '_int4' THEN 10 " "when '_int2' THEN 5 " "when 'geometry' THEN NULL " "when 'omni' THEN NULL " "else 2147483647 end as CHAR_OCTET_LENGTH , " "a.attnum AS ORDINAL_POSITION, " "case a.attnotnull OR (t.typtype = 'd' AND t.typnotnull) " "when 'false' then 'YES' " "when NULL then '' " "else 'NO' end AS IS_NULLABLE, " "null as SCOPE_CATALOG , " "null as SCOPE_SCHEMA , " "null as SCOPE_TABLE, " "t.typbasetype AS SOURCE_DATA_TYPE , " "CASE WHEN left(pg_catalog.pg_get_expr(def.adbin, def.adrelid), 16) = 'default_identity' THEN 'YES' " "ELSE 'NO' END AS IS_AUTOINCREMENT, " "IS_AUTOINCREMENT AS IS_GENERATEDCOLUMN " "FROM pg_catalog.pg_namespace n JOIN pg_catalog.pg_class c ON (c.relnamespace = n.oid) " "JOIN pg_catalog.pg_attribute a ON (a.attrelid=c.oid) " "JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) " "LEFT JOIN pg_catalog.pg_attrdef def ON (a.attrelid=def.adrelid AND a.attnum = def.adnum) " "LEFT JOIN pg_catalog.pg_description dsc ON (c.oid=dsc.objoid AND a.attnum = dsc.objsubid) " "LEFT JOIN pg_catalog.pg_class dc ON (dc.oid=dsc.classoid AND dc.relname='pg_class') " "LEFT JOIN pg_catalog.pg_namespace dn ON (dc.relnamespace=dn.oid AND dn.nspname='pg_catalog') " "WHERE a.attnum > 0 AND NOT a.attisdropped " ) sql += self._get_catalog_filter_conditions(catalog, True, None) if schema_pattern is not None and schema_pattern != "": sql += " AND n.nspname LIKE {schema}".format(schema=self.__escape_quotes(schema_pattern)) if tablename_pattern is not None and tablename_pattern != "": sql += " AND c.relname LIKE {table}".format(table=self.__escape_quotes(tablename_pattern)) if columnname_pattern is not None and columnname_pattern != "": sql += " AND attname LIKE {column}".format(column=self.__escape_quotes(columnname_pattern)) sql += " ORDER BY TABLE_SCHEM,c.relname,attnum ) " # This part uses redshift method PG_GET_LATE_BINDING_VIEW_COLS() to # get the column list for late binding view. sql += ( " UNION ALL " "SELECT current_database()::VARCHAR(128) AS TABLE_CAT, " "schemaname::varchar(128) AS table_schem, " "tablename::varchar(128) AS table_name, " "columnname::varchar(128) AS column_name, " "CAST(CASE columntype_rep " "WHEN 'text' THEN 12 " "WHEN 'bit' THEN -7 " "WHEN 'bool' THEN -7 " "WHEN 'boolean' THEN -7 " "WHEN 'varchar' THEN 12 " "WHEN 'character varying' THEN 12 " "WHEN 'char' THEN 1 " "WHEN 'character' THEN 1 " "WHEN 'nchar' THEN 1 " "WHEN 'bpchar' THEN 1 " "WHEN 'nvarchar' THEN 12 " "WHEN '\"char\"' THEN 1 " "WHEN 'date' THEN 91 " "WHEN 'timestamp' THEN 93 " "WHEN 'timestamp without time zone' THEN 93 " "WHEN 'timestamp with time zone' THEN 2014 " "WHEN 'smallint' THEN 5 " "WHEN 'int2' THEN 5 " "WHEN 'integer' THEN 4 " "WHEN 'int' THEN 4 " "WHEN 'int4' THEN 4 " "WHEN 'bigint' THEN -5 " "WHEN 'int8' THEN -5 " "WHEN 'decimal' THEN 3 " "WHEN 'real' THEN 7 " "WHEN 'float4' THEN 7 " "WHEN 'double precision' THEN 8 " "WHEN 'float8' THEN 8 " "WHEN 'float' THEN 6 " "WHEN 'numeric' THEN 2 " "WHEN 'timestamptz' THEN 2014 " "WHEN 'bytea' THEN -2 " "WHEN 'oid' THEN -5 " "WHEN 'name' THEN 12 " "WHEN 'ARRAY' THEN 2003 " "WHEN 'geometry' THEN -4 " "WHEN 'omni' THEN -16 " "ELSE 1111 END AS SMALLINT) AS DATA_TYPE, " "COALESCE(NULL,CASE columntype WHEN 'boolean' THEN 'bool' " "WHEN 'character varying' THEN 'varchar' " "WHEN '\"char\"' THEN 'char' " "WHEN 'smallint' THEN 'int2' " "WHEN 'integer' THEN 'int4'" "WHEN 'bigint' THEN 'int8' " "WHEN 'real' THEN 'float4' " "WHEN 'double precision' THEN 'float8' " "WHEN 'timestamp without time zone' THEN 'timestamp' " "WHEN 'timestamp with time zone' THEN 'timestamptz' " "ELSE columntype END) AS TYPE_NAME, " "CASE columntype_rep " "WHEN 'int4' THEN 10 " "WHEN 'bit' THEN 1 " "WHEN 'bool' THEN 1" "WHEN 'boolean' THEN 1" "WHEN 'varchar' THEN regexp_substr (columntype,'[0-9]+',7)::INTEGER " "WHEN 'character varying' THEN regexp_substr (columntype,'[0-9]+',7)::INTEGER " "WHEN 'char' THEN regexp_substr (columntype,'[0-9]+',4)::INTEGER " "WHEN 'character' THEN regexp_substr (columntype,'[0-9]+',4)::INTEGER " "WHEN 'nchar' THEN regexp_substr (columntype,'[0-9]+',7)::INTEGER " "WHEN 'bpchar' THEN regexp_substr (columntype,'[0-9]+',7)::INTEGER " "WHEN 'nvarchar' THEN regexp_substr (columntype,'[0-9]+',7)::INTEGER " "WHEN 'date' THEN 13 " "WHEN 'timestamp' THEN 29 " "WHEN 'timestamp without time zone' THEN 29 " "WHEN 'smallint' THEN 5 " "WHEN 'int2' THEN 5 " "WHEN 'integer' THEN 10 " "WHEN 'int' THEN 10 " "WHEN 'int4' THEN 10 " "WHEN 'bigint' THEN 19 " "WHEN 'int8' THEN 19 " "WHEN 'decimal' THEN regexp_substr (columntype,'[0-9]+',7)::INTEGER " "WHEN 'real' THEN 8 " "WHEN 'float4' THEN 8 " "WHEN 'double precision' THEN 17 " "WHEN 'float8' THEN 17 " "WHEN 'float' THEN 17" "WHEN 'numeric' THEN regexp_substr (columntype,'[0-9]+',7)::INTEGER " "WHEN '_float4' THEN 8 " "WHEN 'timestamptz' THEN 35 " "WHEN 'timestamp with time zone' THEN 35 " "WHEN 'oid' THEN 10 " "WHEN '_int4' THEN 10 " "WHEN '_int2' THEN 5 " "WHEN 'geometry' THEN NULL " "WHEN 'omni' THEN NULL " "ELSE 2147483647 END AS COLUMN_SIZE, " "NULL AS BUFFER_LENGTH, " "CASE columntype " "WHEN 'real' THEN 8 " "WHEN 'float4' THEN 8 " "WHEN 'double precision' THEN 17 " "WHEN 'float8' THEN 17 " "WHEN 'timestamp' THEN 6 " "WHEN 'timestamp without time zone' THEN 6 " "WHEN 'geometry' THEN NULL " "WHEN 'omni' THEN NULL " "ELSE 0 END AS DECIMAL_DIGITS, 10 AS NUM_PREC_RADIX, " "NULL AS NULLABLE, NULL AS REMARKS, NULL AS COLUMN_DEF, " "CAST(CASE columntype_rep " "WHEN 'text' THEN 12 " "WHEN 'bit' THEN -7 " "WHEN 'bool' THEN -7 " "WHEN 'boolean' THEN -7 " "WHEN 'varchar' THEN 12 " "WHEN 'character varying' THEN 12 " "WHEN 'char' THEN 1 " "WHEN 'character' THEN 1 " "WHEN 'nchar' THEN 12 " "WHEN 'bpchar' THEN 1 " "WHEN 'nvarchar' THEN 12 " "WHEN '\"char\"' THEN 1 " "WHEN 'date' THEN 91 " "WHEN 'timestamp' THEN 93 " "WHEN 'timestamp without time zone' THEN 93 " "WHEN 'timestamp with time zone' THEN 2014 " "WHEN 'smallint' THEN 5 " "WHEN 'int2' THEN 5 " "WHEN 'integer' THEN 4 " "WHEN 'int' THEN 4 " "WHEN 'int4' THEN 4 " "WHEN 'bigint' THEN -5 " "WHEN 'int8' THEN -5 " "WHEN 'decimal' THEN 3 " "WHEN 'real' THEN 7 " "WHEN 'float4' THEN 7 " "WHEN 'double precision' THEN 8 " "WHEN 'float8' THEN 8 " "WHEN 'float' THEN 6 " "WHEN 'numeric' THEN 2 " "WHEN 'timestamptz' THEN 2014 " "WHEN 'bytea' THEN -2 " "WHEN 'oid' THEN -5 " "WHEN 'name' THEN 12 " "WHEN 'ARRAY' THEN 2003 " "WHEN 'geometry' THEN -4 " "WHEN 'omni' THEN -4 " "ELSE 1111 END AS SMALLINT) AS SQL_DATA_TYPE, " "CAST(NULL AS SMALLINT) AS SQL_DATETIME_SUB, CASE " "WHEN LEFT (columntype,7) = 'varchar' THEN regexp_substr (columntype,'[0-9]+',7)::INTEGER " "WHEN LEFT (columntype,4) = 'char' THEN regexp_substr (columntype,'[0-9]+',4)::INTEGER " "WHEN columntype = 'string' THEN 16383 ELSE NULL " "END AS CHAR_OCTET_LENGTH, columnnum AS ORDINAL_POSITION, " "NULL AS IS_NULLABLE, NULL AS SCOPE_CATALOG, NULL AS SCOPE_SCHEMA, " "NULL AS SCOPE_TABLE, NULL AS SOURCE_DATA_TYPE, 'NO' AS IS_AUTOINCREMENT, " "'NO' as IS_GENERATEDCOLUMN " "FROM (select lbv_cols.schemaname, " "lbv_cols.tablename, lbv_cols.columnname," "REGEXP_REPLACE(REGEXP_REPLACE(lbv_cols.columntype,'\\\\(.*\\\\)'),'^_.+','ARRAY') as columntype_rep," "columntype, " "lbv_cols.columnnum " "from pg_get_late_binding_view_cols() lbv_cols( " "schemaname name, tablename name, columnname name, " "columntype text, columnnum int)) lbv_columns " " WHERE true " ) if schema_pattern is not None and schema_pattern != "": sql += " AND schemaname LIKE {schema}".format(schema=self.__escape_quotes(schema_pattern)) if tablename_pattern is not None and tablename_pattern != "": sql += " AND tablename LIKE {table}".format(table=self.__escape_quotes(tablename_pattern)) if columnname_pattern is not None and columnname_pattern != "": sql += " AND columnname LIKE {column}".format(column=self.__escape_quotes(columnname_pattern)) return sql def __build_universal_schema_columns_query_filters( self: "Cursor", schema_pattern: typing.Optional[str], tablename_pattern: typing.Optional[str], columnname_pattern: typing.Optional[str], ) -> str: filter_clause: str = "" if schema_pattern is not None and schema_pattern != "": filter_clause += " AND schema_name LIKE {schema}".format(schema=self.__escape_quotes(schema_pattern)) if tablename_pattern is not None and tablename_pattern != "": filter_clause += " AND table_name LIKE {table}".format(table=self.__escape_quotes(tablename_pattern)) if columnname_pattern is not None and columnname_pattern != "": filter_clause += " AND COLUMN_NAME LIKE {column}".format(column=self.__escape_quotes(columnname_pattern)) return filter_clause def __build_universal_schema_columns_query( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], tablename_pattern: typing.Optional[str], columnname_pattern: typing.Optional[str], ) -> str: unknown_column_size: str = "2147483647" sql: str = ( "SELECT current_database()::varchar(128) AS TABLE_CAT," " table_schema AS TABLE_SCHEM," " table_name," " COLUMN_NAME," " CAST(CASE regexp_replace(data_type, '^_.+', 'ARRAY')" " WHEN 'text' THEN 12" " WHEN 'bit' THEN -7" " WHEN 'bool' THEN -7" " WHEN 'boolean' THEN -7" " WHEN 'varchar' THEN 12" " WHEN 'character varying' THEN 12" " WHEN 'char' THEN 1" " WHEN 'character' THEN 1" " WHEN 'nchar' THEN 1" " WHEN 'bpchar' THEN 1" " WHEN 'nvarchar' THEN 12" " WHEN '\"char\"' THEN 1" " WHEN 'date' THEN 91" " WHEN 'timestamp' THEN 93" " WHEN 'timestamp without time zone' THEN 93" " WHEN 'timestamp with time zone' THEN 2014" " WHEN 'smallint' THEN 5" " WHEN 'int2' THEN 5" " WHEN 'integer' THEN 4" " WHEN 'int' THEN 4" " WHEN 'int4' THEN 4" " WHEN 'bigint' THEN -5" " WHEN 'int8' THEN -5" " WHEN 'decimal' THEN 3" " WHEN 'real' THEN 7" " WHEN 'float4' THEN 7" " WHEN 'double precision' THEN 8" " WHEN 'float8' THEN 8" " WHEN 'float' THEN 6" " WHEN 'numeric' THEN 2" " WHEN 'timestamptz' THEN 2014" " WHEN 'bytea' THEN -2" " WHEN 'oid' THEN -5" " WHEN 'name' THEN 12" " WHEN 'ARRAY' THEN 2003" " WHEN 'geometry' THEN -4 " " WHEN 'omni' THEN -16 " " ELSE 1111 END AS SMALLINT) AS DATA_TYPE," " COALESCE(" " domain_name," " CASE data_type" " WHEN 'boolean' THEN 'bool'" " WHEN 'character varying' THEN 'varchar'" " WHEN '\"char\"' THEN 'char'" " WHEN 'smallint' THEN 'int2'" " WHEN 'integer' THEN 'int4'" " WHEN 'bigint' THEN 'int8'" " WHEN 'real' THEN 'float4'" " WHEN 'double precision' THEN 'float8'" " WHEN 'timestamp without time zone' THEN 'timestamp'" " WHEN 'timestamp with time zone' THEN 'timestamptz'" " ELSE data_type" " END) AS TYPE_NAME," " CASE data_type" " WHEN 'int4' THEN 10" " WHEN 'bit' THEN 1" " WHEN 'bool' THEN 1" " WHEN 'boolean' THEN 1" " WHEN 'varchar' THEN character_maximum_length" " WHEN 'character varying' THEN character_maximum_length" " WHEN 'char' THEN character_maximum_length" " WHEN 'character' THEN character_maximum_length" " WHEN 'nchar' THEN character_maximum_length" " WHEN 'bpchar' THEN character_maximum_length" " WHEN 'nvarchar' THEN character_maximum_length" " WHEN 'date' THEN 13" " WHEN 'timestamp' THEN 29" " WHEN 'timestamp without time zone' THEN 29" " WHEN 'smallint' THEN 5" " WHEN 'int2' THEN 5" " WHEN 'integer' THEN 10" " WHEN 'int' THEN 10" " WHEN 'int4' THEN 10" " WHEN 'bigint' THEN 19" " WHEN 'int8' THEN 19" " WHEN 'decimal' THEN numeric_precision" " WHEN 'real' THEN 8" " WHEN 'float4' THEN 8" " WHEN 'double precision' THEN 17" " WHEN 'float8' THEN 17" " WHEN 'float' THEN 17" " WHEN 'numeric' THEN numeric_precision" " WHEN '_float4' THEN 8" " WHEN 'timestamptz' THEN 35" " WHEN 'timestamp with time zone' THEN 35" " WHEN 'oid' THEN 10" " WHEN '_int4' THEN 10" " WHEN '_int2' THEN 5" " WHEN 'geometry' THEN NULL" " WHEN 'omni' THEN NULL" " ELSE {unknown_column_size}" " END AS COLUMN_SIZE," " NULL AS BUFFER_LENGTH," " CASE data_type" " WHEN 'real' THEN 8" " WHEN 'float4' THEN 8" " WHEN 'double precision' THEN 17" " WHEN 'float8' THEN 17" " WHEN 'numeric' THEN numeric_scale" " WHEN 'timestamp' THEN 6" " WHEN 'timestamp without time zone' THEN 6" " WHEN 'geometry' THEN NULL" " WHEN 'omni' THEN NULL" " ELSE 0" " END AS DECIMAL_DIGITS," " 10 AS NUM_PREC_RADIX," " CASE is_nullable WHEN 'YES' THEN 1" " WHEN 'NO' THEN 0" " ELSE 2 end AS NULLABLE," " REMARKS," " column_default AS COLUMN_DEF," " CAST(CASE regexp_replace(data_type, '^_.+', 'ARRAY')" " WHEN 'text' THEN 12" " WHEN 'bit' THEN -7" " WHEN 'bool' THEN -7" " WHEN 'boolean' THEN -7" " WHEN 'varchar' THEN 12" " WHEN 'character varying' THEN 12" " WHEN 'char' THEN 1" " WHEN 'character' THEN 1" " WHEN 'nchar' THEN 1" " WHEN 'bpchar' THEN 1" " WHEN 'nvarchar' THEN 12" " WHEN '\"char\"' THEN 1" " WHEN 'date' THEN 91" " WHEN 'timestamp' THEN 93" " WHEN 'timestamp without time zone' THEN 93" " WHEN 'timestamp with time zone' THEN 2014" " WHEN 'smallint' THEN 5" " WHEN 'int2' THEN 5" " WHEN 'integer' THEN 4" " WHEN 'int' THEN 4" " WHEN 'int4' THEN 4" " WHEN 'bigint' THEN -5" " WHEN 'int8' THEN -5" " WHEN 'decimal' THEN 3" " WHEN 'real' THEN 7" " WHEN 'float4' THEN 7" " WHEN 'double precision' THEN 8" " WHEN 'float8' THEN 8" " WHEN 'float' THEN 6" " WHEN 'numeric' THEN 2" " WHEN 'timestamptz' THEN 2014" " WHEN 'bytea' THEN -2" " WHEN 'oid' THEN -5" " WHEN 'name' THEN 12" " WHEN 'ARRAY' THEN 2003" " WHEN 'geometry' THEN -4" " WHEN 'omni' THEN -16" " ELSE 1111 END AS SMALLINT) AS SQL_DATA_TYPE," " CAST(NULL AS SMALLINT) AS SQL_DATETIME_SUB," " CASE data_type" " WHEN 'int4' THEN 10" " WHEN 'bit' THEN 1" " WHEN 'bool' THEN 1" " WHEN 'boolean' THEN 1" " WHEN 'varchar' THEN character_maximum_length" " WHEN 'character varying' THEN character_maximum_length" " WHEN 'char' THEN character_maximum_length" " WHEN 'character' THEN character_maximum_length" " WHEN 'nchar' THEN character_maximum_length" " WHEN 'bpchar' THEN character_maximum_length" " WHEN 'nvarchar' THEN character_maximum_length" " WHEN 'date' THEN 13" " WHEN 'timestamp' THEN 29" " WHEN 'timestamp without time zone' THEN 29" " WHEN 'smallint' THEN 5" " WHEN 'int2' THEN 5" " WHEN 'integer' THEN 10" " WHEN 'int' THEN 10" " WHEN 'int4' THEN 10" " WHEN 'bigint' THEN 19" " WHEN 'int8' THEN 19" " WHEN 'decimal' THEN numeric_precision" " WHEN 'real' THEN 8" " WHEN 'float4' THEN 8" " WHEN 'double precision' THEN 17" " WHEN 'float8' THEN 17" " WHEN 'float' THEN 17" " WHEN 'numeric' THEN numeric_precision" " WHEN '_float4' THEN 8" " WHEN 'timestamptz' THEN 35" " WHEN 'timestamp with time zone' THEN 35" " WHEN 'oid' THEN 10" " WHEN '_int4' THEN 10" " WHEN '_int2' THEN 5" " WHEN 'geometry' THEN NULL" " WHEN 'omni' THEN NULL" " ELSE {unknown_column_size}" " END AS CHAR_OCTET_LENGTH," " ordinal_position AS ORDINAL_POSITION," " is_nullable AS IS_NULLABLE," " NULL AS SCOPE_CATALOG," " NULL AS SCOPE_SCHEMA," " NULL AS SCOPE_TABLE," " CASE" " WHEN domain_name is not null THEN data_type" " END AS SOURCE_DATA_TYPE," " CASE WHEN left(column_default, 10) = '\"identity\"' THEN 'YES'" " WHEN left(column_default, 16) = 'default_identity' THEN 'YES' " " ELSE 'NO' END AS IS_AUTOINCREMENT," " IS_AUTOINCREMENT AS IS_GENERATEDCOLUMN" " FROM svv_columns" " WHERE true " ).format(unknown_column_size=unknown_column_size) sql += self._get_catalog_filter_conditions(catalog, True, None) sql += self.__build_universal_schema_columns_query_filters( schema_pattern, tablename_pattern, columnname_pattern ) sql += " ORDER BY table_schem,table_name,ORDINAL_POSITION " return sql def __build_universal_all_schema_columns_query( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], tablename_pattern: typing.Optional[str], columnname_pattern: typing.Optional[str], ) -> str: unknown_column_size: str = "2147483647" sql: str = ( "SELECT database_name AS TABLE_CAT, " " schema_name AS TABLE_SCHEM, " " table_name, " " COLUMN_NAME, " " CAST(CASE regexp_replace(data_type, '^_.', 'ARRAY') " " WHEN 'text' THEN 12 " " WHEN 'bit' THEN -7 " " WHEN 'bool' THEN -7 " " WHEN 'boolean' THEN -7 " " WHEN 'varchar' THEN 12 " " WHEN 'character varying' THEN 12 " " WHEN 'char' THEN 1 " " WHEN 'character' THEN 1 " " WHEN 'nchar' THEN 1 " " WHEN 'bpchar' THEN 1 " " WHEN 'nvarchar' THEN 12 " " WHEN '\"char\"' THEN 1 " " WHEN 'date' THEN 91 " " WHEN 'timestamp' THEN 93 " " WHEN 'timestamp without time zone' THEN 93 " " WHEN 'timestamp with time zone' THEN 2014 " " WHEN 'smallint' THEN 5 " " WHEN 'int2' THEN 5 " " WHEN 'integer' THEN 4 " " WHEN 'int' THEN 4 " " WHEN 'int4' THEN 4 " " WHEN 'bigint' THEN -5 " " WHEN 'int8' THEN -5 " " WHEN 'decimal' THEN 3 " " WHEN 'real' THEN 7 " " WHEN 'float4' THEN 7 " " WHEN 'double precision' THEN 8 " " WHEN 'float8' THEN 8 " " WHEN 'float' THEN 6 " " WHEN 'numeric' THEN 2 " " WHEN 'timestamptz' THEN 2014 " " WHEN 'bytea' THEN -2 " " WHEN 'oid' THEN -5 " " WHEN 'name' THEN 12 " " WHEN 'ARRAY' THEN 2003 " " WHEN 'geometry' THEN -4 " " WHEN 'super' THEN -16 " " ELSE 1111 END AS SMALLINT) AS DATA_TYPE, " " CASE data_type " " WHEN 'boolean' THEN 'bool' " " WHEN 'character varying' THEN 'varchar' " " WHEN '\"char\"' THEN 'char' " " WHEN 'smallint' THEN 'int2' " " WHEN 'integer' THEN 'int4' " " WHEN 'bigint' THEN 'int8' " " WHEN 'real' THEN 'float4' " " WHEN 'double precision' THEN 'float8' " " WHEN 'timestamp without time zone' THEN 'timestamp' " " WHEN 'timestamp with time zone' THEN 'timestamptz' " " ELSE data_type " " END AS TYPE_NAME, " " CASE data_type " " WHEN 'int4' THEN 10 " " WHEN 'bit' THEN 1 " " WHEN 'bool' THEN 1 " " WHEN 'boolean' THEN 1 " " WHEN 'varchar' THEN character_maximum_length " " WHEN 'character varying' THEN character_maximum_length " " WHEN 'char' THEN character_maximum_length " " WHEN 'character' THEN character_maximum_length " " WHEN 'nchar' THEN character_maximum_length " " WHEN 'bpchar' THEN character_maximum_length " " WHEN 'nvarchar' THEN character_maximum_length " " WHEN 'date' THEN 13 " " WHEN 'timestamp' THEN 29 " " WHEN 'timestamp without time zone' THEN 29 " " WHEN 'smallint' THEN 5 " " WHEN 'int2' THEN 5 " " WHEN 'integer' THEN 10 " " WHEN 'int' THEN 10 " " WHEN 'int4' THEN 10 " " WHEN 'bigint' THEN 19 " " WHEN 'int8' THEN 19 " " WHEN 'decimal' THEN numeric_precision " " WHEN 'real' THEN 8 " " WHEN 'float4' THEN 8 " " WHEN 'double precision' THEN 17 " " WHEN 'float8' THEN 17 " " WHEN 'float' THEN 17 " " WHEN 'numeric' THEN numeric_precision " " WHEN '_float4' THEN 8 " " WHEN 'timestamptz' THEN 35 " " WHEN 'timestamp with time zone' THEN 35 " " WHEN 'oid' THEN 10 " " WHEN '_int4' THEN 10 " " WHEN '_int2' THEN 5 " " WHEN 'geometry' THEN NULL " " WHEN 'super' THEN NULL " " ELSE 2147483647 " " END AS COLUMN_SIZE, " " NULL AS BUFFER_LENGTH, " " CASE data_type " " WHEN 'real' THEN 8 " " WHEN 'float4' THEN 8 " " WHEN 'double precision' THEN 17 " " WHEN 'float8' THEN 17 " " WHEN 'numeric' THEN numeric_scale " " WHEN 'timestamp' THEN 6 " " WHEN 'timestamp without time zone' THEN 6 " " WHEN 'geometry' THEN NULL " " WHEN 'super' THEN NULL " " ELSE 0 " " END AS DECIMAL_DIGITS, " " 10 AS NUM_PREC_RADIX, " " CASE is_nullable WHEN 'YES' THEN 1 " " WHEN 'NO' THEN 0 " " ELSE 2 end AS NULLABLE, " " REMARKS, " " column_default AS COLUMN_DEF, " " CAST(CASE regexp_replace(data_type, '^_.', 'ARRAY') " " WHEN 'text' THEN 12 " " WHEN 'bit' THEN -7 " " WHEN 'bool' THEN -7 " " WHEN 'boolean' THEN -7 " " WHEN 'varchar' THEN 12 " " WHEN 'character varying' THEN 12 " " WHEN 'char' THEN 1 " " WHEN 'character' THEN 1 " " WHEN 'nchar' THEN 1 " " WHEN 'bpchar' THEN 1 " " WHEN 'nvarchar' THEN 12 " " WHEN '\"char\"' THEN 1 " " WHEN 'date' THEN 91 " " WHEN 'timestamp' THEN 93 " " WHEN 'timestamp without time zone' THEN 93 " " WHEN 'timestamp with time zone' THEN 2014 " " WHEN 'smallint' THEN 5 " " WHEN 'int2' THEN 5 " " WHEN 'integer' THEN 4 " " WHEN 'int' THEN 4 " " WHEN 'int4' THEN 4 " " WHEN 'bigint' THEN -5 " " WHEN 'int8' THEN -5 " " WHEN 'decimal' THEN 3 " " WHEN 'real' THEN 7 " " WHEN 'float4' THEN 7 " " WHEN 'double precision' THEN 8 " " WHEN 'float8' THEN 8 " " WHEN 'float' THEN 6 " " WHEN 'numeric' THEN 2 " " WHEN 'timestamptz' THEN 2014 " " WHEN 'bytea' THEN -2 " " WHEN 'oid' THEN -5 " " WHEN 'name' THEN 12 " " WHEN 'ARRAY' THEN 2003 " " WHEN 'geometry' THEN -4 " " WHEN 'super' THEN -16 " " ELSE 1111 END AS SMALLINT) AS SQL_DATA_TYPE, " " CAST(NULL AS SMALLINT) AS SQL_DATETIME_SUB, " " CASE data_type " " WHEN 'int4' THEN 10 " " WHEN 'bit' THEN 1 " " WHEN 'bool' THEN 1 " " WHEN 'boolean' THEN 1 " " WHEN 'varchar' THEN character_maximum_length " " WHEN 'character varying' THEN character_maximum_length " " WHEN 'char' THEN character_maximum_length " " WHEN 'character' THEN character_maximum_length " " WHEN 'nchar' THEN character_maximum_length " " WHEN 'bpchar' THEN character_maximum_length " " WHEN 'nvarchar' THEN character_maximum_length " " WHEN 'date' THEN 13 " " WHEN 'timestamp' THEN 29 " " WHEN 'timestamp without time zone' THEN 29 " " WHEN 'smallint' THEN 5 " " WHEN 'int2' THEN 5 " " WHEN 'integer' THEN 10 " " WHEN 'int' THEN 10 " " WHEN 'int4' THEN 10 " " WHEN 'bigint' THEN 19 " " WHEN 'int8' THEN 19 " " WHEN 'decimal' THEN numeric_precision " " WHEN 'real' THEN 8 " " WHEN 'float4' THEN 8 " " WHEN 'double precision' THEN 17 " " WHEN 'float8' THEN 17 " " WHEN 'float' THEN 17 " " WHEN 'numeric' THEN numeric_precision " " WHEN '_float4' THEN 8 " " WHEN 'timestamptz' THEN 35 " " WHEN 'timestamp with time zone' THEN 35 " " WHEN 'oid' THEN 10 " " WHEN '_int4' THEN 10 " " WHEN '_int2' THEN 5 " " WHEN 'geometry' THEN NULL " " WHEN 'super' THEN NULL " " ELSE 2147483647 " " END AS CHAR_OCTET_LENGTH, " " ordinal_position AS ORDINAL_POSITION, " " is_nullable AS IS_NULLABLE, " " NULL AS SCOPE_CATALOG, " " NULL AS SCOPE_SCHEMA, " " NULL AS SCOPE_TABLE, " " data_type as SOURCE_DATA_TYPE, " " CASE WHEN left(column_default, 10) = '\"identity\"' THEN 'YES' " " WHEN left(column_default, 16) = 'default_identity' THEN 'YES' " " ELSE 'NO' END AS IS_AUTOINCREMENT, " " IS_AUTOINCREMENT AS IS_GENERATEDCOLUMN " " FROM PG_CATALOG.svv_all_columns " " WHERE true " ) sql += self._get_catalog_filter_conditions(catalog, False, None) sql += self.__build_universal_schema_columns_query_filters( schema_pattern, tablename_pattern, columnname_pattern ) sql += " ORDER BY TABLE_CAT, TABLE_SCHEM, TABLE_NAME, ORDINAL_POSITION " return sql def __build_external_schema_columns_query( self: "Cursor", catalog: typing.Optional[str], schema_pattern: typing.Optional[str], tablename_pattern: typing.Optional[str], columnname_pattern: typing.Optional[str], ) -> str: sql: str = ( "SELECT current_database()::varchar(128) AS TABLE_CAT," " schemaname AS TABLE_SCHEM," " tablename AS TABLE_NAME," " columnname AS COLUMN_NAME," " CAST(CASE WHEN external_type = 'text' THEN 12" " WHEN external_type = 'bit' THEN -7" " WHEN external_type = 'bool' THEN -7" " WHEN external_type = 'boolean' THEN -7" " WHEN left(external_type, 7) = 'varchar' THEN 12" " WHEN left(external_type, 17) = 'character varying' THEN 12" " WHEN left(external_type, 4) = 'char' THEN 1" " WHEN left(external_type, 9) = 'character' THEN 1" " WHEN left(external_type, 5) = 'nchar' THEN 1" " WHEN left(external_type, 6) = 'bpchar' THEN 1" " WHEN left(external_type, 8) = 'nvarchar' THEN 12" " WHEN external_type = '\"char\"' THEN 1" " WHEN external_type = 'date' THEN 91" " WHEN external_type = 'timestamp' THEN 93" " WHEN external_type = 'timestamp without time zone' THEN 93" " WHEN external_type = 'timestamp with time zone' THEN 2014" " WHEN external_type = 'smallint' THEN 5" " WHEN external_type = 'int2' THEN 5" " WHEN external_type = '_int2' THEN 5" " WHEN external_type = 'integer' THEN 4" " WHEN external_type = 'int' THEN 4" " WHEN external_type = 'int4' THEN 4" " WHEN external_type = '_int4' THEN 4" " WHEN external_type = 'bigint' THEN -5" " WHEN external_type = 'int8' THEN -5" " WHEN left(external_type, 7) = 'decimal' THEN 2" " WHEN external_type = 'real' THEN 7" " WHEN external_type = 'float4' THEN 7" " WHEN external_type = '_float4' THEN 7" " WHEN external_type = 'double' THEN 8" " WHEN external_type = 'double precision' THEN 8" " WHEN external_type = 'float8' THEN 8" " WHEN external_type = '_float8' THEN 8" " WHEN external_type = 'float' THEN 6" " WHEN left(external_type, 7) = 'numeric' THEN 2" " WHEN external_type = 'timestamptz' THEN 2014" " WHEN external_type = 'bytea' THEN -2" " WHEN external_type = 'oid' THEN -5" " WHEN external_type = 'name' THEN 12" " WHEN external_type = 'ARRAY' THEN 2003" " WHEN external_type = 'geometry' THEN -4" " WHEN external_type = 'omni' THEN -16" " ELSE 1111 END AS SMALLINT) AS DATA_TYPE," " CASE WHEN left(external_type, 17) = 'character varying' THEN 'varchar'" " WHEN left(external_type, 7) = 'varchar' THEN 'varchar'" " WHEN left(external_type, 4) = 'char' THEN 'char'" " WHEN left(external_type, 7) = 'decimal' THEN 'numeric'" " WHEN left(external_type, 7) = 'numeric' THEN 'numeric'" " WHEN external_type = 'double' THEN 'double precision'" " WHEN external_type = 'timestamp without time zone' THEN 'timestamp'" " WHEN external_type = 'timestamp with time zone' THEN 'timestamptz'" " ELSE external_type END AS TYPE_NAME," " CASE WHEN external_type = 'int4' THEN 10" " WHEN external_type = 'bit' THEN 1" " WHEN external_type = 'bool' THEN 1" " WHEN external_type = 'boolean' THEN 1" " WHEN left(external_type, 7) = 'varchar' THEN regexp_substr(external_type, '[0-9]+', 7)::integer" " WHEN left(external_type, 17) = 'character varying' THEN regexp_substr(external_type, '[0-9]+', 17)::integer" " WHEN left(external_type, 4) = 'char' THEN regexp_substr(external_type, '[0-9]+', 4)::integer" " WHEN left(external_type, 9) = 'character' THEN regexp_substr(external_type, '[0-9]+', 9)::integer" " WHEN left(external_type, 5) = 'nchar' THEN regexp_substr(external_type, '[0-9]+', 5)::integer" " WHEN left(external_type, 6) = 'bpchar' THEN regexp_substr(external_type, '[0-9]+', 6)::integer" " WHEN left(external_type, 8) = 'nvarchar' THEN regexp_substr(external_type, '[0-9]+', 8)::integer" " WHEN external_type = 'date' THEN 13 WHEN external_type = 'timestamp' THEN 29" " WHEN external_type = 'timestamp without time zone' THEN 29" " WHEN external_type = 'smallint' THEN 5" " WHEN external_type = 'int2' THEN 5" " WHEN external_type = 'integer' THEN 10" " WHEN external_type = 'int' THEN 10" " WHEN external_type = 'int4' THEN 10" " WHEN external_type = 'bigint' THEN 19" " WHEN external_type = 'int8' THEN 19" " WHEN left(external_type, 7) = 'decimal' THEN regexp_substr(external_type, '[0-9]+', 7)::integer" " WHEN external_type = 'real' THEN 8" " WHEN external_type = 'float4' THEN 8" " WHEN external_type = '_float4' THEN 8" " WHEN external_type = 'double' THEN 17" " WHEN external_type = 'double precision' THEN 17" " WHEN external_type = 'float8' THEN 17" " WHEN external_type = '_float8' THEN 17" " WHEN external_type = 'float' THEN 17" " WHEN left(external_type, 7) = 'numeric' THEN regexp_substr(external_type, '[0-9]+', 7)::integer" " WHEN external_type = '_float4' THEN 8" " WHEN external_type = 'timestamptz' THEN 35" " WHEN external_type = 'timestamp with time zone' THEN 35" " WHEN external_type = 'oid' THEN 10" " WHEN external_type = '_int4' THEN 10" " WHEN external_type = '_int2' THEN 5" " WHEN external_type = 'geometry' THEN NULL" " WHEN external_type = 'omni' THEN NULL" " ELSE 2147483647 END AS COLUMN_SIZE," " NULL AS BUFFER_LENGTH," " CASE WHEN external_type = 'real'THEN 8" " WHEN external_type = 'float4' THEN 8" " WHEN external_type = 'double' THEN 17" " WHEN external_type = 'double precision' THEN 17" " WHEN external_type = 'float8' THEN 17" " WHEN left(external_type, 7) = 'numeric' THEN regexp_substr(external_type, '[0-9]+', 10)::integer" " WHEN left(external_type, 7) = 'decimal' THEN regexp_substr(external_type, '[0-9]+', 10)::integer" " WHEN external_type = 'timestamp' THEN 6" " WHEN external_type = 'timestamp without time zone' THEN 6" " WHEN external_type = 'geometry' THEN NULL" " WHEN external_type = 'omni' THEN NULL" " ELSE 0 END AS DECIMAL_DIGITS," " 10 AS NUM_PREC_RADIX," " NULL AS NULLABLE," " NULL AS REMARKS," " NULL AS COLUMN_DEF," " CAST(CASE WHEN external_type = 'text' THEN 12" " WHEN external_type = 'bit' THEN -7" " WHEN external_type = 'bool' THEN -7" " WHEN external_type = 'boolean' THEN -7" " WHEN left(external_type, 7) = 'varchar' THEN 12" " WHEN left(external_type, 17) = 'character varying' THEN 12" " WHEN left(external_type, 4) = 'char' THEN 1" " WHEN left(external_type, 9) = 'character' THEN 1" " WHEN left(external_type, 5) = 'nchar' THEN 1" " WHEN left(external_type, 6) = 'bpchar' THEN 1" " WHEN left(external_type, 8) = 'nvarchar' THEN 12" " WHEN external_type = '\"char\"' THEN 1" " WHEN external_type = 'date' THEN 91" " WHEN external_type = 'timestamp' THEN 93" " WHEN external_type = 'timestamp without time zone' THEN 93" " WHEN external_type = 'timestamp with time zone' THEN 2014" " WHEN external_type = 'smallint' THEN 5" " WHEN external_type = 'int2' THEN 5" " WHEN external_type = '_int2' THEN 5" " WHEN external_type = 'integer' THEN 4" " WHEN external_type = 'int' THEN 4" " WHEN external_type = 'int4' THEN 4" " WHEN external_type = '_int4' THEN 4" " WHEN external_type = 'bigint' THEN -5" " WHEN external_type = 'int8' THEN -5" " WHEN left(external_type, 7) = 'decimal' THEN 3" " WHEN external_type = 'real' THEN 7" " WHEN external_type = 'float4' THEN 7" " WHEN external_type = '_float4' THEN 7" " WHEN external_type = 'double' THEN 8" " WHEN external_type = 'double precision' THEN 8" " WHEN external_type = 'float8' THEN 8" " WHEN external_type = '_float8' THEN 8" " WHEN external_type = 'float' THEN 6" " WHEN left(external_type, 7) = 'numeric' THEN 2" " WHEN external_type = 'timestamptz' THEN 2014" " WHEN external_type = 'bytea' THEN -2" " WHEN external_type = 'oid' THEN -5" " WHEN external_type = 'name' THEN 12" " WHEN external_type = 'ARRAY' THEN 2003" " WHEN external_type = 'geometry' THEN -4" " WHEN external_type = 'omni' THEN -16" " ELSE 1111 END AS SMALLINT) AS SQL_DATA_TYPE," " CAST(NULL AS SMALLINT) AS SQL_DATETIME_SUB," " CASE WHEN left(external_type, 7) = 'varchar' THEN regexp_substr(external_type, '[0-9]+', 7)::integer" " WHEN left(external_type, 17) = 'character varying' THEN regexp_substr(external_type, '[0-9]+', 17)::integer" " WHEN left(external_type, 4) = 'char' THEN regexp_substr(external_type, '[0-9]+', 4)::integer" " WHEN left(external_type, 9) = 'character' THEN regexp_substr(external_type, '[0-9]+', 9)::integer" " WHEN left(external_type, 5) = 'nchar' THEN regexp_substr(external_type, '[0-9]+', 5)::integer" " WHEN left(external_type, 6) = 'bpchar' THEN regexp_substr(external_type, '[0-9]+', 6)::integer" " WHEN left(external_type, 8) = 'nvarchar' THEN regexp_substr(external_type, '[0-9]+', 8)::integer" " WHEN external_type = 'string' THEN 16383" " ELSE NULL END AS CHAR_OCTET_LENGTH," " columnnum AS ORDINAL_POSITION," " NULL AS IS_NULLABLE," " NULL AS SCOPE_CATALOG," " NULL AS SCOPE_SCHEMA," " NULL AS SCOPE_TABLE," " NULL AS SOURCE_DATA_TYPE," " 'NO' AS IS_AUTOINCREMENT," " 'NO' AS IS_GENERATEDCOLUMN" " FROM svv_external_columns" " WHERE true " ) sql += self._get_catalog_filter_conditions(catalog, True, None) if schema_pattern is not None and schema_pattern != "": sql += " AND schemaname LIKE {schema}".format(schema=self.__escape_quotes(schema_pattern)) if tablename_pattern is not None and tablename_pattern != "": sql += " AND tablename LIKE {table}".format(table=self.__escape_quotes(tablename_pattern)) if columnname_pattern is not None and columnname_pattern != "": sql += " AND columnname LIKE {column}".format(column=self.__escape_quotes(columnname_pattern)) sql += " ORDER BY table_schem,table_name,ORDINAL_POSITION " return sql def __schema_pattern_match(self: "Cursor", schema_pattern: typing.Optional[str]) -> str: if self._c is None: raise InterfaceError("connection is closed") if schema_pattern is not None and schema_pattern != "": if self._c.is_single_database_metadata is True: sql: str = "select 1 from svv_external_schemas where schemaname like {schema}".format( schema=self.__escape_quotes(schema_pattern) ) self.execute(sql) schemas: tuple = self.fetchall() if schemas is not None and len(schemas) > 0: return "EXTERNAL_SCHEMA_QUERY" else: return "LOCAL_SCHEMA_QUERY" else: return "NO_SCHEMA_UNIVERSAL_QUERY" else: return "NO_SCHEMA_UNIVERSAL_QUERY" def __sanitize_str(self: "Cursor", s: str) -> str: return re.sub(r"[-;/'\"\n\r ]", "", s) def __escape_quotes(self: "Cursor", s: str) -> str: return "'{s}'".format(s=self.__sanitize_str(s))