Tagging AWS S3 objects in a file processing pipeline

This is a quick tip on how to keep your AWS file processing pipelines tied together in your logging and monitoring platform under one consistent trace, which is critical for investigations. It is also a real-life scenario used often in AWS data processing pipelines. In this case, AWS Lambda A is a file generator (a relational database data extraction tool) and Lambda B runs additional file validation logic before the file gets sent out. Boto3 calls in the Lambda functions are used to put and get the S3 object tags (see the sketch after the steps below).


  1. Lambda function A generates a version 4 UUID used as the trace_id, starts logging under this trace_id and generates a csv file in an S3 bucket
  2. Lambda function A tags the csv file with the key “trace_id” and its value being the UUID
  3. Lambda function B gets the csv file
  4. Lambda function B reads the csv file tag with the trace_id and continues processing the file further while continuously logging under the same trace_id

 

One sidenote here: steps #3 and #4 could be swapped, depending on your use case. Reading the object’s tag first leaves a minimal gap in the trace events, but it might be more complex on the coding side of things.
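Here is a minimal boto3 sketch of the tagging calls from steps 2 and 4; the bucket and key names are made up for illustration:

import uuid
import boto3

s3_client = boto3.client("s3")

# Lambda A: generate the trace_id and tag the generated csv file with it
trace_id = str(uuid.uuid4())
s3_client.put_object_tagging(
    Bucket="my-pipeline-bucket",   # made-up bucket name
    Key="extracts/data.csv",       # made-up object key
    Tagging={"TagSet": [{"Key": "trace_id", "Value": trace_id}]},
)

# Lambda B: read the trace_id back and keep logging under it
response = s3_client.get_object_tagging(Bucket="my-pipeline-bucket", Key="extracts/data.csv")
tags = {tag["Key"]: tag["Value"] for tag in response["TagSet"]}
trace_id = tags["trace_id"]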


AWS Glue job in an S3 event-driven scenario

I have been working with PySpark under the hood of the AWS Glue service quite often recently, and I spent some time trying to make such a Glue job S3-file-arrival-event-driven. I succeeded: the Glue job gets triggered on file arrival and I can guarantee that only the file that arrived gets processed. However, the solution is not very straightforward, so this is the 10,000 ft overview:


 

  1. A file gets dropped to an S3 bucket “folder”, which is also set as a Glue table source in the Glue Data Catalog
  2. AWS Lambda gets triggered by this file arrival event; besides some S3 key parsing, logging etc., this Lambda makes the following boto3 call:
    import logging
    import boto3
    from botocore.exceptions import ClientError

    glue_client = boto3.client("glue")

    def lambda_handler(event, context):
        ...
        # parsedjobname = .. parsed out from the "folder" name in the s3 file arrival event
        # full_path = .. parsed from the key in the s3 file arrival event
        try:
            glue_client.start_job_run(JobName=parsedjobname, Arguments={'--input_file_path': full_path})
            return 0
        except ClientError as e:
            logging.error("terminating - %s", str(e))
            return 1
  3. The Glue job corresponding to the “folder” name in the file arrival event gets triggered with the --input_file_path job parameter set (see the sketch after this list for how the job resolves it)
  4. The Glue job loads the content of the files from the AWS Glue Data Catalog into a Glue dynamic frame like this:
     datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "your_glue_db", table_name = "your_table_on_top_of_s3", transformation_ctx = "datasource0") 

    It then converts the dynamic frame to a Spark dataframe and appends the input file name as a column, like this:

     from pyspark.sql.functions import input_file_name
    datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name()) 

    and at last, it converts the dataframe back to a Glue dynamic frame like this:

     datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2") 
  5. In this step, we filter the dynamic frame so that we process further only the rows coming from the file related to the S3 file arrival event.
     datasource3 = Filter.apply(frame = datasource2, f = lambda x: x["input_file_name"] == args["input_file_path"]) 

    Let’s print out some metadata to the console for debugging purposes as well:

    print "input_file_path from AWS Lambda:" , args["input_file_path"]
    print "Filtered records count: ", datasource3.count()
    
  6. We can now start to work with the filtered dynamic frame in the Glue job as needed. You should also consider scheduling some maintenance job or data retention policy on the file arrival bucket.
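For completeness, here is a minimal sketch of how the Glue job picks up the --input_file_path argument passed from the Lambda’s start_job_run call above, using the standard awsglue utility:

import sys
from awsglue.utils import getResolvedOptions

# resolve the job argument passed by the Lambda via start_job_run
args = getResolvedOptions(sys.argv, ["input_file_path"])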

To guarantee that each file gets processed only once and never again (in case it would get dropped to the source bucket multiple times), I would enhance the Lambda function with a logging write / lookup mechanism handling the filename (or file content hash) in a DynamoDB logger table.
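A minimal sketch of such a check, assuming a hypothetical DynamoDB table named file_processing_log with file_name as its partition key (the conditional put fails when the file name was already logged):

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client("dynamodb")

def already_processed(file_name):
    try:
        # the conditional put succeeds only the first time this file name is seen
        dynamodb.put_item(
            TableName="file_processing_log",
            Item={"file_name": {"S": file_name}},
            ConditionExpression="attribute_not_exists(file_name)",
        )
        return False
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return True  # the file was logged before, skip it
        raise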

Spinning up AWS locally using Localstack

Recently I came across this GitHub project called Localstack. It allows you to spin up a local AWS environment as a service or as a Docker container. You can utilize such a tool for integration testing in your CI/CD pipelines without paying a cent for the AWS services used, or for all kinds of “hacking AWS” efforts, and I’m pretty sure there are many more usage scenarios. Today I’d like to show you how this awesome stack works.

For this step-by-step tutorial, I will work in my Ubuntu environment and utilize Pipenv, so make sure to check that out if you haven’t already.

Now let’s get our hands dirty and clone the Localstack Git repo.

git clone https://www.github.com/localstack/localstack localstack_playground

Let’s CD into the folder containing the codebase

cd localstack_playground

and now let’s install the localstack tool into Pipenv with all the needed dependencies (npm) and related packages (awscli-local):

pipenv --three
pipenv install npm
pipenv install localstack
pipenv install awscli-local

Let’s start the Pipenv shell

pipenv shell

Let’s start Localstack

localstack start

Now the service is running:

(screenshot: Localstack startup output listing the ports of the mocked-up services)

Let’s open a new terminal window and start hitting the mocked-up AWS services now running locally. We’ll create an S3 bucket called tutorial, list our buckets, change the access control list for this bucket, upload a file we create, and then remove the bucket and list all buckets again to see that everything worked and the teardown cleanup phase passed successfully. For these S3 calls we’ll use the awslocal CLI wrapper around Localstack, but you can proceed using boto3 as well (see the sketch after these commands).

awslocal s3 mb s3://tutorial
awslocal s3 ls
echo Hello World! >> helloworld.txt
awslocal s3api put-bucket-acl --bucket tutorial --acl public-read
awslocal s3 cp helloworld.txt s3://tutorial
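As mentioned, boto3 works as well; here is a minimal sketch listing the buckets against the mocked-up S3 endpoint on port 4572 (the dummy credentials are just placeholders, Localstack does not validate them):

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4572",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)

# list the buckets created through awslocal above
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])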

Let’s see the S3 objects in the browser using the following URLs (the port of the mocked-up S3 service is 4572, as you can see in the screenshot above):

try this url:  http://localhost:4572/tutorial/


try this url:  http://localhost:4572/tutorial/helloworld.txt


Now let’s remove the object and the bucket, and list the buckets again to see there is no bucket left:

awslocal s3 rm s3://tutorial/helloworld.txt
awslocal s3 rb s3://tutorial
awslocal s3 ls

Now it’s clear you can easily work with AWS services like S3 locally. Other services on the Localstack list work great as well; for instance, let’s create an SNS topic and publish a message to it.

awslocal sns create-topic --name datahappy_topic
# the topic ARN is returned in the response
awslocal sns publish --topic-arn "arn:aws:sns:us-east-1:123456789012:datahappy_topic" --message "datahappy about local mocked up sns"

Enjoy!

API connection “retry logic with a cooldown period” simulator ( Python exercise )

This is a very simple API call “circuit-breaker” style simulator I’ve written in Python. Since it’s a stateless code snippet, you should more likely call it a “retry logic with a cooldown period” simulator. There are valid use cases where stateless is the desired state type, for example when a validation of a dataset against a service can either pass or fail, throw an exception and halt the execution flow. This is typical for data-flow styled apps, and in such a case the circuit-open state is not acceptable. Anyway, the goal is to make sure that whenever we hit a connection (or timeout) error during the API call, we retry after 10 seconds, then after 20 seconds, then after 30 seconds, and then quit trying. The ConnectionError exception is simulated using the non-routable address 10.255.255.1.

However, in the microservices world, if you want to implement a full-scale stateful circuit-breaker, please have a look at this article.

import datetime
import time
import logging
import requests

iterator = 1
attempt = 1

# while cycle to simulate connection error using non-routable IP address
while iterator < 40:
    try:
        # if iterator inside the range to simulate success
        if 1 < iterator < 8:
            r = requests.get('https://data.police.uk/api/crimes-at-location?date=2017-02&location_id=884227')
        # else iterator outside the range to simulate the error event
        else:
            r = requests.get('http://10.255.255.1')
        if r.status_code != requests.codes.ok:
            logging.error('Wrong request status code received, %s', r.status_code)
        r = r.json()
        print(r)
        attempt = 1
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout,
            requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as conn_err:
        print(f'bingo, ConnectionError, now lets wait {attempt * 10} seconds before retrying', (datetime.datetime.now()))
        time.sleep(attempt * 10)
        attempt = attempt + 1
        if attempt > 3:
            logging.error('Circuit-breaker forced exit')
            raise conn_err
    iterator = iterator + 1

Also, try to avoid the Python time.sleep() method in AWS Lambdas, as this would not be cost efficient; AWS Step Functions would be much more appropriate.

*Updated March 14th 2019: considering the cost analysis in the article below, it might actually be OK to have time.sleep() in the Lambda, depending on your use case:

https://blog.scottlogic.com/2018/06/19/step-functions.html

A few thoughts on AWS Batch with S3 event-driven usage scenarios

AWS Batch is a great service. This is what AWS says about it: AWS Batch enables developers, scientists, and engineers to easily and efficiently run hundreds of thousands of batch computing jobs on AWS. AWS Batch dynamically provisions the optimal quantity and type of compute resources (e.g., CPU or memory optimized instances) based on the volume and specific resource requirements of the batch jobs submitted. With AWS Batch, there is no need to install and manage batch computing software or server clusters that you use to run your jobs, allowing you to focus on analyzing results and solving problems. AWS Batch plans, schedules, and executes your batch computing workloads across the full range of AWS compute services and features, such as Amazon EC2 and Spot Instances.

What I want to write about in this blog post is how to make the AWS Batch service work for you in a real-life S3 file arrival event-driven scenario. I use this approach to decouple the metadata of the arrived file and spin up a Batch data-processing job, where the metadata from the file arrival event defines the application logic and the validations processed in the Batch job; when all of that succeeds, the Batch job picks up the file itself for processing.

Let’s look at the two possible options I’ve worked with so far:


Scenario #1: A file arrives to an S3 bucket, CloudTrail logs capture the event and raise it to the CloudWatch service, and this triggers an AWS Batch job, as Batch is a valid CloudWatch Events target. Use this scenario when you don’t need to involve heavy logic in the arguments you pass to your Batch job; typically you would use just basic metadata like the S3 key, the S3 “file path” etc.

*Note: Don’t forget to keep your CloudTrail log files repository in another bucket than the bucket you use for the file arrival event, otherwise the CloudTrail log files can easily keep triggering the Batch job 🙂

Scenario #2: A file arrives to an S3 bucket, a Lambda function has this event set as an input, and this Lambda function triggers an AWS Batch job using the standard boto3 library (a sketch follows below). Use this scenario when you need more logic before triggering the Batch job; typically you might want to split the S3 “file path”, use the file size etc. and add some additional conditional logic for the arguments you provide to the Batch job.
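A minimal sketch of such a Lambda handler, with made-up job queue and job definition names and the usual S3 event record layout:

import boto3

batch_client = boto3.client("batch")

def lambda_handler(event, context):
    # pull the bucket and key out of the S3 file arrival event
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]

    # submit the Batch job with the file metadata as job parameters
    response = batch_client.submit_job(
        jobName="s3-file-arrival-job",
        jobQueue="my-job-queue",                # made-up queue name
        jobDefinition="my-job-definition",      # made-up job definition name
        parameters={"input_bucket": bucket, "input_key": key},
    )
    return response["jobId"]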

Both of these solutions have a serious downside, though. Solution #1 is weak in that you are not able to add more complex conditional logic for the Batch job arguments. Solution #2 is weak in that an AWS Lambda function has a 15 minute timeout while the Batch job can run much longer, and therefore you never hear back from the Batch job execution in the context of the Lambda function; you would have to have another Lambda function acting as a Batch job status poller. Of course, you can follow up watching over the Batch job in CloudWatch logs or in the AWS Batch dashboard, but in this case you might want to try out AWS Step Functions. They allow you to add orchestration to your Lambda functions firing the Batch jobs. You can see more about AWS Step Functions running Lambdas firing Batch jobs here.

Spinning up a Docker container with Flask and Python

Imagine you need to replicate an existing web API returning JSON (listing all feeds in some system) on your local machine for further development purposes and possible extensions. Today I’ll demonstrate how to achieve this using a Docker container, Python and Flask. Note that this tutorial requires some previous experience with Python and Docker. Have a look at Flask, it’s a powerful and easy to use Python web framework.

The source web API we’ll be replicating is returning a valid JSON structure listing all the feeds:

[{
  "feed_name": "feed1",
  "feed_type": "feed type 1",
  "filemasks": [
    "filemask11",
    "filemask12"
  ]
},
{
  "feed_name": "feed2",
  "feed_type": "feed type 2",
  "filemasks": [
    "filemask21",
    "filemask22"
  ]
}]

Let’s save this dummy JSON file as feeds.json on our local file system.

Next we’ll set up the environment and start with Docker:

mkdir docker-api

mkdir docker-api/app

mkdir docker-api/feeds

cd docker-api


#1) create Dockerfile as:

FROM python:3.6-stretch

COPY . .

RUN pip install -r requirements.txt

WORKDIR /app/

ENTRYPOINT ["python3"]

CMD ["app.py"]


#2) create requirements.txt as:

Flask==0.10.1


#3) download the feeds.json file from the website
to your local filesystem into docker-api/feeds/

 

Let’s move forward with the Python application, which reads the feeds.json file from the Docker image and exposes this JSON through the Flask web API. We won’t be stepping into any per-feed actions like GET or PUT, just returning the complete JSON file listing all the feeds.

I prepared the Python app in the location docker-api/app/app.py and it looks like this:


import os
from flask import Flask
from flask import Response

app = Flask(__name__)

@app.route('/api/v1/feeds')
def returner():
    # step out of the /app workdir and build the path to the feeds.json file
    os.chdir("..")
    path = os.path.abspath(os.curdir) + '/feeds/feeds.json'

    # return the raw JSON file content as the response body
    with open(path, "r") as f:
        data = f.read()
        resp = Response(response=data, status=200, mimetype="application/json")
        return resp

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')

The next step is spinning up the Docker container (once we build the image, of course):

cd docker-api/

docker build -t feeds . 

docker run -d -p 5000:5000 feeds 

docker container list 

*Optionally, run docker container kill (or stop) container_id in case you need to "restart" the container. Btw. docker kill vs docker stop is an interesting topic and is discussed for example here.
Let’s confirm that your Python project structure looks like this:

docker-api/
├── Dockerfile
├── requirements.txt
├── app/
│   └── app.py
└── feeds/
    └── feeds.json

and voilà, after running docker run -d -p 5000:5000 feeds, if you look up the webpage

localhost:5000/api/v1/feeds

in your web browser, you should be getting the response with the desired JSON listing all the feeds.

You might also want to check out curl for a quick test, as shown below.
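For instance, a one-liner against the running container (assuming the port mapping from the docker run command above):

curl http://localhost:5000/api/v1/feeds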

Fibonacci sequence ( Python exercise )

Let’s continue with the simple Python exercises I’ve been messing around with lately. This is a classical question at dev job interviews: the Fibonacci sequence code. The idea is to come up with code where each member of the sequence is the sum of the previous two members, simply expressed like 1, 2, 3, 5, 8, 13…

Below are my personal takes on this problem.

1: The nice and performant solution

#get fibonacci
import sys

def main(arg):
    seq_len = arg
    seq_len_iterator = 2
    var1 = 1
    var2 = 2
    fibonacci = ([var1, var2])

    while seq_len_iterator < seq_len:

        var3 = var1 + var2
        fibonacci.append(var3)

        i = len(fibonacci)
        var1 = fibonacci[i-2]
        var2 = fibonacci[i-1]

        seq_len_iterator = seq_len_iterator + 1

    print(f'Fibonacci sequence for {seq_len} sequence members goes like: {fibonacci}')

if __name__ == '__main__':
    try:
        arg = int(sys.argv[1])
        main(arg)
    except (IndexError, ValueError):
        print('Invalid input, must be an integer!')

Execute it with the needed sequence member count as an argument, for instance:

python.exe C:/codility/fibonacci/__main__.py 10

 

2: The alternative “nested-iterations” solution (it does not perform beyond ~30 sequence members as durations grow exponentially, however it’s another example of a valid solution and can be useful if you need to warm yourself up during long cold winter nights somewhere outside 🙂 )

#get fibonacci
import sys


def main(arg):
    seq_len = arg
    seq_len_iterator = 2
    iterator = 1
    var1 = 1
    var2 = 2
    fibonacci = ([var1, var2])

    while seq_len_iterator < seq_len:
        if iterator == var1 + var2:
            fibonacci.append(iterator)
            var1 = var2
            var2 = iterator
            iterator = iterator + 1
            seq_len_iterator = seq_len_iterator + 1
            #print(f'Fibonacci member found in try #: {iterator}')
        else:
            iterator = iterator + 1

    print(f'Fibonacci sequence for {seq_len} sequence members goes like: {fibonacci}')


if __name__ == '__main__':
    try:
        arg = int(sys.argv[1])
        main(arg)
    except (IndexError, ValueError):
        print('Invalid input, must be an integer!')

Execute it with the needed sequence member count as an argument, for instance:

python.exe C:/codility/fibonacci/__main__.py 10

Binary gap length ( Python exercise )

Sometimes I like to mess around on http://www.codility.com, doing the exercises to keep my development skills fresh. This is my take on the binary gap length problem using basic Python 3. The binary gap length is an exercise where you need to come up with code returning the length of the longest sequence of zeros in a 16 digit “binary” string. This question also often shows up at developer job interviews.

#get max binary zeros gap
import re
import sys


def get_binary_gap(input_seq):
    # grow a probe string of consecutive zeros one character at a time
    iterator_zeros = '0'
    output = None
    while len(iterator_zeros) <= len(input_seq):
        if iterator_zeros in input_seq:
            # the probe still fits somewhere in the input, remember its length
            output = len(iterator_zeros)
        elif len(iterator_zeros) == 1:
            # not even a single zero in the input sequence
            output = 0
        iterator_zeros = iterator_zeros + '0'
    return output


def main(arg):
    input_seq = arg
    if len(input_seq) == 16 and bool(re.match("^[0-1]{1,16}$", input_seq)):
        output = get_binary_gap(input_seq)
        print(f'The max binary gap of zeros in sequence {input_seq} is {output}')
    else:
        print(f'invalid input sequence {input_seq}')


if __name__ == '__main__':
    arg = str(sys.argv[1])
    main(arg)

Execute it with the binary sequence as an argument, for instance:

python.exe C:/codility/binary_gap/__main__.py 0100000101010100