AWS Glue job in an S3 event-driven scenario

I have been working with PySpark under the hood of the AWS Glue service quite often recently, and I spent some time trying to make such a Glue job S3-file-arrival-event-driven. I succeeded: the Glue job gets triggered on file arrival, and I can guarantee that only the file that arrived gets processed. The solution is not very straightforward, though. This is the 10,000 ft overview:

(architecture diagram: event_driven_glue)

  1. A file gets dropped into an S3 bucket “folder”, which is also set as a Glue table source in the Glue Data Catalog
  2. An AWS Lambda function gets triggered by this file arrival event; besides some S3 key parsing and logging, it makes the boto3 call below (see the sketch after this list for one possible way of parsing the event).
    import logging
    import boto3
    from botocore.exceptions import ClientError

    glue_client = boto3.client("glue")

    def lambda_handler(event, context):
        ...
        # parsedjobname = .. parsed out from the "folder" name in the s3 file arrival event
        # full_path = .. parsed from the key in the s3 file arrival event
        try:
            glue_client.start_job_run(JobName=parsedjobname, Arguments={'--input_file_path': full_path})
            return 0
        except ClientError as e:
            logging.error("terminating - %s", str(e))
            return 1
  3. The Glue job corresponding to the “folder” name in the file arrival event gets triggered, with the --input_file_path job parameter set to the full S3 path of the arrived file.
  4. The Glue job loads the content of the files from the AWS Glue Data Catalog into a Glue dynamic frame:
     datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "your_glue_db", table_name = "your_table_on_top_of_s3", transformation_ctx = "datasource0") 

    It also converts the dynamic frame to a Spark DataFrame and appends the source filename as a column:

     from pyspark.sql.functions import input_file_name
     datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())

    and finally, it converts the Spark DataFrame back to a Glue dynamic frame:

     from awsglue.dynamicframe import DynamicFrame
     datasource2 = DynamicFrame.fromDF(datasource1, glueContext, "datasource2")
  5. In this step, we filter the dynamic frame so that only the rows coming from the file in the S3 file arrival event get processed further.
     from awsglue.transforms import Filter
     datasource3 = Filter.apply(frame = datasource2, f = lambda x: x["input_file_name"] == args["input_file_path"])

    Let’s print out some metadata to the console for debugging purposes as well:

    print "input_file_path from AWS Lambda:" , args["input_file_path"]
    print "Filtered records count: ", datasource3.count()
    
  6. We can now start to work with the filtered dynamic frame in the Glue job as needed; a consolidated sketch of steps 4 to 6 follows after this list. You should also consider scheduling a maintenance job or a data retention policy on the file arrival bucket.
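
Here is a minimal sketch of the Lambda side, filling in the event parsing that step 2 leaves out. It assumes a hypothetical bucket layout of s3://bucket/&lt;glue_job_name&gt;/&lt;file&gt;, i.e. the first “folder” of the key doubles as the Glue job name; that convention is my assumption, not a requirement of the setup:

    import logging
    import urllib.parse

    import boto3
    from botocore.exceptions import ClientError

    glue_client = boto3.client("glue")

    def lambda_handler(event, context):
        # S3 event notification: the first record carries the bucket name and object key.
        record = event["Records"][0]["s3"]
        bucket = record["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["object"]["key"])

        # Assumed convention: the top-level "folder" name is also the Glue job name.
        parsedjobname = key.split("/")[0]
        full_path = "s3://{}/{}".format(bucket, key)

        try:
            glue_client.start_job_run(JobName=parsedjobname,
                                      Arguments={'--input_file_path': full_path})
            return 0
        except ClientError as e:
            logging.error("terminating - %s", str(e))
            return 1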
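
And here is a consolidated, minimal sketch of the Glue job side covering steps 4 to 6. It mainly shows where args comes from: getResolvedOptions picks up the --input_file_path argument passed by the Lambda. The database and table names are the placeholders used above:

    import sys

    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.transforms import Filter
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from pyspark.sql.functions import input_file_name

    # --input_file_path is passed by the Lambda in start_job_run()
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_file_path"])

    sc = SparkContext()
    glueContext = GlueContext(sc)

    # Step 4: load the whole table from the Glue Data Catalog ...
    datasource0 = glueContext.create_dynamic_frame.from_catalog(
        database="your_glue_db",
        table_name="your_table_on_top_of_s3",
        transformation_ctx="datasource0")

    # ... tag every row with the file it came from ...
    datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())

    # ... and convert the Spark DataFrame back to a Glue dynamic frame.
    datasource2 = DynamicFrame.fromDF(datasource1, glueContext, "datasource2")

    # Step 5: keep only the rows belonging to the file from the S3 event.
    datasource3 = Filter.apply(
        frame=datasource2,
        f=lambda x: x["input_file_name"] == args["input_file_path"])

    # Step 6: debugging output; continue working with datasource3 as needed.
    print("input_file_path from AWS Lambda:", args["input_file_path"])
    print("Filtered records count:", datasource3.count())

Note that the filter in step 5 only matches if input_file_name() returns the path in the same s3://bucket/key format that the Lambda passes in, so that is worth double-checking in your environment.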

To guarantee that each file gets processed only once and never again (in case it gets dropped to the source bucket multiple times), I would enhance the Lambda function with a logging write/lookup mechanism that tracks the filename (or a file content hash) in a DynamoDB logger table.
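
A minimal sketch of that guard, assuming a hypothetical DynamoDB table named processed_files with s3_key as its partition key; the conditional write succeeds only the first time a key is seen, so the Lambda can skip start_job_run() for repeated drops:

    import boto3
    from botocore.exceptions import ClientError

    dynamodb = boto3.client("dynamodb")

    # Hypothetical logger table with "s3_key" as the partition key.
    DEDUP_TABLE = "processed_files"

    def already_processed(s3_key):
        """Record the key; return True if it was seen before, False if it is new."""
        try:
            dynamodb.put_item(
                TableName=DEDUP_TABLE,
                Item={"s3_key": {"S": s3_key}},
                # The write only succeeds the first time this key is seen.
                ConditionExpression="attribute_not_exists(s3_key)")
            return False
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return True
            raise

The Lambda handler would call already_processed(full_path) before start_job_run() and return early when it comes back True.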
