A few thoughts on AWS Batch S3 event-driven usage

AWS Batch is a great service. This is what AWS says about it: AWS Batch enables developers, scientists, and engineers to easily and efficiently run hundreds of thousands of batch computing jobs on AWS. AWS Batch dynamically provisions the optimal quantity and type of compute resources (e.g., CPU or memory optimized instances) based on the volume and specific resource requirements of the batch jobs submitted. With AWS Batch, there is no need to install and manage batch computing software or server clusters that you use to run your jobs, allowing you to focus on analyzing results and solving problems. AWS Batch plans, schedules, and executes your batch computing workloads across the full range of AWS compute services and features, such as Amazon EC2 and Spot Instances.

What I want to write about in this blogpost is how to make the AWS Batch service work for you in a real-life S3 file arrival event-driven scenario. I use this approach for decoupling the metadata of the file that arrived to spin up a Batch data-processing job where the metadata from the file arrival event define the application logic and the  validations that are processed in the Batch job and when all succeeds, then the Batch job picks up the file itself for processing.

Let’s look at the 2 possible options I ‘ve worked with so far below:

s3_event_driven_batch

Scenario #1 : A file arrives to a s3 bucket, CloudTrail logs capture the event and raise it to CloudWatch service, and this triggers AWS Batch job as it is a valid CloudWatch target. Use this scenario in case you don’t need to involve heavy logic in the arguments you pass to your Batch job. Typically you would use just basic metadata like the s3 key, s3 “file path” etc.

*Note: Don’t forget to have your CloudTrail log files repository in another bucket then the bucket you use for the file arrival event, otherwise the CloudTrail log files can easily keep triggering the Batch job 🙂

Scenario #2: A file arrives to a s3 bucket, Lambda function has this event set as a input, and this Lambda function triggers a AWS Batch job using the standard BOTO3 API library. Use this scenario when you need more logic before triggering the Batch job. Typically you might want to split the s3 file “file path”, or use the file size etc. and add some additional conditional logic for the arguments you provide to the Batch job.

Both of these solutions have some serious downside though. Solution #1 is weak in the way, that you are not able to add more complex conditional logic for the Batch job arguments. Solution #2 is weak in the way, that AWS Lambda Function has a 5 minute timeout , but the Batch job can run much longer, and therefore you never hear back from the Batch job execution in the context of the Lambda Function. Ofcourse you can follow up watching over the Batch job in CloudWatch logs or in the AWS Batch Dashboard, but in this case, you might want to try out the AWS Step functions. They allow you to add orchestration to your Lambda functions firing the Batch jobs. You can see more about AWS Step functions running Lambdas firing Batch jobs here .

Advertisements

Spinning up a Docker container with Flask and Python

Imagine you need to replicate an existing web API returning a JSON ( listing all feeds in some system ) on your local machine for further development purposes and possible extensions. Today I’ll demonstrate how to achieve this using Docker container , Python and Flask. Note that this tutorial requires some previous experience with Python and Docker. Have a look at Flask, it’s a powerful and easy to use Python web framework.

The source web API we’ll be replicating is returning a valid JSON structure listing all the feeds:

[{
  "feed_name": "feed1",
  "feed_type": "feed type 1",
  "filemasks": [
    "filemask11",
    "filemask12"
  ]
},
{
  "feed_name": "feed2",
  "feed_type": "feed type 2",
  "filemasks": [
    "filemask21",
    "filemask22"
  ]
}]

Let’s save this dummy JSON file as feeds.json on our local file system.

Next we’ll setup the environment and start with Docker:

mkdir docker-api

mkdir docker-api/app

mkdir docker-api/feeds

cd docker-api


#1) create Dockerfile:

FROM python:3.6-stretch

COPY . .

RUN pip install -r requirements.txt

WORKDIR /app/

ENTRYPOINT ["python3"]

CMD ["app.py"]


#2) create requirements.txt:

Flask==0.10.1


#3) download the feeds.json file from the website
to your local filesystem into docker-api/feeds/

 

Let’s move forward with the Python application, which is reading from the Docker image

the feeds.json file and exposing this JSON to the Flask web API. We won’t be stepping into

any actions like GET or PUT, just returning the complete JSON file listing all the feeds.

 

I prepared the Python app in the location docker-api/app/app.py and it is looking like this:

import os
from flask import Flask
from flask import Response

app = Flask(__name__)
@app.route('/api/v1/feeds')

def returner():
    os.chdir("..")
    path = os.path.abspath(os.curdir) + '/feeds/feeds.json'

    with open(path,"r") as f:
        data = f.read()
        resp = Response(response=data,
                        status=200,
                        mimetype="application/json")
        return(resp)

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')

The next step is spinning up the docker container ( once we build the image of course )

cd docker-api/ 

docker build -t feeds . 

docker run -d -p 5000:5000 feeds 

docker container list 

*optionally docker container kill(or stop) container_id 
incase you need to "restart" the container
Btw. docker kill vs docker stop is an interesting topic and is discussed for example here
Let’s confirm that your Python project structure is looking like this:
docker

and voila, after running docker run -d -p 5000:5000 feeds , if you lookup the webpage

localhost:5000/api/v1/feeds

in your web browser, you should be getting the response with the desired JSON listing all the feeds.

You might want to check-out also curl.

Fibonacci sequence ( Python exercise )

Let’s continue with the simple Python exercises I’ve been messing around lately. This is a classical question at Dev job interviews, the Fibonacci sequence code. The idea behind this is to come up with code, that sums up the previous 2 member values for a member in the sequence, simply expressed like 1,2,3,5,8,13…

Below are my personal takes on this problem.

1: The nice and performing solution

#get fibonacci
import sys

def main(arg):
    seq_len = arg
    seq_len_iterator = 2
    var1 = 1
    var2 = 2
    fibonacci = ([var1, var2])

    while seq_len_iterator < seq_len:

        var3 = var1 + var2
        fibonacci.append(var3)

        i = len(fibonacci)
        var1 = fibonacci[i-2]
        var2 = fibonacci[i-1]

        seq_len_iterator = seq_len_iterator + 1

    print(f'Fibonacci sequence for {seq_len} sequence members goes like: {fibonacci}')

if __name__ == '__main__':
    try:
        arg = int(sys.argv[1])
        main(arg)
    except:
        print(f'Invalid input, must be integer!')

Execute with the needed sequence member count argument like for instance :

python.exe C:/codility/fibonacci/__main__.py 10

 

2: The alternative “nested-iterations” solution ( Not performing over ~30 sequence members count, durations exponentially grow, however its another example of a valid solution and can be useful if you need to warm oneself during long winter cold nights somewhere outside 🙂 )

#get fibonacci
import sys


def main(arg):
    seq_len = arg
    seq_len_iterator = 2
    iterator = 1
    var1 = 1
    var2 = 2
    fibonacci = ([var1, var2])

    while seq_len_iterator < seq_len:
        if iterator == var1 + var2:
            fibonacci.append(iterator)
            var1 = var2
            var2 = iterator
            iterator = iterator + 1
            seq_len_iterator = seq_len_iterator + 1
            #print(f'Fibonacci member found in try #: {iterator}')
        else:
            iterator = iterator + 1

    print(f'Fibonacci sequence for {seq_len} sequence members goes like: {fibonacci}')


if __name__ == '__main__':
    try:
        arg = int(sys.argv[1])
        main(arg)
    except:
        print(f'Invalid input, must be integer!')

Execute with the needed sequence member count argument like for instance :

python.exe C:/codility/fibonacci/__main__.py 10

Binary gap length ( Python exercise )

Sometimes I like to mess around http://www.codility.com , doing the excercises trying to keeping my development skills fresh. This is my take on the binary gap length problem using basic Python 3. The binary gap length is an excercise where you need to come up with a code, returning the longest sequence of zeros in a 16 digit “binary” string. This question also often shows up at developer job interviews.

#get max binary zeros gap
import re
import sys


def get_binary_gap(input_seq):
    iterator = 0
    iterator_zeros = '0'

    while iterator < 16:
        if iterator_zeros in input_seq:
            stack = len(iterator_zeros)
            if stack > iterator:
                output = stack

        elif iterator_zeros not in input_seq and len(iterator_zeros) == 1:
            output = 0

        else:
            pass

        iterator_zeros = iterator_zeros + '0'
        iterator = iterator + 1
    return output


def main(arg):
    input_seq = arg

    if len(input_seq) == 16 and bool(re.match("^[0-1]{1,16}$", input_seq)):
        output = get_binary_gap(input_seq)
        print(f'The max binary gap of zeros in sequence {input_seq} is {output}')

    else:
        print(f'invalid input sequence {input_seq}')


if __name__ == '__main__':
    arg = str(sys.argv[1])
    main(arg)

Execute with the binary sequence argument like for instance :

python.exe C:/codility/binary_gap/__main__.py 0100000101010100

Materials for my presentation “Get your ETL flow under statistical process control” presented at Bulgaria Web Summit 2018

  • the presentation slides: BWS018
  • the detailed word doc: BWS2018 talk
  • DummyData you need to insert into the dbo.Logger_History table to start
  • and of course the SQL Server database objects scripted out below:
/* ==Scripting Parameters==

Source Server Version : SQL Server 2017 (14.0.1000)
 Source Database Engine Edition : Microsoft SQL Server Enterprise Edition
 Source Database Engine Type : Standalone SQL Server

Target Server Version : SQL Server 2017
 Target Database Engine Edition : Microsoft SQL Server Enterprise Edition
 Target Database Engine Type : Standalone SQL Server
*/

USE [master]
GO

CREATE DATABASE [Logger]
GO

USE [Logger]
GO
/****** Object: Table [dbo].[Logger_History] Script Date: 4/13/2018 5:32:06 PM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[Logger_History](
 [Logger_History_ID] [int] IDENTITY(1,1) NOT NULL,
 [Execution_UTC_Datetime] [datetime] NULL,
 [Job_Name] [nvarchar](255) NULL,
 [Job_Status] [char](1) NULL,
 [Job_Last_Modified_By] [nvarchar](255) NULL,
 [Source_File_Type] [nvarchar](255) NULL,
 [Source_File_Name] [nvarchar](255) NULL,
 [Customer_Name] [nvarchar](255) NULL,
 [Data_Feed_Type] [nvarchar](255) NULL,
 [Server_Name] [nvarchar](255) NULL,
 [Stage_Name] [nvarchar](255) NULL,
 [Stage_Row_Count] [int] NULL,
 [Processing_Start_Datetime] [datetime] NULL,
 [Processing_End_Datetime] [datetime] NULL,
 [Processing_Duration] [int] NULL,
 [Moved_To_ES] [bit] NULL,
 [IsProcessed] [bit] NULL,
PRIMARY KEY CLUSTERED 
(
 [Logger_History_ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
ALTER TABLE [dbo].[Logger_History] ADD DEFAULT ((0)) FOR [Moved_To_ES]
GO
ALTER TABLE [dbo].[Logger_History] ADD DEFAULT ((0)) FOR [IsProcessed]
GO
/****** Object: StoredProcedure [dbo].[gen_3sigma_event] Script Date: 4/13/2018 5:32:06 PM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

CREATE PROC [dbo].[gen_3sigma_event] AS
BEGIN

SET NOCOUNT ON;

BEGIN TRY
BEGIN TRANSACTION

--DECLARATIONS
 DECLARE @ID int, @gen_stmt nvarchar(2000);

SELECT
 Customer_Name,Data_Feed_Type,Stage_Name,
 AVG([Stage_Row_Count]) AS AVG_Stage_Row_Count,
 SQRT(VAR([Stage_Row_Count])) AS SDEV_Stage_Row_Count,
 AVG([Processing_Duration]) AS AVG_Processing_Duration,
 SQRT(VAR([Processing_Duration])) AS SDEV_Processing_Duration
 INTO #tmp_aggregations
 FROM [dbo].[Logger_History]
 GROUP BY Customer_Name,Data_Feed_Type,Stage_Name;

--CALCULATE SCP OUTLIERS
 SELECT
 i.[Logger_History_ID],i.[Execution_UTC_Datetime],
 i.[Sigma_Status_Stage_Row_Count],i.[Sigma_Status_Processing_Duration],
 i.[Stage_Row_Count],i.[Processing_Duration],
 i.[-3Sigma_Stage_Row_Count],i.[+3Sigma_Stage_Row_Count],
 i.[-3Sigma_Processing_Duration],i.[+3Sigma_Processing_Duration],
 i.Server_Name,i.Data_Feed_Type,i.Customer_Name,i.Stage_Name,i.Job_Name,i.Job_Status,
 i.Source_File_Type,i.Source_File_Name,i.[Processing_Start_Datetime],i.[Processing_End_Datetime]
 INTO #tmp_outliers
 FROM (
 SELECT 
 --Row_Count Outlier calculation:
 CASE 
 WHEN [Stage_Row_Count] >= ta.AVG_Stage_Row_Count-ta.SDEV_Stage_Row_Count and Stage_Row_Count <= ta.AVG_Stage_Row_Count + SDEV_Stage_Row_Count
 THEN 'Fits 1 Sigma'
 WHEN [Stage_Row_Count] >= ta.AVG_Stage_Row_Count-(2*SDEV_Stage_Row_Count) and Stage_Row_Count <= ta.AVG_Stage_Row_Count + (2*SDEV_Stage_Row_Count)
 THEN 'Fits 2 Sigma'
 WHEN [Stage_Row_Count] >= ta.AVG_Stage_Row_Count-(3*SDEV_Stage_Row_Count) and Stage_Row_Count <= ta.AVG_Stage_Row_Count + (3*SDEV_Stage_Row_Count)
 THEN 'Fits 3 Sigma'
 ELSE 'SCP Outlier'
 END AS Sigma_Status_Stage_Row_Count
 --,ta.[AVG_Stage_Row_Count] as MEAN,ta.SDEV_Stage_Row_Count as SDEV,ta.AVG_Stage_Row_Count-ta.SDEV_Stage_Row_Count AS [-1Sigma],ta.AVG_Stage_Row_Count+ta.SDEV_Stage_Row_Count AS [+1Sigma],ta.AVG_Stage_Row_Count-(2*ta.SDEV_Stage_Row_Count) AS [-2Sigma],ta.AVG_Stage_Row_Count+(2*ta.SDEV_Stage_Row_Count) AS [+2Sigma]
 ,ta.AVG_Stage_Row_Count - (3*SDEV_Stage_Row_Count) AS [-3Sigma_Stage_Row_Count]
 ,ta.AVG_Stage_Row_Count + (3*SDEV_Stage_Row_Count) AS [+3Sigma_Stage_Row_Count]

--Processing_Duration Outlier calculation:
 ,CASE 
 WHEN [Processing_Duration] >= ta.AVG_Processing_Duration-ta.SDEV_Processing_Duration and Processing_Duration <= ta.AVG_Processing_Duration + SDEV_Processing_Duration
 THEN 'Fits 1 Sigma'
 WHEN [Processing_Duration] >= ta.AVG_Processing_Duration-(2*SDEV_Processing_Duration) and Processing_Duration <= ta.AVG_Processing_Duration + (2*SDEV_Processing_Duration)
 THEN 'Fits 2 Sigma'
 WHEN [Processing_Duration] >= ta.AVG_Processing_Duration-(3*SDEV_Processing_Duration) and Processing_Duration <= ta.AVG_Processing_Duration + (3*SDEV_Processing_Duration)
 THEN 'Fits 3 Sigma'
 ELSE 'SCP Outlier'
 END AS Sigma_Status_Processing_Duration
 --,ta.[AVG_Stage_Row_Count] as MEAN,ta.SDEV_Stage_Row_Count as SDEV,ta.AVG_Stage_Row_Count-ta.SDEV_Stage_Row_Count AS [-1Sigma],ta.AVG_Stage_Row_Count+ta.SDEV_Stage_Row_Count AS [+1Sigma],ta.AVG_Stage_Row_Count-(2*ta.SDEV_Stage_Row_Count) AS [-2Sigma],ta.AVG_Stage_Row_Count+(2*ta.SDEV_Stage_Row_Count) AS [+2Sigma]
 ,ta.AVG_Processing_Duration - (3*SDEV_Processing_Duration) AS [-3Sigma_Processing_Duration]
 ,ta.AVG_Processing_Duration + (3*SDEV_Processing_Duration) AS [+3Sigma_Processing_Duration]

,lm.*
 FROM [dbo].[Logger_History] lm
 INNER JOIN #tmp_aggregations ta 
 ON lm.Customer_Name = ta.Customer_Name
 AND lm.Data_Feed_Type = ta.Data_Feed_Type 
 AND lm.Stage_Name = ta.Stage_Name
 WHERE 
 IsProcessed = 0 AND
 Moved_To_ES = 0
 ) i
 WHERE i.Sigma_Status_Stage_Row_Count = 'SCP Outlier' 
 OR i.Sigma_Status_Processing_Duration = 'SCP Outlier' ;
 
 UPDATE [dbo].[Logger_History]
 SET [IsProcessed] = 1
 WHERE [IsProcessed] = 0;

--LOG SCP OUTLIERS TO WINDOWS EVENT LOG EVENTS
 WHILE EXISTS (SELECT * FROM #tmp_outliers)
 BEGIN
 
 SELECT TOP(1)
 @ID = [Logger_History_ID]
 FROM #tmp_outliers
 ORDER BY [Logger_History_ID];

SELECT @gen_stmt = 
 'EXEC xp_logevent 60000,'+
 '''MESSAGE: !This is a SCP 3 Sigma Outlier based on '+
 CASE 
 WHEN #tmp_outliers.[Sigma_Status_Stage_Row_Count] = 'SCP Outlier' 
 THEN 'Stage Row Count'
 WHEN #tmp_outliers.[Sigma_Status_Processing_Duration] = 'SCP Outlier'
 THEN 'Processing Duration'
 WHEN #tmp_outliers.[Sigma_Status_Stage_Row_Count] = 'SCP Outlier'
 AND #tmp_outliers.[Sigma_Status_Processing_Duration] = 'SCP Outlier'
 THEN 'Stage Row Count AND Processing Duration'
 END +';'+CHAR(10)+
 'Customer: '+#tmp_outliers.[Customer_Name]+' ;'+CHAR(10)+
 'Job Name: '+#tmp_outliers.[Job_Name]+' ;'+CHAR(10)+
 'Job Status: '+#tmp_outliers.[Job_Status]+' ;'+CHAR(10)+
 'Execution UTC Datetime: '+CAST(#tmp_outliers.[Execution_UTC_Datetime] AS VARCHAR(20))+' ;'+CHAR(10)+
 'Source File Type: '+#tmp_outliers.[Source_File_Type]+' ;'+CHAR(10)+
 'Source File Name: '+#tmp_outliers.[Source_File_Name]+' ;'+CHAR(10)+
 'Data Feed Type: '+#tmp_outliers.[Data_Feed_Type]+' ;'+CHAR(10)+
 'Server Name: '+#tmp_outliers.[Server_Name]+' ;'+CHAR(10)+
 'Stage Name: '+#tmp_outliers.[Stage_Name]+' ;'+CHAR(10)+
 'Stage_Row_Count: '+CAST(#tmp_outliers.[Stage_Row_Count] AS VARCHAR(10))+' ;'+CHAR(10)+
 'Processing_Duration: '+CAST(#tmp_outliers.[Processing_Duration] AS VARCHAR(10))+' ;'+CHAR(10)+
 ''', informational;'
 FROM #tmp_outliers
 WHERE @ID = [Logger_History_ID];

EXEC (@gen_stmt);
 --PRINT @gen_stmt;

UPDATE [dbo].[Logger_History]
 SET [Moved_To_ES] = 1
 WHERE @ID = [Logger_History_ID];

DELETE FROM #tmp_outliers WHERE [Logger_History_ID] = @ID;

END

COMMIT TRANSACTION
END TRY

BEGIN CATCH

IF @@TRANCOUNT > 0
 ROLLBACK TRANSACTION;
 
 DECLARE @ErrorNumber INT = ERROR_NUMBER();
 DECLARE @ErrorLine INT = ERROR_LINE();
 DECLARE @ErrorMessage NVARCHAR(4000) = ERROR_MESSAGE();
 DECLARE @ErrorSeverity INT = ERROR_SEVERITY();
 DECLARE @ErrorState INT = ERROR_STATE();
 
 PRINT 'Actual error number: ' + CAST(@ErrorNumber AS VARCHAR(10));
 PRINT 'Actual line number: ' + CAST(@ErrorLine AS VARCHAR(10));
 
 RAISERROR(@ErrorMessage, @ErrorSeverity, @ErrorState);

END CATCH

END
GO