# Geo-Spatial processing of GIS shapefiles with Amazon Athena

Processing geo-spatial data often requires to assign spatial data points to regions or zones, such as zip codes, urban neighborhoods, operational districts, etc.
The Athena Engine 2.0 no supports spatial queries.

We demonstrate using Athena to join regions or zones to geo-spatial datapoints. The regons tabels are created from ArcGIS shape files.

## Data Sources

Download data from these public data sources: the first set includes geo-spatial locations with longitude and latitude, the remaining three sets are shape files that define areas on the map. We use those to map locations to areas, such as neighborhoods, designated service areas, and zip codes.

|Raw data | Description | Source |
|---------|-------------|--------|
| dockless-vehicles-3.csv | A public dataset of dock-less vehicle rentals, provided by the Office of Civic Innovation and Technology (https://louisvilleky.gov/government/civic-innovation-and-technology/civic-innovation) of the Louisville (KY) Metro Government. The table includes trip data with longitude and latitude of the start and end locations | https://data.louisvilleky.gov/dataset/dockless-vehicles |
| Dockless_Vehicle_Service_Area.* | Shape files of service areas |  https://data.louisvilleky.gov/dataset/dockless-vehicles |
| cb_2018_us_zcta510_500k.* | [ZIP Code Tabulation Areas](https://www.census.gov/programs-surveys/geography/guidance/geo-areas/zctas.html) (ZCTAs) are generalized areal representations of United States Postal Service (USPS) ZIP Code service areas. The USPS ZIP Codes identify the individual post office or metropolitan area delivery station associated with mailing addresses. USPS ZIP Codes are not areal features but a collection of mail delivery routes. | https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_zcta510_500k.zip |
| Louisville_KY_Urban_Neighborhoods | The Louisville Neighborhoods layer consists of polygons representing approximate boundaries and extents of historical and cultural neighborhoods within the pre-merger limits of the City of Louisville, or post-merger Urban Service District. | https://data.lojic.org/datasets/louisville-ky-urban-neighborhoods| 

## Setup: Load packages, define functions

In addition to [Pandas](https://pandas.pydata.org) and [NumPy](https://numpy.org) you need to install the following package:
1. [s3fs](https://github.com/dask/s3fs/) is required by Pandas. It supports Amazon S3 URIs as file names in the Pandas methords `read_*` and `to_*`. The package loads indirectly with Pandas.
2. [PyAthena](https://github.com/laughingman7743/PyAthena/) provides the database connection to Amazon Athena
3. [Python Shapefile Library](https://github.com/GeospatialPython/pyshp) reads and writes ESRI Shapefiles.


In [3]:
import os
import sys
import datetime
import pandas as pd
import numpy as np
import s3fs       # required to read and write tables on Amazon S3. Not called, but importing it ensures that the package is available
import pyathena   # connection to Amazon Athena
import shapefile  # read and write ESRI Shapefiles

S3_BUCKET          = '<Here goes your S3 bucket>'
S3_TABLE_PREFIX    = '<Here goes the S3 prefix>'
S3_ATHENA_QUERY    = f's3://{S3_BUCKET}/{S3_TABLE_PREFIX}/query_output'
ATHENA_SCHEMA_NAME = '<Here goes your Glue database>'
ATHENA_WORK_GROUP  = '<Here goes your Athena Workgroup>'
REGION             = '<Here goes your region>'

def get_athena_connecttion():
    '''
    Open database connection to Amazon Athena
    
    Returns:
        connection object for Pandas
    '''
    return pyathena.connect(s3_staging_dir=S3_ATHENA_QUERY, schema_name=ATHENA_SCHEMA_NAME, region_name=REGION, work_group=ATHENA_WORK_GROUP)
    
    
def save_as_external_athena_table(conn, df, bucket, dirkey, dbname, tblname, overwrite=False):
    '''
    Save Pandas DataFrame as external Athena table. The function supports the following column types: timestamp, int, double, and string.
    The column names will be converted to lower case. The names must not include any special characters, exctept '_'.
    
    Parametes:
        conn:    PyAthena connection
        df:      pandas.DataFrame
        bucket:  S3 bucket
        dirkey:  S3 prefix to data file on S3
        dbname:  Name of GLUE database
        tblname: Name of GLUE table
    '''
    assert dirkey[-1] != '/', "Don't use '/' in S3 key"
    
    df.rename(lambda s: s.lower(), axis=1) \
      .to_csv(f"s3://{bucket}/{dirkey}/table.tsv", index=None, quoting=None, sep='\t', header=None, date_format='%Y-%m-%d %H:%M:%S')
    
    type_renaming = {
        'datetime64[ns]' : 'timestamp',
        'int64' : 'int',
        'float64': 'double',
        'object': 'string',
    }

    cols = [ "`{}` {}".format(r[1], type_renaming[str(r[2])]) for r in df.dtypes.reset_index().itertuples() ]
    cols_string = ',\n'.join(cols)
    
    curs = conn.cursor()
    if overwrite:
        try:
            res = curs.execute(f"""DROP TABLE {dbname}.{tblname}""")
        except pyathena.error.OperationalError as ex:
            if 'Table not found' in str(ex):
                pass
            else:
                raise ex
        
    q = f"""
        CREATE EXTERNAL TABLE IF NOT EXISTS {dbname}.{tblname} (
          {cols_string}
        )
        ROW FORMAT DELIMITED 
        FIELDS TERMINATED BY '\\t' 
        LINES TERMINATED BY '\\n' 
        LOCATION 's3://{bucket}/{dirkey}'
        TBLPROPERTIES ('has_encrypted_data'='false')
    """
  
    res = curs.execute(q)
    return res


def centroid(points):
    '''
    Computes centroid of a 2-dimensional area https://en.wikipedia.org/wiki/Centroid
    
    Parameters:
        points: numpy array of polynom coordinates
        
    Returns:
        x/y-coordinates of centroid, area of centroid
    '''
    pts = np.array(points)
    assert pts.shape[1] == 2, "Not 2-dimensional"
    n  = pts.shape[0]
    x = pts[:,0]
    y = pts[:,1]
    xyxy = [  x[i] * y[(i+1)%n] - x[(i+1)%n] * y[i] for i in range(n) ]
    A = 0.5 * np.sum(xyxy)
    Cx = np.sum([  (x[i] + x[(i+1)%n]) * xyxy[i]  for i in range(n) ])/(6*A)
    Cy = np.sum([  (y[i] + y[(i+1)%n]) * xyxy[i]  for i in range(n) ])/(6*A)
    return Cx, Cy, np.abs(A)


def load_shape_file(shape_file_name):
    '''
    Function to load 
    
    Parameters:
    
    Returns:
        Pandas DataFrame: the field `shape` holds the polygon defintion of the shape that can be interpreted
        by the Athena function `ST_GeometryFromText()`
        Other fields include coordinates of the bounding boxes and centroids, and meta data from the shapefile
    '''
    sf = shapefile.Reader(shape_file_name)
    recs = sf.records()
    shps = sf.shapes()
    fields = sf.fields
    
    shape_df = pd.DataFrame()
    for i, rec in enumerate(recs):
        rdata = {}
        for fld, dt, le, ze in sf.fields[1:]:
            rdata[fld] = [ rec[fld] ]
        bbox = shps[i].bbox
        for j, b in enumerate(['bb_west', 'bb_south', 'bb_east', 'bb_north']):
            rdata[b] = [ bbox[j] ]
        rdata['shape'] = [ """POLYGON(({poly}))""" \
                                .format(poly = ','.join(["{lo} {la}".format(lo=p[0], la=p[1]) 
                                                                        for p in shps[i].points ]))
                         ]
        cog_long, cog_lat, _ = centroid(shps[i].points)
        rdata['cog_longitude'] = cog_long
        rdata['cog_latitude'] = cog_lat

        tmp_df = pd.DataFrame(rdata)
        shape_df = pd.concat([shape_df, tmp_df])
    
    shape_df.index = range(shape_df.shape[0])
    return shape_df


## Process Shape Files
Download the shape files. Each data set consists of four files with the same name and different extension. Just provide the name of the `.shp` file to load. The other files must be in the same directory.
 
- load shapefiles into Pandas DataFrames with `load_shape_file()`, then
- save on S3 and create external table in Athena with `save_as_external_athena_table()`

### Zip Codes

In [40]:
%%time
zip_code_df = load_shape_file('./data/cb_2018_us_zcta510_500k.shp')
print(f"Number of records: {zip_code_df.shape[0]:,}")

Number of records: 33,144
CPU times: user 5min 22s, sys: 10.2 s, total: 5min 32s
Wall time: 5min 48s


In [41]:
display(zip_code_df.head())

Unnamed: 0,ZCTA5CE10,AFFGEOID10,GEOID10,ALAND10,AWATER10,bb_west,bb_south,bb_east,bb_north,shape,cog_longitude,cog_latitude
0,36083,8600000US36083,36083,659750662,5522919,-85.897787,32.233444,-85.449885,32.513303,"POLYGON((-85.63224699999999 32.280982,-85.6243...",-85.690393,32.387282
1,35441,8600000US35441,35441,172850429,8749105,-87.870502,32.756722,-87.624214,32.940973,"POLYGON((-87.83287399999999 32.844372,-87.8318...",-87.73861,32.853736
2,35051,8600000US35051,35051,280236456,5427285,-86.745076,33.099176,-86.481504,33.33738,"POLYGON((-86.743844 33.250019,-86.738019 33.25...",-86.619713,33.205467
3,35121,8600000US35121,35121,372736030,5349303,-86.585266,33.829742,-86.314226,34.071217,"POLYGON((-86.58526599999999 33.94743,-86.58032...",-86.455272,33.941246
4,35058,8600000US35058,35058,178039922,3109259,-86.88833,34.171265,-86.636051,34.306979,"POLYGON((-86.87884199999999 34.211959,-86.8764...",-86.737877,34.231338


In [42]:
%%time
if not 'conn' in globals():
    print("Connecting to Athena")
    conn = get_athena_connecttion()

save_as_external_athena_table(conn, zip_code_df,
                              S3_BUCKET, f"{S3_TABLE_PREFIX}/zipcode_shapes",
                              ATHENA_SCHEMA_NAME, 'zipcode_shapes',
                              overwrite=True)

CPU times: user 3.93 s, sys: 95 ms, total: 4.03 s
Wall time: 9.21 s


<pyathena.cursor.Cursor at 0x7f8cf6caf438>

In [43]:
pd.read_sql("""SELECT * FROM blog1.zipcode_shapes LIMIT 5""", conn)

Unnamed: 0,zcta5ce10,affgeoid10,geoid10,aland10,awater10,bb_west,bb_south,bb_east,bb_north,shape,cog_longitude,cog_latitude
0,99158,8600000US99158,99158,235445158,0,-117.386197,47.02792,-117.123919,47.25976,"POLYGON((-117.386197 47.087768,-117.383378 47....",-117.247358,47.130323
1,3215,8600000US03215,3215,124336441,177915,-71.571252,43.892722,-71.361965,43.999343,"POLYGON((-71.569045 43.901404,-71.569515 43.90...",-71.452823,43.945304
2,29547,8600000US29547,29547,103218389,215692,-79.411154,34.399488,-79.235604,34.57694,"POLYGON((-79.411154 34.57694,-79.4053249999999...",-79.333384,34.487992
3,17728,8600000US17728,17728,122976765,691784,-77.209153,41.274431,-76.945359,41.400996,"POLYGON((-77.209153 41.290804,-77.206322 41.29...",-77.085332,41.327618
4,16040,8600000US16040,16040,50810948,8983,-79.896532,41.04617,-79.77083,41.136607,"POLYGON((-79.896532 41.100111,-79.881548999999...",-79.84301,41.092546


### Loisville/KY Neighborhoods

In [13]:
%%time
neighnorhood_df = load_shape_file('./data/Louisville_KY_Urban_Neighborhoods.shp')
print(f"Number of records: {neighnorhood_df.shape[0]:,}")

Number of records: 92
CPU times: user 617 ms, sys: 6.32 ms, total: 624 ms
Wall time: 626 ms


In [14]:
display(neighnorhood_df.head())

Unnamed: 0,OBJECTID,NH_CODE,NH_NAME,SHAPEAREA,SHAPELEN,bb_west,bb_south,bb_east,bb_north,shape,cog_longitude,cog_latitude
0,1,99,REMAINDER OF CITY,25583330.0,23718.32369,-85.72344,38.263231,-85.693242,38.281569,POLYGON((-85.70281560232544 38.270463184337494...,-85.708712,38.272222
1,2,53,PORTLAND,70098300.0,43851.334658,-85.820222,38.256329,-85.763753,38.280011,"POLYGON((-85.8202215407467 38.27686470289577,-...",-85.792998,38.268303
2,3,62,SHAWNEE,59953850.0,34088.057055,-85.833282,38.249823,-85.804312,38.276865,"POLYGON((-85.8202215407467 38.27686470289577,-...",-85.818672,38.261047
3,4,12,BROWNSBORO ZORN,21977320.0,28060.141437,-85.702816,38.259572,-85.668382,38.275999,POLYGON((-85.70281560232544 38.270463184337494...,-85.688639,38.26663
4,5,23,CLIFTON HEIGHTS,17858640.0,19464.933453,-85.716873,38.258293,-85.692613,38.270738,POLYGON((-85.71500895019209 38.264042197247306...,-85.704025,38.263401


In [15]:
if not 'conn' in globals():
    print("Connecting to Athena")
    conn = get_athena_connecttion()

save_as_external_athena_table(conn, neighnorhood_df,
                              S3_BUCKET, f"{S3_TABLE_PREFIX}/loisville_ky_neighborhoods",
                              ATHENA_SCHEMA_NAME, 'loisville_ky_neighborhoods',
                              overwrite=True)

<pyathena.cursor.Cursor at 0x7f8d2c5f3a58>

In [16]:
pd.read_sql("""SELECT * FROM blog1.loisville_ky_neighborhoods LIMIT 5""", conn)

Unnamed: 0,objectid,nh_code,nh_name,shapearea,shapelen,bb_west,bb_south,bb_east,bb_north,shape,cog_longitude,cog_latitude
0,1,99,REMAINDER OF CITY,25583330.0,23718.32369,-85.72344,38.263231,-85.693242,38.281569,POLYGON((-85.70281560232544 38.270463184337494...,-85.708712,38.272222
1,2,53,PORTLAND,70098300.0,43851.334658,-85.820222,38.256329,-85.763753,38.280011,"POLYGON((-85.8202215407467 38.27686470289577,-...",-85.792998,38.268303
2,3,62,SHAWNEE,59953850.0,34088.057055,-85.833282,38.249823,-85.804312,38.276865,"POLYGON((-85.8202215407467 38.27686470289577,-...",-85.818672,38.261047
3,4,12,BROWNSBORO ZORN,21977320.0,28060.141437,-85.702816,38.259572,-85.668382,38.275999,POLYGON((-85.70281560232544 38.270463184337494...,-85.688639,38.26663
4,5,23,CLIFTON HEIGHTS,17858640.0,19464.933453,-85.716873,38.258293,-85.692613,38.270738,POLYGON((-85.71500895019209 38.264042197247306...,-85.704025,38.263401


### Dockless Service Zones

In [19]:
%%time
dockless_zone_df = load_shape_file('./data/Dockless_Vehicle_Distribution_Zones.shp')
print(f"Number of records: {dockless_zone_df.shape[0]:,}")

Number of records: 9
CPU times: user 63.8 ms, sys: 3.91 ms, total: 67.7 ms
Wall time: 66 ms


In [20]:
display(dockless_zone_df)

Unnamed: 0,Dist_Zone,Pl2040Area,bb_west,bb_south,bb_east,bb_north,shape,cog_longitude,cog_latitude
0,1,Northwest Core,1185772.0,275565.35375,1205373.0,287823.380152,"POLYGON((1193691.6109733582 287815.0763631761,...",1195295.0,281342.995387
1,9,West Core,1184528.0,263536.76625,1205891.0,277443.735876,"POLYGON((1204659.266196905 274362.96903444396,...",1195880.0,271588.566397
2,2,Downtown,1204477.0,271532.27375,1218294.0,280954.288272,"POLYGON((1205218.9462500215 280123.6099999994,...",1210482.0,276616.567796
3,6,University,1203515.0,254582.969379,1215279.0,273211.3775,"POLYGON((1212727.448750025 270748.27874999365,...",1208898.0,263606.969458
4,3,Northeast Core,1212013.0,274241.3,1229596.0,287709.093407,POLYGON((1223601.7970568056 287709.09340739076...,1221822.0,280371.76615
5,5,Southeast Core,1210765.0,254840.497409,1239122.0,276544.75875,"POLYGON((1220489.8675000072 276140.2512500117,...",1221418.0,265314.126921
6,4,East Core,1220823.0,266079.29125,1247259.0,292598.833856,"POLYGON((1231939.7309992649 292598.8338559895,...",1235454.0,278947.829163
7,8,Southwest Core,1186221.0,253446.797659,1207267.0,268155.372131,"POLYGON((1198503.793750003 266260.02249999344,...",1195788.0,260001.48126
8,7,Iroquois Park,1186316.0,239861.281195,1208779.0,254935.59625,"POLYGON((1206324.8737500012 244494.5175000131,...",1198511.0,247641.294413


In [22]:
if not 'conn' in globals():
    print("Connecting to Athena")
    conn = get_athena_connecttion()

save_as_external_athena_table(conn, dockless_zone_df,
                              S3_BUCKET, f"{S3_TABLE_PREFIX}/distribution_zones",
                              ATHENA_SCHEMA_NAME, 'distribution_zones',
                              overwrite=False)

<pyathena.cursor.Cursor at 0x7f8cf63186d8>

In [23]:
pd.read_sql("""SELECT * FROM blog1.distribution_zones LIMIT 5""", conn)

Unnamed: 0,dist_zone,pl2040area,bb_west,bb_south,bb_east,bb_north,shape,cog_longitude,cog_latitude
0,1,Northwest Core,1185772.0,275565.35375,1205373.0,287823.380152,"POLYGON((1193691.6109733582 287815.0763631761,...",1195295.0,281342.995387
1,9,West Core,1184528.0,263536.76625,1205891.0,277443.735876,"POLYGON((1204659.266196905 274362.96903444396,...",1195880.0,271588.566397
2,2,Downtown,1204477.0,271532.27375,1218294.0,280954.288272,"POLYGON((1205218.9462500215 280123.6099999994,...",1210482.0,276616.567796
3,6,University,1203515.0,254582.969379,1215279.0,273211.3775,"POLYGON((1212727.448750025 270748.27874999365,...",1208898.0,263606.969458
4,3,Northeast Core,1212013.0,274241.3,1229596.0,287709.093407,POLYGON((1223601.7970568056 287709.09340739076...,1221822.0,280371.76615


## Load trip data

Upload the trip data

In [7]:
trips  = pd.read_csv('data/dockless-vehicles-3.csv')
print(f"Number of records: {trips.shape[0]:,}")
trips.head()

Number of records: 505,993


Unnamed: 0,TripID,StartDate,StartTime,EndDate,EndTime,TripDuration,TripDistance,StartLatitude,StartLongitude,EndLatitude,EndLongitude,DayOfWeek,HourNum
0,0000045c-2677-3a7d-4b73-cad99a57,2019-06-26,19:30,2019-06-26,19:30,3.0,0.0,38.253,-85.756,38.253,-85.755,4,19
1,0000487b-92e6-50d6-7569-42ed3818,2019-09-22,14:30,2019-09-22,14:30,5.0,0.0,38.203,-85.752,38.204,-85.751,1,14
2,00006088-2579-e0d0-6a30-a15bb878,2019-08-21,17:30,2019-08-21,17:30,6.0,0.33,38.259,-85.733,38.265,-85.739,4,17
3,00008c1a-899b-8596-970f-9f6bf495,2019-07-03,11:00,2019-07-03,11:15,6.0,0.64,38.217,-85.757,38.221,-85.763,4,11
4,00009301-3225-2aea-a84a-165a480a,2019-11-22,10:45,2019-11-22,11:00,7.0,0.599,38.215,-85.759,38.222,-85.764,6,10


In [None]:
if not 'conn' in globals():
    print("Connecting to Athena")
    conn = get_athena_connecttion()

save_as_external_athena_table(conn, dockless_zone_df,
                              S3_BUCKET, f"{S3_TABLE_PREFIX}/dockless_vehicles",
                              ATHENA_SCHEMA_NAME, 'dockless_vehicles',
                              overwrite=False)

## Athena Query

Now you can try join the neighborhoods to the geo-locations. Learn more about geo-spatial functions in Athena at https://docs.aws.amazon.com/athena/latest/ug/geospatial-functions-list.html

In [9]:
if not 'conn' in globals():
    print("Connecting to Athena")
    conn = get_athena_connecttion()


q = """
SELECT tr.startdate, tr.starttime, tr.endtime, tr.tripduration, tripdistance 
      , nb1.nh_code AS start_nbid, nb1.nh_name AS start_neighborhood
      , nb2.nh_code AS end_nbid, nb2.nh_name AS end_neighborhood
  
FROM "{database}"."dockless_vehicles" tr

JOIN "{database}"."loisville_ky_neighborhoods" nb1
      ON ST_Within(ST_POINT(CAST(tr.startlongitude AS DOUBLE), CAST(tr.startlatitude AS DOUBLE)), ST_GeometryFromText(nb1.shape))  
JOIN "{database}"."loisville_ky_neighborhoods" nb2
      ON ST_Within(ST_POINT(CAST(tr.endlongitude AS DOUBLE), CAST(tr.endlatitude AS DOUBLE)), ST_GeometryFromText(nb2.shape))

LIMIT 100;
""".format(
    database=ATHENA_SCHEMA_NAME
)

trips_with_nbhd = pd.read_sql(q, con=conn)
print(f"Number of records: {trips_with_nbhd.shape[0]:,}")
trips_with_nbhd.head()

Number of records: 100


Unnamed: 0,startdate,starttime,endtime,tripduration,tripdistance,start_nbid,start_neighborhood,end_nbid,end_neighborhood
0,2019-10-18,10:45,11:00,3.0,0.103,7,BELKNAP,7,BELKNAP
1,2019-07-28,09:30,09:45,4.0,0.66,37,HIGHLANDS DOUGLASS,0,
2,2019-07-21,12:30,12:45,14.0,0.71,17,CENTRAL BUSINESS DISTRICT,17,CENTRAL BUSINESS DISTRICT
3,2019-06-18,06:45,06:45,5.0,0.04,71,UNIVERSITY,71,UNIVERSITY
4,2019-08-10,11:00,11:15,10.0,0.0,17,CENTRAL BUSINESS DISTRICT,17,CENTRAL BUSINESS DISTRICT
