from datetime import (
date as Date, datetime as Datetime, time as Time, timedelta as Timedelta,
timezone as Timezone)
from decimal import Decimal
from enum import Enum
from ipaddress import (
IPv4Address, IPv4Network, IPv6Address, IPv6Network, ip_address, ip_network)
from json import loads
from time import localtime
from uuid import UUID
from pg8000.exceptions import InterfaceError
# Numeric type codes.  The PG_TYPES table in this module keys its decoders by
# these same numbers (for example 16 -> bool_in and 1700 -> numeric_in).
# Several names deliberately share one code: DATETIME/TIMESTAMP (1114),
# DECIMAL/NUMBER (1700), INTERVAL/TIMEDELTA (1186) and STRING/VARCHAR (1043).
BIGINTEGER = 20
BOOLEAN = 16
BOOLEAN_ARRAY = 1000
BYTES = 17
CHAR = 1042
CHAR_ARRAY = 1014
DATE = 1082
DATETIME = 1114
DECIMAL = 1700
DECIMAL_ARRAY = 1231
FLOAT = 701
FLOAT_ARRAY = 1022
INET = 869
INT2VECTOR = 22
INTEGER = 23
INTEGER_ARRAY = 1016
INTERVAL = 1186
MACADDR = 829
NAME = 19
NAME_ARRAY = 1003
OID = 26
JSON = 114
JSONB = 3802
# Sentinel code paired with null_out in PY_TYPES; not a positive type code.
NULLTYPE = -1
NUMBER = 1700
STRING = 1043
TEXT = 25
TEXT_ARRAY = 1009
TIME = 1083
TIMEDELTA = 1186
TIMESTAMP = 1114
TIMESTAMPTZ = 1184
UNKNOWN = 705
UUID_TYPE = 2950
VARCHAR = 1043
VARCHAR_ARRAY = 1015
XID = 28
# Integer range limits for 2-, 4- and 8-byte signed integers.  Each MAX_* is
# 2 ** (bits - 1), i.e. one more than the largest value of that width --
# presumably used as an exclusive upper bound; verify against callers.
MIN_INT2, MAX_INT2 = -2 ** 15, 2 ** 15
MIN_INT4, MAX_INT4 = -2 ** 31, 2 ** 31
MIN_INT8, MAX_INT8 = -2 ** 63, 2 ** 63
def text_out(v):
    """Return ``v`` unchanged; text needs no conversion before sending."""
    return v
def enum_out(v):
    """Return the string form of an enum member's ``value`` attribute."""
    member_value = v.value
    return str(member_value)
def time_out(v):
    """Return the ISO 8601 text of a time value (``v.isoformat()``)."""
    iso_text = v.isoformat()
    return iso_text
def date_out(v):
    """Return the ISO 8601 text of a date value (``v.isoformat()``)."""
    iso_text = v.isoformat()
    return iso_text
def unknown_out(v):
    """Return ``str(v)`` for a value with no more specific converter."""
    text = str(v)
    return text
def vector_in(data):
    """Convert whitespace-separated integer text into a list of ints.

    This is the decoder registered for type code 22 (int2vector) in
    PG_TYPES.  Each token is parsed with ``int`` rather than by passing the
    received text to ``eval``, so text coming from the connection can never
    be executed as Python code.

    :param data: text such as ``"1 2 3"``; an empty string gives ``[]``.
    :returns: list of ``int``.
    :raises ValueError: if a token is not a valid integer.
    """
    return [int(token) for token in data.split()]
def text_in(data):
    """Return received text unchanged."""
    return data
def bool_in(data):
    """Return ``True`` only when the received text is exactly ``'t'``."""
    is_true = (data == 't')
    return is_true
def json_in(data):
    """Decode JSON text with :func:`json.loads` and return the result."""
    decoded = loads(data)
    return decoded
def time_in(data):
    """Parse ``HH:MM:SS`` text, with optional fraction, into a time.

    :rtype: :class:`datetime.time`
    """
    # A '.' means fractional seconds are present and need the %f directive.
    fmt = "%H:%M:%S.%f" if "." in data else "%H:%M:%S"
    return Datetime.strptime(data, fmt).time()
def date_in(data):
    """Parse ``YYYY-MM-DD`` text into a date.

    :rtype: :class:`datetime.date`
    """
    parsed = Datetime.strptime(data, "%Y-%m-%d")
    return parsed.date()
def numeric_in(data):
    """Convert numeric text into a :class:`decimal.Decimal`."""
    number = Decimal(data)
    return number
def numeric_out(d):
    """Return ``str(d)`` for a decimal value."""
    text = str(d)
    return text
def inet_out(v):
    """Return ``str(v)`` for an IP address or network object."""
    text = str(v)
    return text
def inet_in(data):
    """Convert inet text into an ``ipaddress`` object.

    Text containing ``/`` becomes a network (built with ``strict=False``, so
    host bits are allowed); anything else becomes a single address.
    """
    if '/' in data:
        return ip_network(data, False)
    return ip_address(data)
def int_out(v):
    """Return ``str(v)`` for an integer value."""
    text = str(v)
    return text
def float_out(v):
    """Return ``str(v)`` for a float value."""
    text = str(v)
    return text
def timestamp_in(data):
    """Parse ``YYYY-MM-DD HH:MM:SS[.ffffff]`` text into a datetime.

    The special values ``'infinity'`` and ``'-infinity'`` cannot be held by
    :class:`datetime.datetime` and are returned as the original string.
    """
    if data == 'infinity' or data == '-infinity':
        return data

    fmt = "%Y-%m-%d %H:%M:%S"
    if '.' in data:
        # Fractional seconds are present.
        fmt += ".%f"
    return Datetime.strptime(data, fmt)
def timestamp_out(v):
    """Return the ISO 8601 text of a datetime (``v.isoformat()``)."""
    iso_text = v.isoformat()
    return iso_text
def timestamptz_out(v):
    """Return the ISO 8601 text of ``v`` after converting it to UTC."""
    # Timestamps are sent as UTC, so shift the value out of whatever zone it
    # carries before formatting.
    utc_value = v.astimezone(Timezone.utc)
    return utc_value.isoformat()
def timestamptz_in(data):
    """Parse timestamp-with-offset text into an aware datetime.

    Accepts ``YYYY-MM-DD HH:MM:SS[.ffffff]`` followed by a UTC offset written
    as ``+HH``, ``+HH:MM`` or ``+HH:MM:SS`` (either sign).  The offset is
    normalised to the colon-free form that ``%z`` accepts; blindly appending
    ``'00'`` would turn ``+05:30`` into the unparsable ``+05:3000``.

    As in ``timestamp_in``, the special values ``'infinity'`` and
    ``'-infinity'`` are returned unchanged as strings.

    :raises ValueError: if the text does not match the expected layout.
    """
    if data in ('infinity', '-infinity'):
        return data

    # The offset starts at the last sign character; the '-' separators of the
    # date part always come earlier in the string.
    sign_pos = max(data.rfind('+'), data.rfind('-'))
    stamp, offset = data[:sign_pos], data[sign_pos:].replace(':', '')
    if len(offset) == 3:
        # Hours-only offset ('+HH'): %z also needs the minutes.
        offset += '00'

    patt = "%Y-%m-%d %H:%M:%S.%f%z" if '.' in stamp else "%Y-%m-%d %H:%M:%S%z"
    return Datetime.strptime(stamp + offset, patt)
def pginterval_out(v):
    """Return ``str(v)`` for an interval object."""
    text = str(v)
    return text
def timedelta_out(v):
    """Format a timedelta as ``'<d> days <s> seconds <us> microseconds'``.

    The three numbers are the value's ``days``, ``seconds`` and
    ``microseconds`` attributes.
    """
    return "{} days {} seconds {} microseconds".format(
        v.days, v.seconds, v.microseconds)
def timedelta_in(data):
    """Parse interval text into a :class:`datetime.timedelta`.

    The text is split on whitespace.  A token containing ``:`` is read as
    ``HH:MM:SS[.ffffff]``; every other token is either a number or a unit
    word (looked up in ``PGInterval.UNIT_MAP``) that applies to the number
    before it.

    A leading ``-`` on the clock token negates the whole clock value, so
    ``-04:05:06`` means minus (4 hours 5 minutes 6 seconds); the sign is
    therefore applied to the minutes and seconds as well as the hours.

    :raises InterfaceError: if the interval uses weeks, months, years,
        decades, centuries or millennia.
    :raises KeyError: if a unit word is not present in ``UNIT_MAP``.
    """
    t = {}
    curr_val = None
    for k in data.split():
        if ':' in k:
            hours, minutes, seconds = map(float, k.split(':'))
            if k.startswith('-'):
                # float() already made the hours negative; carry the sign
                # over to the other two fields.
                minutes, seconds = -minutes, -seconds
            t['hours'], t['minutes'], t['seconds'] = hours, minutes, seconds
        else:
            try:
                curr_val = float(k)
            except ValueError:
                # Not a number, so it must be the unit for the last number.
                t[PGInterval.UNIT_MAP[k]] = curr_val

    for n in ['weeks', 'months', 'years', 'decades', 'centuries', 'millennia']:
        if n in t:
            raise InterfaceError(
                "Can't fit the interval " + str(t) +
                " into a datetime.timedelta.")

    return Timedelta(**t)
def pginterval_in(data):
    """Parse interval text into a ``PGInterval`` via ``PGInterval.from_str``."""
    interval = PGInterval.from_str(data)
    return interval
def bytes_out(v):
    """Return the hex digits of ``v`` prefixed with ``\\x``."""
    hex_digits = v.hex()
    return '\\x' + hex_digits
def bytea_in(data):
    """Decode hex text into bytes, skipping the first two characters.

    The two skipped characters are presumably the ``\\x`` prefix that
    ``bytes_out`` writes; nothing here checks what they actually are.
    """
    hex_digits = data[2:]
    return bytes.fromhex(hex_digits)
def uuid_out(v):
    """Return ``str(v)`` for a UUID value."""
    text = str(v)
    return text
def uuid_in(data):
    """Convert UUID text into a :class:`uuid.UUID`."""
    parsed = UUID(data)
    return parsed
def bool_out(v):
    """Return ``'true'`` for a truthy value and ``'false'`` otherwise."""
    if v:
        return 'true'
    return 'false'
def null_out(v):
    """Return ``None`` regardless of the argument."""
    return None
def int_in(data):
    """Convert integer text into an ``int``."""
    number = int(data)
    return number
# BINARY is another name for the built-in bytes type.
BINARY = bytes
def PgDate(year, month, day):
    """Construct an object holding a date value.

    This function is part of the `DBAPI 2.0 specification
    <https://peps.python.org/pep-0249/>`_.

    :rtype: :class:`datetime.date`
    """
    return Date(year=year, month=month, day=day)
def PgTime(hour, minute, second):
    """Construct an object holding a time value.

    This function is part of the `DBAPI 2.0 specification
    <https://peps.python.org/pep-0249/>`_.

    :rtype: :class:`datetime.time`
    """
    return Time(hour=hour, minute=minute, second=second)
def Timestamp(year, month, day, hour, minute, second):
    """Construct an object holding a timestamp value.

    This function is part of the `DBAPI 2.0 specification
    <https://peps.python.org/pep-0249/>`_.

    :rtype: :class:`datetime.datetime`
    """
    return Datetime(
        year=year, month=month, day=day,
        hour=hour, minute=minute, second=second)
def DateFromTicks(ticks):
    """Construct an object holding a date value from the given ticks value
    (number of seconds since the epoch), interpreted in local time.

    This function is part of the `DBAPI 2.0 specification
    <https://peps.python.org/pep-0249/>`_.

    :rtype: :class:`datetime.date`
    """
    # The first three fields of struct_time are year, month and day.
    year, month, day = localtime(ticks)[:3]
    return Date(year, month, day)
def TimeFromTicks(ticks):
    """Construct an object holding a time value from the given ticks value
    (number of seconds since the epoch), interpreted in local time.

    This function is part of the `DBAPI 2.0 specification
    <https://peps.python.org/pep-0249/>`_.

    :rtype: :class:`datetime.time`
    """
    # Fields 3..5 of struct_time are hour, minute and second.
    hour, minute, second = localtime(ticks)[3:6]
    return Time(hour, minute, second)
def TimestampFromTicks(ticks):
    """Construct an object holding a timestamp value from the given ticks
    value (number of seconds since the epoch), interpreted in local time.

    This function is part of the `DBAPI 2.0 specification
    <https://peps.python.org/pep-0249/>`_.

    :rtype: :class:`datetime.datetime`
    """
    # The first six fields of struct_time run from year through second.
    fields = localtime(ticks)[:6]
    return Timestamp(*fields)
def Binary(value):
    """Construct an object holding binary data.

    The value is returned unchanged.

    This function is part of the `DBAPI 2.0 specification
    <https://peps.python.org/pep-0249/>`_.
    """
    return value
class PGInterval():
    """An interval held as separate, optional unit counts.

    Every unit from millennia down to microseconds is stored exactly as
    supplied, or as ``None`` when it was not supplied.  Two instances compare
    equal when their normalised (months, days, seconds) totals match, so for
    example one week equals seven days.  Because ``__eq__`` is defined
    without ``__hash__``, instances are not hashable.
    """

    # Unit words accepted in interval text -> constructor keyword name.
    UNIT_MAP = {
        'millennia': 'millennia',
        'millennium': 'millennia',
        'millenium': 'millennia',  # misspelt key, kept for existing callers
        'centuries': 'centuries',
        'century': 'centuries',
        'decades': 'decades',
        'decade': 'decades',
        'years': 'years',
        'year': 'years',
        'months': 'months',
        'month': 'months',
        'mon': 'months',
        'mons': 'months',
        'weeks': 'weeks',
        'week': 'weeks',
        'days': 'days',
        'day': 'days',
        'hours': 'hours',
        'hour': 'hours',
        'minutes': 'minutes',
        'minute': 'minutes',
        'seconds': 'seconds',
        'second': 'seconds',
        'microseconds': 'microseconds',
        'microsecond': 'microseconds'
    }

    # Attribute names, largest unit first; this is the order __str__ emits.
    _UNITS = (
        'millennia', 'centuries', 'decades', 'years', 'months', 'weeks',
        'days', 'hours', 'minutes', 'seconds', 'microseconds')

    @staticmethod
    def from_str(interval_str):
        """Build a PGInterval from text such as ``'1 year 2 mons 04:05:06'``.

        The text is split on whitespace.  A token containing ``:`` is read as
        ``HH:MM:SS[.ffffff]``; zero fields of it are left unset.  Any other
        token is either an integer or a unit word from ``UNIT_MAP`` that
        applies to the integer before it.

        A leading ``-`` on the clock token negates the whole clock value, so
        the sign is applied to minutes and seconds as well as hours.

        :raises KeyError: if a unit word is not present in ``UNIT_MAP``.
        """
        t = {}
        curr_val = None
        for k in interval_str.split():
            if ':' in k:
                hours_str, minutes_str, seconds_str = k.split(':')
                # int() only carries the sign into the hours field.
                sign = -1 if k.startswith('-') else 1

                hours = int(hours_str)
                if hours != 0:
                    t['hours'] = hours

                minutes = sign * int(minutes_str)
                if minutes != 0:
                    t['minutes'] = minutes

                try:
                    seconds = int(seconds_str)
                except ValueError:
                    # Fractional seconds.
                    seconds = float(seconds_str)
                seconds = sign * seconds
                if seconds != 0:
                    t['seconds'] = seconds
            else:
                try:
                    curr_val = int(k)
                except ValueError:
                    # Not a number, so it is the unit for the last number.
                    t[PGInterval.UNIT_MAP[k]] = curr_val

        return PGInterval(**t)

    def __init__(
            self, millennia=None, centuries=None, decades=None, years=None,
            months=None, weeks=None, days=None, hours=None, minutes=None,
            seconds=None, microseconds=None):
        """Store each unit count as given; ``None`` means "not supplied"."""
        self.millennia = millennia
        self.centuries = centuries
        self.decades = decades
        self.years = years
        self.months = months
        self.weeks = weeks
        self.days = days
        self.hours = hours
        self.minutes = minutes
        self.seconds = seconds
        self.microseconds = microseconds

    def __repr__(self):
        """Return ``'<PGInterval ...>'`` wrapping the ``str()`` form."""
        return '<PGInterval ' + str(self) + '>'

    def __str__(self):
        """Return ``'<count> <unit>'`` pairs for every unit that is set.

        Units appear largest first and are separated by single spaces; units
        whose value is ``None`` are omitted.
        """
        res = []
        for unit in self._UNITS:
            value = getattr(self, unit)
            if value is not None:
                res.append(str(value))
                res.append(unit)
        return ' '.join(res)

    def normalize(self):
        """Return a new PGInterval reduced to months, days and seconds.

        Millennia, centuries, decades and years are folded into months
        (12 months per year); weeks into days (7 per week); hours, minutes
        and microseconds into seconds.  Months, days and seconds are never
        converted into one another.
        """
        months = 0
        if self.months is not None:
            months += self.months
        if self.years is not None:
            months += self.years * 12
        if self.decades is not None:
            months += self.decades * 12 * 10
        if self.centuries is not None:
            months += self.centuries * 12 * 100
        if self.millennia is not None:
            months += self.millennia * 12 * 1000

        days = 0
        if self.days is not None:
            days += self.days
        if self.weeks is not None:
            days += self.weeks * 7

        seconds = 0
        if self.hours is not None:
            seconds += self.hours * 60 * 60
        if self.minutes is not None:
            seconds += self.minutes * 60
        if self.seconds is not None:
            seconds += self.seconds
        if self.microseconds is not None:
            seconds += self.microseconds / 1000000

        return PGInterval(months=months, days=days, seconds=seconds)

    def __eq__(self, other):
        """Compare normalised totals; non-PGInterval objects are unequal."""
        if isinstance(other, PGInterval):
            s = self.normalize()
            o = other.normalize()
            return s.months == o.months and s.days == o.days and \
                s.seconds == o.seconds
        else:
            return False
class ArrayState(Enum):
    """Parser states used by ``_parse_array``."""

    # Inside a double-quoted element.
    InString = 1
    # Immediately after a backslash inside a quoted element.
    InEscape = 2
    # Inside an unquoted element.
    InValue = 3
    # Between elements (the starting state).
    Out = 4
def _parse_array(data, adapter):
    """Parse array text such as ``{1,2,{3,NULL}}`` into nested lists.

    :param data: the array text; it is consumed one character at a time.
    :param adapter: callable applied to each element's text.  It is not
        applied to an unquoted ``NULL``, which becomes ``None``; a quoted
        ``"NULL"`` is passed to the adapter like any other string.
    :returns: the outermost list.
    """
    mode = ArrayState.Out
    # nesting[0] only holds the outermost list; nesting[-1] is the list that
    # is currently being filled.
    nesting = [[]]
    buf = []
    for ch in data:
        if mode == ArrayState.InValue:
            if ch == '}' or ch == ',':
                text = ''.join(buf)
                nesting[-1].append(None if text == 'NULL' else adapter(text))
                # Deliberately falls through to the Out handling below, so a
                # closing brace also pops the current list.
                mode = ArrayState.Out
            else:
                buf.append(ch)

        if mode == ArrayState.Out:
            if ch == '{':
                child = []
                nesting[-1].append(child)
                nesting.append(child)
            elif ch == '}':
                nesting.pop()
            elif ch == '"':
                buf = []
                mode = ArrayState.InString
            elif ch != ',':
                # First character of an unquoted element.
                buf = [ch]
                mode = ArrayState.InValue
        elif mode == ArrayState.InString:
            if ch == '"':
                nesting[-1].append(adapter(''.join(buf)))
                mode = ArrayState.Out
            elif ch == '\\':
                mode = ArrayState.InEscape
            else:
                buf.append(ch)
        elif mode == ArrayState.InEscape:
            # The escaped character is taken literally.
            buf.append(ch)
            mode = ArrayState.InString

    return nesting[0][0]
def _array_in(adapter):
def f(data):
return _parse_array(data, adapter)
return f
# Array parsers, one per element converter.  Each is the function returned by
# _array_in for that converter and is referenced from the PG_TYPES table.
array_bool_in = _array_in(bool_in)
array_int_in = _array_in(int)
array_float_in = _array_in(float)
array_numeric_in = _array_in(numeric_in)
array_text_in = _array_in(text_in)
def array_string_escape(v):
    """Escape and, where needed, double-quote a string array element.

    Backslashes and double quotes are each preceded by a backslash.  The
    result is wrapped in double quotes when it is empty, when it spells NULL
    in any letter case (unquoted, PostgreSQL reads every case variant as a
    null element), or when it contains a curly brace, comma, backslash or
    any whitespace character (unquoted leading/trailing whitespace would be
    dropped by the server).

    :param v: the element text.
    :returns: text that can be placed between the braces of an array literal.
    """
    cs = []
    for c in v:
        if c == '\\' or c == '"':
            cs.append('\\')
        cs.append(c)
    val = ''.join(cs)

    needs_quotes = (
        not val
        or val.upper() == 'NULL'
        or any(c.isspace() or c in '{},\\' for c in val))
    if needs_quotes:
        val = '"' + val + '"'
    return val
# Maps a key to a pair (type code, converter).  The converter is one of the
# *_out functions above.
# NOTE(review): the keys 20, 21, 23, 1114 and 1184 are type codes rather than
# Python types; presumably the caller picks among them itself (e.g. by
# integer range or by whether a datetime carries tzinfo) -- confirm against
# the code that reads this table.
PY_TYPES = {
    type(None): (NULLTYPE, null_out),  # null
    bool: (16, bool_out),  # boolean
    bytearray: (17, bytes_out),  # bytea
    20: (20, int_out),  # int8
    21: (21, int_out),  # int2
    23: (23, int_out),  # int4
    float: (701, float_out),  # float8
    Date: (1082, date_out),  # date
    Time: (1083, time_out),  # time
    1114: (1114, timestamp_out),  # timestamp
    1184: (1184, timestamptz_out),  # timestamptz
    Timedelta: (1186, timedelta_out),  # interval
    PGInterval: (1186, pginterval_out),  # interval
    Decimal: (1700, numeric_out),  # Decimal
    UUID: (2950, uuid_out),  # uuid
    bytes: (17, bytes_out),  # bytea
    str: (UNKNOWN, text_out),  # unknown
    Enum: (UNKNOWN, enum_out),  # unknown; the member's value is sent
    IPv4Address: (869, inet_out),  # inet
    IPv6Address: (869, inet_out),  # inet
    IPv4Network: (869, inet_out),  # inet
    IPv6Network: (869, inet_out),  # inet
}
# Maps a type code to the function that converts received text for that type
# into a Python value.
PG_TYPES = {
    16: bool_in,  # boolean
    17: bytea_in,  # bytea
    19: text_in,  # name type
    20: int,  # int8
    21: int,  # int2
    22: vector_in,  # int2vector
    23: int,  # int4
    25: text_in,  # TEXT type
    26: int,  # oid
    28: int,  # xid
    114: json_in,  # json
    700: float,  # float4
    701: float,  # float8
    UNKNOWN: text_in,  # unknown
    829: text_in,  # MACADDR type
    869: inet_in,  # inet
    1000: array_bool_in,  # BOOL[]
    1003: array_text_in,  # NAME[]
    1005: array_int_in,  # INT2[]
    1007: array_int_in,  # INT4[]
    1009: array_text_in,  # TEXT[]
    1014: array_text_in,  # CHAR[]
    1015: array_text_in,  # VARCHAR[]
    1016: array_int_in,  # INT8[]
    1021: array_float_in,  # FLOAT4[]
    1022: array_float_in,  # FLOAT8[]
    1042: text_in,  # CHAR type
    1043: text_in,  # VARCHAR type
    1082: date_in,  # date
    1083: time_in,  # time
    1114: timestamp_in,  # timestamp
    1184: timestamptz_in,  # timestamp w/ tz
    1186: timedelta_in,  # interval
    1231: array_numeric_in,  # NUMERIC[]
    1263: array_text_in,  # cstring[]
    1700: numeric_in,  # NUMERIC
    2275: text_in,  # cstring
    2950: uuid_in,  # uuid
    3802: json_in,  # jsonb
}
# PostgreSQL encodings:
# https://www.postgresql.org/docs/current/multibyte.html
#
# Python encodings:
# https://docs.python.org/3/library/codecs.html
#
# Commented out encodings don't require a name change between PostgreSQL and
# Python. If the py side is None, then the encoding isn't supported.
# Maps a PostgreSQL encoding name to the Python codec name to use for it.
# A value of None marks an encoding with no Python equivalent.  Names that
# are spelt the same on both sides are listed only as comments.
pg_to_py_encodings = {
    # Not supported:
    "mule_internal": None,
    "euc_tw": None,

    # Name fine as-is:
    # "euc_jp",
    # "euc_jis_2004",
    # "euc_kr",
    # "gb18030",
    # "gbk",
    # "johab",
    # "sjis",
    # "shift_jis_2004",
    # "uhc",
    # "utf8",

    # Different name:
    "euc_cn": "gb2312",
    "iso_8859_5": "iso8859_5",
    "iso_8859_6": "iso8859_6",
    "iso_8859_7": "iso8859_7",
    "iso_8859_8": "iso8859_8",
    "koi8": "koi8_r",
    "latin1": "iso8859-1",
    "latin2": "iso8859_2",
    "latin3": "iso8859_3",
    "latin4": "iso8859_4",
    "latin5": "iso8859_9",
    "latin6": "iso8859_10",
    "latin7": "iso8859_13",
    "latin8": "iso8859_14",
    "latin9": "iso8859_15",
    "sql_ascii": "ascii",
    "win866": "cp866",
    "win874": "cp874",
    "win1250": "cp1250",
    "win1251": "cp1251",
    "win1252": "cp1252",
    "win1253": "cp1253",
    "win1254": "cp1254",
    "win1255": "cp1255",
    "win1256": "cp1256",
    "win1257": "cp1257",
    "win1258": "cp1258",
    "unicode": "utf-8",  # Needed for Amazon Redshift
}
# pg element oid -> pg array typeoid
PG_ARRAY_TYPES = {
    16: 1000,  # BOOL[]
    25: 1009,  # TEXT[]
    701: 1022,  # FLOAT8[]
    # NOTE(review): 1043 (VARCHAR) maps to 1009 (TEXT[]) although PG_TYPES
    # lists 1015 as VARCHAR[] -- looks deliberate, confirm before changing.
    1043: 1009,
    1700: 1231,  # NUMERIC[]
}
def array_find_first_element(arr):
    """Return the first non-``None`` leaf of a nested list, else ``None``.

    Leaves are visited in the order produced by ``array_flatten``.
    """
    leaves = (leaf for leaf in array_flatten(arr) if leaf is not None)
    return next(leaves, None)
def array_flatten(arr):
    """Yield the leaves of a nested list, depth first, left to right.

    Only ``list`` instances are descended into; every other item, including
    ``None``, is yielded as it is.
    """
    for item in arr:
        if not isinstance(item, list):
            yield item
            continue
        yield from array_flatten(item)