{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Notebook for ingesting ENEM data\n", "\n", "This notebook aims to ingest ENEM microdata stored on the [INEP](http://portal.inep.gov.br/web/guest/microdados) website, where it basically consists of the following steps:\n", "\n", "1. Prepare the environment (install unzips, create folders, define variables and import python libraries)\n", "2. Create Python functions that will extract the data from the unzipped file, convert and upload it to the S3 bucket\n", "3. Download the files from the INEP website according to the list defined in the **step 1** variable\n", "4. Perform the transformation functions defined in **step 2** in the downloaded .zip files\n", "5. Create the database in AWS Glue, then create and run the crawler that will register the tables in the data catalog\n", "\n", "After completing this notebook, you can proceed with the data transformation, exploration and consumption processes in Athena.\n", "\n", "*Note: it is a prerequisite for running this notebook to create the [CloudFormation stack](https://github.com/aws-samples/aws-edu-exam-analytics/blob/main/templates/EduLabCFN.yaml) as specified in the aws-samples repository.*\n", "\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1- *Download and binary installation for rar uncompress (one of the downloaded files contains a .rar file)*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "\n", "wget https://www.rarlab.com/rar/rarlinux-x64-5.9.1.tar.gz \n", "tar -xvzf rarlinux-x64-5.9.1.tar.gz\n", "rm rarlinux-x64-5.9.1.tar.gz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2- *Download and install wget library in Python*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install wget" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.3- *Import Python libraries that will be used by code in next cells*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import zipfile\n", "import wget\n", "import fnmatch\n", "import os\n", "import gzip\n", "import boto3\n", "import botocore\n", "import sys\n", "import shutil\n", "from zipfile import ZipFile\n", "from pprint import pprint" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.4- *Variables definition and folders criation to store zip packages downloaded and microdata files in csv format*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#define variáveis e cria diretorios de trabalho \n", "\n", "zipdir = 'zips'\n", "outdir = 'microdados'\n", "list_arq=[\n", " 'microdados_enem2012'\n", " ,'microdados_enem2013'\n", " ,'microdados_enem2014'\n", " ,'microdados_enem2015'\n", " ,'microdados_enem2016'\n", " ,'microdados_enem2017'\n", " ,'microdados_enem2018'\n", " ,'microdados_enem_2019']\n", "\n", "#troque o nome do bucket para o criado no stack do Cloudformation\n", "bucket='edu-bucket-999'\n", "folder='data'\n", "\n", "print(list_arq)\n", "os.mkdir(zipdir)\n", "os.mkdir(outdir)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2- *Function blocks to provide wget download status, uncompress, transform, and upload data to S3 bucket*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#bloco de funções\n", "\n", "#status do download wget\n", "def bar_custom(current, total, width=80):\n", " progress_message = \"Downloading: %d%% [%d / %d] bytes\" % (current / total * 100, current, total)\n", " sys.stdout.write(\"\\r\" + progress_message)\n", "\n", "#converte pra utf8 e comprime\n", "def convert_compress_file(csvfile):\n", " year=csvfile[-8:-4]\n", " filenameout=\"microdados/MICRODADOS_ENEM_\"+year+\".csv.gz\"\n", " print(\"Convertendo \"+csvfile+\" para utf-8 e compactando para \"+filenameout+\"...\")\n", "\n", " #converte, compacta e remove aspas se for o caso\n", " with open(csvfile,encoding='cp1252') as filein,gzip.open(filenameout,'wt',encoding='utf8') as fileout:\n", " for line in filein:\n", " fileout.write(line.replace('\"', ''))\n", " os.remove(csvfile)\n", " return filenameout\n", "\n", "#carrega dados no bucket\n", "def upload_s3(upfile,bucket,folder):\n", " year=upfile[-11:-7]\n", " s3 = boto3.resource('s3')\n", " data = open(upfile, \"rb\")\n", " key = folder + '/enem_microdados_' + year + '/' + os.path.basename(upfile)\n", " print(\"Carregando \"+key+\" para o bucket \"+bucket)\n", " s3.Bucket(bucket).put_object(Key=key, Body=data)\n", "\n", " \n", "# transformação e upload do arquivo csv\n", "def microdados_transform(microfile):\n", " pattern1=\"*/DADOS_*.csv\" \n", " pattern2=\"*/[Mm][Ii][Cc][Rr][Oo]*.csv\" \n", " unrarcmd=\"/home/ec2-user/SageMaker/rar/unrar e \"\n", "\n", " with ZipFile(microfile, 'r') as zipObj:\n", " listOfiles = zipObj.namelist()\n", " #Se for arquivo rar\n", " if fnmatch.filter(listOfiles, '*.rar'):\n", " rarfile=fnmatch.filter(listOfiles, '*.rar')[0]\n", " print(\"Arquivo rar \"+rarfile)\n", " zipObj.extractall()\n", " unrarlb=\"/home/ec2-user/SageMaker/rar/unrar lb \"+rarfile+\" | grep MICRO | grep csv\"\n", " extractfile=os.popen(unrarlb).readline().rstrip(\"\\r\\n\")\n", " #extractfile=os.path.basename(result)\n", " print(\"Extraindo arquivo \"+extractfile)\n", " #print(unrarcmd+rarfile+\" \"+extractfile)\n", " os.system(unrarcmd+rarfile+\" \"+extractfile)\n", " print(\"Movendo arquivo para pasta microdados\")\n", " finalfile='microdados/'+os.path.basename(extractfile)\n", " os.rename(os.path.basename(extractfile),finalfile)\n", " os.remove(rarfile)\n", " else:\n", " for extractfile in fnmatch.filter(listOfiles, '*.csv'):\n", " if fnmatch.fnmatch(extractfile,pattern1) or fnmatch.fnmatch(extractfile,pattern2):\n", " print(\"Arquivo zip \"+microfile)\n", " print(\"Extraindo arquivo \"+extractfile)\n", " zipObj.extract(extractfile)\n", " print(\"Movendo arquivo para pasta microdados\")\n", " finalfile='microdados/'+os.path.basename(extractfile)\n", " os.rename(extractfile,finalfile)\n", " basepath=extractfile.split(\"/\")[0]\n", " print(\"Removendo \"+ basepath)\n", " shutil.rmtree(basepath)\n", " return finalfile\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3- *Download of microdata file packages from INEP site according to file list in variable previously defined*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Download dos arquivos (com base em list_arq)\n", "\n", "for item in list_arq:\n", " year=item[-4:]\n", " if os.path.isfile('zips/'+item+'.zip'):\n", " print(\"arquivo \"+item+\".zip já existe\")\n", " else:\n", " print(\"carregando arquivo \"+item+\"...\")\n", " url='http://download.inep.gov.br/microdados/'+item+'.zip'\n", " wget.download(url,bar=bar_custom, out='zips')\n", " print(\" ok\")\n", " #os.system('wget '+url)\n", " #break\n", "\n", "print(\"fim dos downloads\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4- *Execute in loop for each downloaded .zip file the uncompress, convertion and upload functions*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#loop completo\n", "\n", "bucket='edu-bucket-999'\n", "\n", "for filename in sorted(os.listdir('zips')):\n", " print(\">>Processando zips/\"+filename)\n", " #extraindo arquivo csv\n", " result_tr=microdados_transform('zips/'+filename)\n", " \n", " #convertendo e comprimindo\n", " result_conv=convert_compress_file(result_tr)\n", "\n", " #enviando para o bucket s3\n", " upload_s3(result_conv,bucket,folder)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.1- *Define database name variable, and initialize boto3 object to execute AWS Glue related tasks*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Change the database name if desired\n", "dbname = 'db_education'\n", "\n", "#Change the region name if necessary\n", "region='us-east-1'\n", "\n", "glue_client = boto3.client('glue', region_name=region)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.2- *Creates Glue Crawler for S3 bucket where the microdata files were uploaded*\n", "\n", "*Obs: Update the **rolename** variable with IAM Role created in CloudFormation stack*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Criando Crawler\n", "\n", "#Mude para o arn do Role criado no Cloudformation - coloque sua account ID e troque o nome se for o caso\n", "rolename='arn:aws:iam::99999999999:role/AWSGlueServiceRole-Educ'\n", "\n", "response = glue_client.create_crawler(\n", " Name='enem-crawler',\n", " Role=rolename,\n", " DatabaseName=dbname,\n", " Description='Crawler para bucket da educação',\n", " Targets={\n", " 'S3Targets': [\n", " {\n", " 'Path': 's3://'+bucket+'/'+folder,\n", " 'Exclusions': [\n", " ]\n", " },\n", " ]\n", " },\n", " SchemaChangePolicy={\n", " 'UpdateBehavior': 'UPDATE_IN_DATABASE',\n", " 'DeleteBehavior': 'DELETE_FROM_DATABASE'\n", " }\n", ")\n", "pprint(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.4- *Run Crawler previously defined*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue_client.start_crawler(Name='enem-crawler')\n", "pprint(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### After executing steps above, it's available in Glue Data Catalog the microdata tables, that can be used in Amazon Athena and other tools/services." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.4" } }, "nbformat": 4, "nbformat_minor": 4 }