AWS Glue-S3 ETL Process Using PySpark

In this tutorial, I am going to explain how to create a data transformation (ETL) pipeline using AWS Glue and S3.

Briefly, I am going to explain it with the use cases below.

There are two sets of data: one contains Employee details and the other contains Employee City Mapping details. The requirement is to create a nested JSON object as output after applying the following transformations.

Use Cases

  • Map the country from the Employee City Mapping table and associate the state and country with the respective city.
  • Create a single column for the name, called 'Full Name', by concatenating the first and last name.
  • Identify phone numbers that do not have the country code (i.e. '+00') and prefix the country code to those numbers.
  • Create a respective S3 bucket for each uniquely identified country and store the transformation result.

Below is the Glue-S3 ETL architecture. Here we are skipping the Redshift part for warehousing the transformed output from the Glue job.

  AWS Glue: AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy for customers to prepare and load their data for analytics. You can create and run an ETL job with a few clicks in the AWS Management Console.

Glue Terminology

  1. Data Catalog: The AWS Glue Data Catalog contains references to data that is used as sources and targets of your extract, transform, and load (ETL) jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The Data Catalog is an index to the location, schema, and runtime metrics of your data.
  2. Classifier: Determines the schema of your data when it is read from a data store such as an RDBMS or S3.
  3. Connection: Contains the properties required to connect to a data store.
  4. Crawler: A crawler can connect to and crawl multiple data stores in a single run. Upon completion, it creates or updates one or more tables in your Data Catalog (see the short boto3 sketch after this list).
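As a minimal illustration of how the crawler, database, and Data Catalog fit together, the sketch below creates and starts a crawler with boto3; the crawler, role, database, and bucket names are placeholders for illustration, not the ones used later in this article.

import boto3

glue = boto3.client('glue')

# Register a crawler that scans an S3 path and writes table metadata
# into a Data Catalog database (all names here are placeholders)
glue.create_crawler(
    Name='employee-crawler',
    Role='AWSGlueServiceRole-demo',
    DatabaseName='employee_db',
    Targets={'S3Targets': [{'Path': 's3://my-demo-bucket/read/'}]}
)

# Run the crawler on demand; the resulting table becomes a source for Glue ETL jobs
glue.start_crawler(Name='employee-crawler')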

Use Case Implementation

Here I am implementing the above-mentioned use cases using a PySpark script, which is defined through a Glue job.

As already mentioned, there are two data sets, Employee Details and Employee City Mapping Details, and both have to be stored in a data source; for this we are using S3 buckets.

S3 bucket 1 – Stores Employee Details.csv

S3 bucket 2 – Stores Employee City Mapping Details.csv

S3 bucket 3 – Stores the output
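Setting these buckets up can also be scripted. Below is a minimal boto3 sketch, assuming placeholder bucket and file names (the actual buckets used by the job appear in the script further down); real bucket names must be globally unique.

import boto3

s3 = boto3.resource('s3')

# Create the three buckets in the same region used later in the article
for bucket_name in ['demo-emp-details', 'demo-emp-city-mapping', 'demo-etl-output']:
    s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': 'ap-south-1'})

# Upload the two source CSV files into a 'read/' folder of their buckets
s3.Bucket('demo-emp-details').upload_file('Employee Details.csv', 'read/Employee Details.csv')
s3.Bucket('demo-emp-city-mapping').upload_file('Employee City Mapping Details.csv', 'read/Employee City Mapping Details.csv')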

There are two ways we can read these data sets in Glue:

  1. One is creating a database and table by creating an endpoint to the respective data source.
  2. The other is reading the data directly from the S3 bucket.

Here I use the second approach, reading the data set directly from S3 in the Glue job, but I will also explain how to create the Data Catalog database and table and read the data from there.
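To make the difference concrete, here is a minimal sketch of both read styles inside a Glue job; the database, table, and bucket names are placeholders, and the actual script used for our use cases appears later in the article.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Option 1: read through the Data Catalog table created by a crawler
# ('employee_db' and 'employee_read' are placeholder names)
dyf_from_catalog = glue_context.create_dynamic_frame.from_catalog(
    database='employee_db',
    table_name='employee_read')

# Option 2: read the CSV directly from S3 as a Spark DataFrame
df_from_s3 = spark.read.format("csv") \
    .option("header", "true") \
    .load("s3://my-demo-bucket/read/employees.csv")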

Creating the data source via a database and table: First, create an S3 bucket containing two folders, one for reading the data set and another for writing the output. In the read folder, upload the CSV file that we want to process.

Now create the crawler:

  1. Select the source type: Data store.
  2. Choose the S3 bucket path.
  3. Skip "Add another data store" since we are only choosing one data source (select No) and click Next.
  4. Choose the IAM role: you can choose an existing IAM role or create a new one.
  5. Next, choose the schedule for the crawler: I chose "Run on demand" in this case.
  6. Now configure the crawler's output: for that we need to create a database.
  7. Choose to create a new database, or if you have already created one, choose that.
  8. Next, review the configuration and finish.
  9. Now run the crawler to create the table from the S3 data source we already defined. The table will store the metadata from S3. Wait until the crawler run completes.
  10. Once the table is created successfully, it can be observed in the table details.
  11. Now we have to create the job: go to the Jobs section in the left-side menu.
  12. Add job: give the job a name and choose an existing IAM role or create a new one.
  13. Under the Type section, choose the framework to use for development. Here I am choosing Spark and the Glue version.
  14. Under "This job runs", choose the script proposed and generated by AWS Glue or choose an existing script. Here we choose the first option and customize the script according to our usage.
  15. Now open the security configuration and advanced properties section and set the maximum capacity and the job timeout as per the requirement.
  16. Once that is done, click Next, choose the data source table we already created, and click Next.
  17. For the transform type, choose "Change schema" or "Find matching records". Here we could change the existing schema of our data source; I am keeping the existing schema and clicking Next.
  18. Now choose the data target; on the next screen we will see the schema defined for the data set from the CSV we uploaded, mapped to the target S3 bucket.
  19. If we need to edit the schema, we can apply the changes here.
  20. Once that is done, the script editor is displayed, where we can see the default script generated by Glue. We can customize the script according to the use case we want to apply to the existing data. A sketch of this default boilerplate is shown after this list.
  21. Once you have customized the script, click Save and then Run job.
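For orientation, the boilerplate that Glue proposes for such a catalog-backed job looks roughly like the sketch below; the database, table, and output path are placeholders, and the transformations for our use cases are added when we customize the script.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

# Read the table that the crawler created in the Data Catalog (placeholder names)
datasource = glue_context.create_dynamic_frame.from_catalog(
    database='employee_db',
    table_name='read')

# Write the frame to the output folder of the target bucket (placeholder path)
glue_context.write_dynamic_frame.from_options(
    frame=datasource,
    connection_type='s3',
    connection_options={'path': 's3://my-demo-bucket/write/'},
    format='json')

job.commit()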

Reading the data set directly from the S3 bucket

As already mentioned, the second option is to read the data directly from the S3 bucket.

Following are the data sets we are going to work on:

Employee Data.csv

City Mapping Data.csv
The PySpark script below implements the following use cases.

Use Cases

  1. Map the country from the Employee City Mapping table and associate the state and country with the respective city.
  2. Create a single column for the name, called 'Full Name', by concatenating the first and last name.
  3. Identify phone numbers that do not have the country code (i.e. '+00') and prefix the country code to those numbers.
  4. Create a respective S3 bucket for each uniquely identified country and store the transformation result.

from datetime import datetime
import json

# Import PySpark modules
from pyspark.context import SparkContext
from pyspark.sql import functions as sf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

# Import Glue modules
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

# Import boto3 to write the output files to S3
import boto3
from botocore.exceptions import ClientError

# Initialize contexts and session
spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)
session = glue_context.spark_session
s3_source = boto3.resource('s3')

#########################################
### EXTRACT (READ DATA)

# Load the data from both S3 buckets as DataFrames
### Emp DF

df_Emp = session.read.format("csv").option("header", "true").load("s3://htc-source-for-glue/read/emp_updated_city_gender.csv")

print("Showing emp df details ")
df_Emp.show()

### City DF
df_city = session.read.format("csv").option("header", "true").load("s3://htc-poc-glue-source-for-citymap/read/updatedCityNames.csv")

print("Showing city Details")

df_city.show()

## Use Case 1: Join the emp and city DataFrames, mapping the state and country associated with each city onto the emp DataFrame.

## Clean the data set while joining by removing null and duplicate values
joinDF = df_Emp.join(df_city, df_Emp.city == df_city.CityName, how='left').drop('StateID', 'city', 'S.No', 'CountryCode')

print("Showing  Join DF")
joinDF.show()

### Remove Null values
print("Removing null")
dropNullDF= joinDF.where(sf.col('CityName').isNotNull())
dropNullDF.show()

## Remove Duplicate rows
removeDuplicate=dropNullDF.drop_duplicates(subset=['employeeCode'])
print("Removing duplicate")
removeDuplicate.show()



## Use Case 2: Concatenate 'First Name' and 'Last Name' into 'Full Name'

concatDF = removeDuplicate.withColumn('Full Name', sf.concat(sf.col('firstName'), lit(' '), sf.col('lastName')))
print("Showing Concat DF")
concatDF.show()
concatDF.printSchema()

### Use Case 3: Identify phone numbers that do not have the country code.
## Prefix the country code if the phone number does not already start with it
def addCountry_code(phoneNo):
    if not phoneNo.startswith('+00'):
        return '+00' + phoneNo
    return phoneNo

phone_replace_udf = session.udf.register('phone_replace_udf', addCountry_code, StringType())

phoneNo_rep_DF= concatDF.withColumn("phoneNumber", phone_replace_udf(sf.col('phoneNumber')))

print("Show updated df details")
phoneNo_rep_DF.show()


## Use Case 4: Create a nested JSON object with an 'Address' field for each employee, and build one nested JSON object per country.
def update_emp_dict_to_Json(empDict):
    key1 = ['userId', 'jobTitleName', 'firstName', 'lastName', 'employeeCode']
    key2 = ['Full Name', 'gender', 'phoneNumber', 'CityName', 'StateName', 'CountryName', 'emailAddress']
    empDetail1 = {}
    empDetail2 = {}

    # Keep the first group of fields at the top level
    for k in key1:
        empDetail1[k] = empDict[k]
    # Nest the second group of fields under the 'Address' key
    for k2 in key2:
        empDetail2[k2] = empDict[k2]
    empDetail1.update({'Address': empDetail2})
    return empDetail1


# Convert each row of the DataFrame into a nested JSON (dict) object
def get_Nested_EMp_Json_Object(dataFrame):
    jsonContent = dataFrame.toJSON()
    empArrayObj = []
    for row in jsonContent.collect():
        print(row)
        empDetails = json.loads(row)
        empArrayObj.append(update_emp_dict_to_Json(empDetails))
        print("Updated nested Emp", json.dumps(empArrayObj))
    return empArrayObj

phoneNo_rep_DF.show()
phoneNo_rep_DF.printSchema()



# Create (or attempt to create) the S3 bucket used for the employee output
def getS3Bucket():
    try:
        s3Bucket = s3_source.create_bucket(Bucket='empsource', CreateBucketConfiguration={'LocationConstraint': 'ap-south-1'})
        return s3Bucket
    except ClientError as e:
        print("Unable to create bucket", e)


# Build an output key of the form '<timestamp><country>/Employee.json'
def getOutput_file(country):
    time = datetime.now()
    file = country + "/Employee.json"
    outPutfile = str(time) + file
    return outPutfile


# Filter the DataFrame per country and write one nested JSON file per country to S3
def get_filtered_DataFrame_forRegion(dataFrame):
    ## Filtering based on CountryName
    countCountry = dataFrame.select('CountryName').distinct()
    listOfcountry = [list(row) for row in countCountry.collect()]
    print("Country list is", listOfcountry)
    try:
        for country in listOfcountry:
            stateFilter_Df = dataFrame.filter(sf.col('CountryName').isin(["".join(country)]))
            print("Showing filtered")
            filtered_Data_json_object = get_Nested_EMp_Json_Object(stateFilter_Df)
            print("Filtered Data frame for Region", json.dumps(filtered_Data_json_object))
            outputEmpfile = getOutput_file("".join(country))
            s3Bucket = getS3Bucket()
            # Write the per-country JSON document into the output bucket
            s3_source.Object('htcpoc-output', outputEmpfile).put(Body=bytes(json.dumps(filtered_Data_json_object).encode('utf-8-sig')))

    except AttributeError as e:
        print("Error due to :", e)



get_filtered_DataFrame_forRegion(phoneNo_rep_DF)
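Based on the key groups in update_emp_dict_to_Json, each employee record written to the per-country Employee.json file ends up roughly in the shape below; all values here are invented purely for illustration.

{
  "userId": "E101",
  "jobTitleName": "Developer",
  "firstName": "John",
  "lastName": "Doe",
  "employeeCode": "EMP001",
  "Address": {
    "Full Name": "John Doe",
    "gender": "Male",
    "phoneNumber": "+009876543210",
    "CityName": "Chennai",
    "StateName": "Tamil Nadu",
    "CountryName": "India",
    "emailAddress": "john.doe@example.com"
  }
}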













This article was written by Umesh. You can reach him at https://www.linkedin.com/in/umeshnarayananav/
Umesh
Cloud Native Developer