AWS Glue-S3 ETL Process Using PySpark
In this tutorial, I am going to explain how to create a data transformation (ETL) pipeline using AWS Glue and S3.
Briefly, I am going to work through the use cases below.
There are two sets of data: one contains employee details and the other contains employee-to-city mapping details. The requirement is to create a nested JSON object as output after the following transformations (a sample of the target record is shown after the list).
Use Cases
- Map the country from the Employee City Mapping table and associate the state and country with the respective city.
- Create a single name column called 'Full Name' by concatenating the first and last names.
- Identify phone numbers that do not have a country code (i.e. the '+00' prefix) and prefix the country code to those numbers.
- Create a respective S3 bucket for each uniquely identified country and store the transformation result there.
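For illustration only, a single transformed record might look roughly like this. The field names come from the script later in this post; the values are invented.

# Hypothetical example of one transformed, nested employee record (values are made up)
{
    "userId": "E101",
    "jobTitleName": "Developer",
    "firstName": "Asha",
    "lastName": "Menon",
    "employeeCode": "EMP001",
    "Address": {
        "Full Name": "Asha Menon",
        "gender": "F",
        "phoneNumber": "+009876543210",
        "CityName": "Chennai",
        "StateName": "Tamil Nadu",
        "CountryName": "India",
        "emailAddress": "asha.menon@example.com"
    }
}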
Below is the Glue-S3 ETL architecture. Here we skip the Redshift part, which would warehouse the transformed output from the Glue job.
AWS Glue: AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy for customers to prepare and load their data for analytics. You can create and run an ETL job with a few clicks in the AWS Management Console.
Glue Terminology
- Data Catalog: The AWS Glue Data Catalog contains references to data that is used as sources and targets of your extract, transform, and load (ETL) jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The Data Catalog is an index to the location, schema, and runtime metrics of your data (a small lookup sketch follows this list).
- Classifier: A classifier determines the schema of your data. Glue provides built-in classifiers for common formats such as CSV and JSON, and crawlers apply them while reading from sources such as an RDBMS or S3.
- Connection: A connection contains the properties required to connect to a data store.
- Crawler: A crawler can crawl multiple data stores in a single run. Upon completion, the crawler creates or updates one or more tables in your Data Catalog.
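As a quick illustration of what the Data Catalog holds, the metadata of a cataloged table can be inspected with boto3. This is a minimal sketch; the database and table names are placeholders, not values used later in this tutorial.

import boto3

# Placeholder database/table names; substitute the ones your crawler created
glue_client = boto3.client('glue', region_name='ap-south-1')
response = glue_client.get_table(DatabaseName='employee_db', Name='read')

table = response['Table']
print(table['StorageDescriptor']['Location'])                       # S3 path the catalog entry points to
print([c['Name'] for c in table['StorageDescriptor']['Columns']])   # schema inferred by the crawler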
Use Case Implementation
Here I am implementing the above-mentioned use cases with a PySpark script that is defined through a Glue job.
As already mentioned, there are two data sets, Employee Details and Employee City Mapping Details, and both have to be stored in a data source; for that we are using S3 buckets.
- S3 bucket 1 stores Employee Details.csv
- S3 bucket 2 stores Employee City Mapping Details.csv
- S3 bucket 3 is for storing the output
There are two ways we can read these data sets in Glue:
- One is creating a database and table in the Data Catalog that point to the respective data source.
- The other is reading the data directly from the S3 bucket.
Here I use the second approach and read the data sets directly from S3 in the Glue job, but I also explain how to create a Data Catalog and read data from the database. Both approaches are sketched below.
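A rough sketch of the two approaches inside a Glue job; the database, table, and bucket names here are placeholders, not the ones used later in this post.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Option 1: read through the Data Catalog table created by a crawler
# ('employee_db' and 'employee_read' are placeholder names)
dyf = glue_context.create_dynamic_frame.from_catalog(database="employee_db", table_name="employee_read")
df_from_catalog = dyf.toDF()

# Option 2: read the CSV directly from S3 (placeholder bucket path)
df_from_s3 = spark.read.format("csv").option("header", "true").load("s3://my-source-bucket/read/employees.csv")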
Creating the Data Source via a Database and Table
First, create an S3 bucket that contains two folders: one for reading the data set and another for writing the output. In the read folder, upload the CSV file that we want to process.
Now create the crawler (console steps below; an equivalent boto3 sketch follows the list):
- Select the source type: Data store.
- Choose the S3 bucket path.
- Skip adding another data store, since we are only choosing one data source (select No), and click Next.
- Choose an IAM role: you can pick an existing IAM role or create a new one.
- Next, choose the schedule for the run; I chose Run on demand in this case.
- Now configure the crawler's output; for that we need to create a database.
- Choose to create a new database, or if you have already created one, choose that.
- Review the configuration and finish.
- Now run the crawler to create the table from the S3 data source we already defined. The table stores the metadata from S3. Wait until the crawler's status returns to Ready.
- Once the table is created successfully, you can verify it in the table details.
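For reference, the same crawler can also be created and started with boto3 instead of through the console. This is a minimal sketch; the crawler, role, database, and bucket names are placeholders.

import boto3

glue_client = boto3.client('glue', region_name='ap-south-1')

# Create a crawler pointing at the source bucket (all names are placeholders)
glue_client.create_crawler(
    Name='employee-csv-crawler',
    Role='AWSGlueServiceRole-demo',          # an existing Glue service IAM role
    DatabaseName='employee_db',              # Data Catalog database to populate
    Targets={'S3Targets': [{'Path': 's3://my-source-bucket/read/'}]},
)

# Run it on demand; the crawler creates/updates the table when it finishes
glue_client.start_crawler(Name='employee-csv-crawler')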
- Now we have to create the job: go to Jobs (under the ETL section) in the left navigation and choose Add job.
- Add job: give the job a name and choose an existing IAM role or create a new one.
- Under Type, choose the framework you want to use for development. Here I am choosing Spark and a Glue version.
- Under 'This job runs', choose the proposed script generated by Glue or an existing script. Here we choose the first option and customize the script according to our usage.
- Now open the advanced properties (security configuration, script libraries, and job parameters) and set the maximum capacity and job timeout as per the requirement.
- Once that is done, click Next, choose the database table we already created as the data source, and click Next.
- For the transform type, choose 'Change schema' or 'Find matching records'; here we can change the existing schema of our data source. I am keeping the existing schema and clicking Next.
- Now choose the data target; next you will see the schema defined for the data set from the CSV we uploaded, mapped to the target S3 bucket.
- If you need to edit the schema, you can apply the changes here.
- Once that is done, the script editor is displayed, where you can see the default script generated by Glue (a sketch of this boilerplate appears after these steps). We can customize the script according to the use case we want to perform on the existing data.
- Once you have customized the script, click Save and then Run job.
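The proposed script that Glue generates typically starts with initialization boilerplate along these lines, which the custom transformation logic then extends (an outline only; the source and target details are placeholders):

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Standard Glue job initialization
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ... read from the cataloged source, apply transformations, write to the target ...

job.commit()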
Reading the data set directly from the S3 bucket
As already mentioned, the second option is to read the data directly from the S3 bucket.
Following are the data sets on which we are going to operate (their approximate columns are listed after the file names):
Employee Data.csv
City Mapping Data.csv:
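The columns of the two CSV files, as inferred from the join and select logic in the script below (treat these as assumptions about the headers rather than an authoritative schema):

# Columns inferred from the transformation script (approximate)
employee_columns = ["userId", "jobTitleName", "firstName", "lastName",
                    "employeeCode", "gender", "phoneNumber", "emailAddress", "city"]
city_mapping_columns = ["S.No", "CityName", "StateID", "StateName",
                        "CountryName", "CountryCode"]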
The PySpark script below implements the following use cases.
Use Cases
- Map the country from the Employee City Mapping table and associate the state and country with the respective city.
- Create a single name column called 'Full Name' by concatenating the first and last names.
- Identify phone numbers that do not have a country code (i.e. the '+00' prefix) and prefix the country code to those numbers.
- Create a respective S3 bucket for each uniquely identified country and store the transformation result there.
from datetime import datetime
import json

import boto3
from botocore.exceptions import ClientError

# Import PySpark modules
from pyspark.context import SparkContext
from pyspark.sql import functions as sf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

# Import Glue modules
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

# Initialize contexts and session
spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)
session = glue_context.spark_session
s3_source = boto3.resource('s3')

#########################################
### EXTRACT (READ DATA)
# Load data from both S3 buckets as data frames

### Employee DF
df_Emp = session.read.format("csv").option("header", "true").load("s3://htc-source-for-glue/read/emp_updated_city_gender.csv")
print("Showing emp df details")
df_Emp.show()

### City DF
df_city = session.read.format("csv").option("header", "true").load("s3://htc-poc-glue-source-for-citymap/read/updatedCityNames.csv")
print("Showing city details")
df_city.show()

## Use Case 1: Join the emp and city DFs, mapping the state and country associated with each employee's city.
## Clean the data set while joining by removing null and duplicate values.
joinDF = df_Emp.join(df_city, df_Emp.city == df_city.CityName, how='left').drop('StateID', 'city', 'S.No', 'CountryCode')
print("Showing join DF")
joinDF.show()

### Remove null values
print("Removing null")
dropNullDF = joinDF.where(sf.col('CityName').isNotNull())
dropNullDF.show()

## Remove duplicate rows
removeDuplicate = dropNullDF.drop_duplicates(subset=['employeeCode'])
print("Removing duplicate")
removeDuplicate.show()

## Use Case 2: Concatenate 'firstName' and 'lastName' into a 'Full Name' column
concatDF = removeDuplicate.withColumn('Full Name', sf.concat(sf.col('firstName'), lit(' '), sf.col('lastName')))
print("Showing concat DF")
concatDF.show()
concatDF.printSchema()

### Use Case 3: Identify phone numbers that do not have a country code.
## Prefix the country code if the number does not start with zero
def addCountry_code(phoneNo):
    countryCode = '+00' + phoneNo
    if phoneNo[:1] != '0':
        return str(countryCode)
    else:
        return str(phoneNo)

phone_replace_udf = session.udf.register('phone_replace_udf', lambda x: addCountry_code(x), StringType())
phoneNo_rep_DF = concatDF.withColumn("phoneNumber", phone_replace_udf(sf.col('phoneNumber')))
print("Show updated df details")
phoneNo_rep_DF.show()

## Use Case 4: Create a nested JSON object with an Address field for each employee,
## and write one nested JSON object per country.

# Split a flat employee dictionary into a nested record with an 'Address' sub-object
def update_emp_dict_to_Json(empDict):
    key1 = ['userId', 'jobTitleName', 'firstName', 'lastName', 'employeeCode']
    key2 = ['Full Name', 'gender', 'phoneNumber', 'CityName', 'StateName', 'CountryName', 'emailAddress']
    empDetail1 = {}
    empDetail2 = {}
    for k in key1:
        empDetail1[k] = empDict[k]
    for k2 in key2:
        empDetail2[k2] = empDict[k2]
    empDetail1.update({'Address': empDetail2})
    return empDetail1

# Convert the rows of a data frame into a list of nested JSON objects
def get_Nested_EMp_Json_Object(dataFrame):
    jsonContent = dataFrame.toJSON()
    empArrayObj = []
    for row in jsonContent.collect():
        print(row)
        empDetails = json.loads(row)
        empArrayObj.append(update_emp_dict_to_Json(empDetails))
    print("Updated nested Emp", json.dumps(empArrayObj))
    return empArrayObj

phoneNo_rep_DF.show()
phoneNo_rep_DF.printSchema()

# Create an S3 bucket via boto3 (returns the bucket resource, or None on failure)
def getS3Bucket():
    try:
        s3Bucket = s3_source.create_bucket(Bucket='empsource', CreateBucketConfiguration={'LocationConstraint': 'ap-south-1'})
        return s3Bucket
    except ClientError as e:
        print("Unable to create bucket", e)

# Build a time-stamped output key for the given country
def getOutput_file(country):
    time = datetime.now()
    file = country + "/Employee.json"
    outPutfile = str(time) + file
    return outPutfile

# Filter the data frame per country and write one nested JSON file per country
def get_filtered_DataFrame_forRegion(dataFrame):
    countCountry = dataFrame.select('CountryName').distinct()
    listOfcountry = [list(row) for row in countCountry.collect()]
    print("Country list is", listOfcountry)
    try:
        for country in listOfcountry:
            stateFilter_Df = dataFrame.filter(sf.col('CountryName').isin(["".join(country)]))
            print("Showing filtered")
            filtered_Data_json_object = get_Nested_EMp_Json_Object(stateFilter_Df)
            print("Filtered data frame for region", json.dumps(filtered_Data_json_object))
            outputEmpfile = getOutput_file("".join(country))
            s3Bucket = getS3Bucket()
            s3_source.Object('htcpoc-output', outputEmpfile).put(Body=bytes(json.dumps(filtered_Data_json_object).encode('utf-8-sig')))
    except AttributeError as e:
        print("Error due to:", e)

get_filtered_DataFrame_forRegion(phoneNo_rep_DF)
