csv file validation using Python

I figured out I need a csv file validation tool in my toolbelt. I wasn’t really impressed with the options out there: some people suggest using MS Excel for this, others recommend free online services (some of these services, like https://csvlint.io/validation, actually store links to download the validated files, and that’s a no-go for me). I also found some decent tools on Github, but in the end I wrote my own csv file validation framework. The source code is located here in my Github.

When it comes to implementation, I typically think along these lines: is the problem I’m dealing with complex or not? The answer to this question should be aligned with the complexity of the implementation. I believe that a csv file validation framework is a simple problem: there is only one way a csv file can be valid, and there should be no potential side-effects and surprises. Therefore the implementation is really simple. BTW, there is only one 3rd party dependency besides pytest, and that is the python-dateutil library, which seems really solid when it comes to validating date/time/datetime columns. What I focused on more in this project is the test coverage; it is at 79%, and that seems like a pretty decent test coverage value to me.
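Just to illustrate the kind of check python-dateutil makes easy, here is a generic sketch (not the framework’s actual code):

# Minimal sketch of date/time column validation with python-dateutil
# (illustrative only, not the framework's actual code).
from dateutil import parser


def is_valid_datetime(value):
    """Return True if the value parses as a date/time/datetime, False otherwise."""
    try:
        parser.parse(value)
        return True
    except (ValueError, OverflowError):
        return False


print(is_valid_datetime("2019-11-03 12:30:00"))  # True
print(is_valid_datetime("not a date"))           # False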

Happy coding!

Redshift server-side cursor with Psycopg2 Python adapter for better query performance

When it comes to extracting data out of AWS Redshift using a Python client, you’d typically end up using the Psycopg2 Postgres adapter because it’s the most common one. Today I’d like to share some of my thoughts on how to boost performance when querying large datasets. The main topic is going to be AWS Redshift server-side vs. client-side cursors. When it comes to querying large datasets from Redshift, you have, as far as I know, at least 3 options.

#1: Option number one is to unload the queried data directly to a file in AWS S3. See the docs for this option here. That is the best option when it comes to performance, but it might not be usable every time (for instance when you need to post-process the data in the extracted files; this option also does some implicit file splitting per node slice, which might be undesired behavior, etc.).
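Just for illustration, a minimal sketch of what option #1 could look like through Psycopg2 (the cluster credentials, bucket, IAM role and query below are made-up placeholders):

# Rough sketch of unloading query results straight to S3 with UNLOAD
# (cluster credentials, bucket, IAM role and query are made-up placeholders).
import psycopg2

conn = psycopg2.connect(host="my-cluster.abc123.eu-west-1.redshift.amazonaws.com",
                        port=5439, dbname="mydb", user="myuser", password="mypassword")

unload_sql = """
    UNLOAD ('SELECT * FROM my_table')
    TO 's3://my-bucket/my-prefix/part_'
    IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-unload-role'
    DELIMITER ','
    GZIP;
"""

with conn.cursor() as cur:
    cur.execute(unload_sql)
conn.commit()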

#2: Option number two is the second best performing alternative, and that is to use a server-side Redshift cursor. You get one when you make use of the “name” argument while initializing a Postgres cursor, like I do below:

self.cur = self.conn.cursor(name=self.cursor_name)

I found server-side cursors to have a huge performance impact for datasets from >1M rows all the way up to 10M rows (that is the maximum amount of rows I’ve been testing with, but it can be much more). However, be careful, as AWS recommends not using them for very large datasets: “Because cursors materialize the entire result set on the leader node before beginning to return results to the client, using cursors with very large result sets can have a negative impact on performance.”

It needs to be pointed out that a server-side cursor can be created only when the query is a plain SELECT or a SELECT with CTEs. Basically, usage of DDL and DML statements in your query blocks creating a server-side cursor, so the example below handles this by declaring a list of the blocking statements and checking that the query does not contain any of them.

#3: Option number three is using a plain client-side cursor. The difference in the implementation is basically that you do not make any use of the name argument while initializing the Postgres cursor, see:

self.cur = self.conn.cursor()

The performance is typically OK-ish for datasets having thousands or tens of thousands of rows.

For these 2 cursor types, I’d highly recommend fetching many rows at a time using a Python generator yielding the rows, instead of the cursor’s fetchall method. Fetchall worked fine for me on small datasets (tested on ~10 columns, ~30k rows), but in general I don’t recommend it. I’ve seen the fetchall method combined with a client-side cursor raise memory errors on a 10-column, 300k-row dataset in AWS Batch with 2GB of memory set in its job definition.

Don’t forget to set the fetch_row_size parameter to your needs; 1000 is probably a small size and will result in many remote DB round trips. A size of 10000 would be a good starting point in my opinion.

Here’s a quick example tied to a public Postgres database connection (which should be good enough for illustration) I prepared for sandboxing purposes, so you can see how you can utilize a server-side cursor in your codebase:

import psycopg2


class CustomException(Exception):
    pass


class CNX:
    def __init__(self):
        self.conn = None
        self.fetch_row_size = 1000
        # here I set the query limit for the performance 
        # comparison chart below
        self.query = "SELECT * from rna LIMIT 1000;"
        self.cursor_name = 'my_cursor'

    def init_connection(self):
        """
        Initiate a DB connection
        :return: connection conn object
        """
        try:
            self.conn = psycopg2.connect(database="pfmegrnargs", user="reader",
                                         password="NWDMCE5xdipIjRrp",
                                         host="hh-pgsql-public.ebi.ac.uk", port="5432")

        except psycopg2.Error as postgres_exc:
            raise CustomException("Psycopg2 connect exception: {}".format(postgres_exc))

        return

    def connect_to_data_source(self):
        with self.conn:
            # AWS Redshift Psycopg2 data source DOES support named
            # server-side cursors and setting its itersize for
            # single-select queries and select queries with CTEs
            _stmts_blocking_named_server_side_cursor = ["CREATE", "INSERT", "DELETE", "UPDATE",
                                                        "DROP", "TRUNCATE", "CALL"]

            if not any(x in self.query.upper() for x in _stmts_blocking_named_server_side_cursor):
                print('Named server side cursor used for query execution')
                self.cur = self.conn.cursor(name=self.cursor_name)
                self.cur.itersize = self.fetch_row_size

            # AWS Redshift Psycopg2 data source DOES NOT support
            # server-side cursors for queries using temp tables to store temporary results, DDL etc.
            else:
                print('Simple cursor used for query execution')
                self.cur = self.conn.cursor()

            return

    def execute(self):
        """
        execute the query
        :param query:
        :return:
        """
        try:
            self.cur.execute(self.query)

        except psycopg2.Error as postgres_exc:
            raise CustomException("Psycopg2 execute query exception: {}".format(postgres_exc))

        return

    def fetch_from_db(self):
        """
        Generator yielding batches of fetch_row_size rows from the cursor
        """
        print("fetching rows START")
        while True:
            results = self.cur.fetchmany(size=self.fetch_row_size)
            if not results:
                print("fetching rows END")
                break
            print("fetching rows...")
            yield results


# usage of fetched rows example:
cnx_obj = CNX()
cnx_obj.init_connection()
cnx_obj.connect_to_data_source()
cnx_obj.execute()

for batch_rows in cnx_obj.fetch_from_db():
    print(batch_rows)


Let’s evaluate the query performance by comparing the execution duration of the code snippet I shared above. I think the chart is pretty self-explanatory in this case: the server-side cursor wins massively over the client-side one as the amount of rows returned from the query increases.

ss-cs chart (server-side vs. client-side cursor duration comparison)


Flask and Chart.js reports with drilldown

I really enjoy the integration between a Flask web application backend, Bootstrap for styling and the Chart.js library for data visualisation on the browser “frontend”. However, I was missing a way to drill down through a data point on the chart into a detail report page. So I decided to give back to the community, do some research and wrap up the solution myself.
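Roughly speaking, the Flask side of a drilldown just needs a detail route for whatever label the chart’s onclick handler sends over; here is a minimal sketch with made-up route, template and data (not the example project’s actual code):

# Minimal sketch of the Flask side of a chart drilldown
# (route, template name and data below are made up for illustration).
from flask import Flask, render_template

app = Flask(__name__)


def get_detail_rows(label):
    # hypothetical data-access helper, replace with a real lookup
    return [{"label": label, "value": 42}]


@app.route("/detail/<label>")
def detail(label):
    # the Chart.js onclick handler on the frontend is assumed to redirect
    # the browser to /detail/<clicked data point label>
    return render_template("detail.html", label=label, rows=get_detail_rows(label))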

see the demo web application “animation”:

screens_gif

You can clone the project from here: https://github.com/datahappy1/flask_chartjs_drilldown_example_project

Enjoy!

Heroku vs. Pythonanywhere: a quick free-tier comparison for Flask web app hosting

I’m finally getting to deploy my data-science web application for Czech language sentiment analysis to some free Python web hosting. This is just for the initial dev and QA rounds of work before I migrate it over to some paid environment. The two biggest players in this field seem to be Pythonanywhere and Heroku. Let me quickly loop through my thoughts on the experience I’ve had so far with both of these environments.

Let’s start by looking at the resources you get in your free tier account. Considering I need to build up a virtual environment with the Sklearn package import (dragging along NumPy, SciPy and others), I definitely need more than the 500MB of drive space Pythonanywhere is offering me. Heroku wins this round: they provide you with a hard limit of 1GB Git repo max size, which turns into a “slug” size limit of 500MB (after compression), and that seems to be sufficient for my needs. There is a caveat to this, though. Pythonanywhere provides you with 500 MB of resilient drive space, whereas Heroku provides you with ephemeral storage, which means every 24 hours your app “dyno” is drained and brought back up in a process called cycling, and you cannot persist your data through this phase. Thinking about this limitation, it’s probably OK, because we want to be developing modern stateless apps with persistence layers built for scaling out anyway, right? So in the end, this limitation made me realize I’ll need to remove the Sqlite local storage, and it forced me to start thinking about a more robust and scalable data persistence solution for the final version.

Let’s continue with the deployment experience. Pythonanywhere needs you to run an embedded bash console from within their web interface and run Git commands manually to get the latest codebase version from your Github repository. Heroku, after you set up the Github integration with your repository, allows you to instantly run deployment pipelines. I must admit this is impressive; you really can set up a simple CD pipeline in a few minutes. This round has got to go to Heroku as well.

When it comes to debugging, I feel Pythonanywhere gives you full control and easier access to the application error logs. You can loop through the log files for application access logs, error logs and server logs, and you can also go through the log archives. Heroku has CLI access to the logs, but the logs are merged together for the whole application, and overall this feels to me like a selling point for Pythonanywhere.

Wrapping this up, both of these work really well, and I’m still learning to work with both of them. If I’ve written something I got wrong, please feel free to comment below. Pythonanywhere gives you much more control, but there’s also much more overhead in configuring your app. My personal preference is less configuring and more time spent coding, so in the end I went with Heroku, but I can see the selling points of both environments. In a nutshell, I’m really glad both of these are available out there for free, giving you so much support in the early stages of your application development.


Czech sentiment analyzer project aka “czester”

Hey! I just finished my latest project, the Czech language sentiment analyzer. The official name of this tool is “CZESTER”. I spent a lot of my time on this project over the last couple of months, but I feel it was definitely worth it.

I learned a lot of new things about multi-threaded web scraping, supervised machine learning, natural language processing, the Flask and Chart.js integration I used for the site statistics page, and also about deploying Flask machine learning web applications to Pythonanywhere and Heroku. This was probably the coolest project I have worked on in a while.

It’s currently deployed to czester.herokuapp.com and I’d love to hear what you think about it. The details describing this project are located here: czester.herokuapp.com/methodology

Converting csv to parquet files

Recently a colleague of mine asked me how to convert csv files to the parquet columnar file format without all the overhead of locally spinning up Spark and the like.

Considering I like to play around with Pandas, my answer was … Pandas to the action!

And in this post I’m sharing the result: a super simple csv to parquet (and vice versa) file converter written in Python. It can work with files on your local machine, but it also allows you to save / load files using an AWS S3 bucket. The ability to build tools like this during a few lunch breaks makes me understand why Python is so popular in the data engineering world these days.

https://github.com/datahappy1/csv_to_parquet_converter
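The core of the conversion boils down to a couple of Pandas calls; here is a minimal sketch (the file names are placeholders and the pyarrow engine is assumed to be installed), not the converter’s actual code:

# Minimal sketch of csv <-> parquet conversion with Pandas
# (file names are placeholders; assumes the pyarrow engine is installed).
import pandas as pd

# csv -> parquet
df = pd.read_csv("input.csv")
df.to_parquet("output.parquet", engine="pyarrow", index=False)

# parquet -> csv
df = pd.read_parquet("output.parquet", engine="pyarrow")
df.to_csv("roundtrip.csv", index=False)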

In a future version of this tool, I plan to have a look at https://github.com/modin-project/modin to speed up Pandas with parallel Dataframes, but so far I’ve had no luck on a Windows machine.

Flask, Server-sent events, JavaScript example project == “done”

I just finished a super fun example project with a Flask web back-end, HTML5 Server-sent events and a bit of JavaScript.

Check it out here: https://github.com/datahappy1/flask_sse_example_project

So, what’s it all about? Flask, Waitress and JavaScript are used for 1-N Server-sent event streams to enable auto-refreshing the state of long-running jobs, visualized in an HTML table in the browser. You could use this to make the AWS EMR or Batch consoles auto-refresh, for instance, but basically any long-running task state can be “auto-refreshed” this way.
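To give a rough idea of what a Server-sent events endpoint can look like on the Flask side, here is a minimal sketch (the route name and payload are made up, the actual project does much more):

# Minimal sketch of a Server-sent events endpoint in Flask
# (route name and payload are made up; the real project is more involved).
import json
import time

from flask import Flask, Response

app = Flask(__name__)


@app.route("/stream")
def stream():
    def event_stream():
        while True:
            # in the real app this would be the current state of the long-running jobs
            payload = json.dumps({"job": "demo", "state": "RUNNING"})
            yield "data: {}\n\n".format(payload)
            time.sleep(2)

    # the browser-side EventSource API consumes this text/event-stream response
    return Response(event_stream(), mimetype="text/event-stream")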

Here’s a high level diagram of the project architecture:

diagram

And a .gif “screenshot” from the web app running at your localhost:

screens

More details are in the README.MD in the Github repository.

AWS + Python Boto3 testing

As of November 2019, I’m aware of at least 3 decent options for unit testing your Python app functions/methods used as wrappers around boto3 calls handling the interaction with AWS. Unit testing your functions with boto3 calls using the methods I’m about to mention has its pros and cons:

pros:

  • You don’t need to spend money on testing (if the testing is done correctly)
  • You can test the vulnerabilities of your codebase
  • You can get meaningful test coverage of your codebase
  • You can “add_client_error” to the Botocore Stubber response for negative testing
  • Moto and Localstack can be used as standalone “mocked-up” AWS servers and can therefore be used with any programming language allowing SDK interaction with AWS (Java, C#, Ruby, Python, JavaScript, PHP, and Objective-C (iOS))

cons:

  • With Moto and Stubbers, if your testing code is written incorrectly, you can end up hitting the real AWS service endpoints, and even worse, you can mess up the data in your production account
  • With Moto and Stubbers, I encourage you to test single-action functions/methods, which might not always exactly fit your codebase (this can actually be seen as a pro, though)
  • With Localstack, there’s some overhead in spinning it up as a Docker-based service
  • Typically, not all AWS services are fully covered as of 11/2019


Moto

Github repo link

Moto is a great project and you can see the service coverage grow on a regular basis. The currently supported services are listed here. Besides other cool features, Moto is also able to operate in a standalone server mode.

Let’s see what Moto’s Github readme says about how you could test a simple S3 put object function:

import boto3
from moto import mock_s3

class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def save(self):
        s3 = boto3.client('s3', region_name='us-east-1')
        s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)

@mock_s3
def test_s3_put_object():
    conn = boto3.resource('s3', region_name='us-east-1')
    # We need to create the bucket since this is all in Moto's 'virtual' AWS account
    conn.create_bucket(Bucket='mybucket')

    model_instance = MyModel('steve', 'is awesome')
    model_instance.save()

    body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

    assert body == 'is awesome'


Botocore stubbers

Github repo link

Botocore, the foundation behind Boto3 (the official AWS SDK for Python), has a Stubber class in the stub.py module, allowing you to stub out requests instead of hitting the real AWS endpoints.

Testing the S3 put_object method could, for instance, look like this:

import botocore.session
from botocore.stub import Stubber

def function_we_are_testing_putting_object_to_s3(client):
    ret = client.put_object(Bucket="mybucket", Key="testkey")
    return ret


def test_s3_put_object():
    # region_name is set explicitly so the example runs without any local AWS config
    client = botocore.session.get_session().create_client('s3', region_name='us-east-1')
    stubber = Stubber(client)
    put_object_response = {
        'Expiration': 'string',
        'ETag': 'abcd',
        'ServerSideEncryption': 'AES256',
        'VersionId': 'string',
        'SSECustomerAlgorithm': 'string',
        'SSECustomerKeyMD5': 'string',
        'SSEKMSKeyId': 'string',
        'SSEKMSEncryptionContext': 'string',
        'RequestCharged': 'requester'
    }

    expected_params = {'Bucket':'mybucket', 'Key':'testkey'}
    stubber.add_response('put_object', put_object_response, expected_params)

    with stubber:
        response = function_we_are_testing_putting_object_to_s3(client)
    assert response == put_object_response

Notice that when we get the response within the stubber context, we don’t even need any AWS credentials set up on our machine. If you try to get the response for the assertion outside of the stubber context, you’ll get an AWS missing credentials error. This way you can verify that no real AWS endpoint was targeted at all.
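And since the pros list above mentions add_client_error, here is a rough sketch of how negative testing with the Stubber might look (the error code and assertion style are just illustrative, and pytest is assumed):

# Rough sketch of negative testing with Stubber.add_client_error
# (error code and assertion style are illustrative; pytest is assumed).
import botocore.session
import pytest
from botocore.exceptions import ClientError
from botocore.stub import Stubber


def test_s3_put_object_failure():
    client = botocore.session.get_session().create_client('s3', region_name='us-east-1')
    stubber = Stubber(client)
    stubber.add_client_error('put_object',
                             service_error_code='NoSuchBucket',
                             service_message='The specified bucket does not exist',
                             http_status_code=404)

    with stubber, pytest.raises(ClientError) as exc_info:
        client.put_object(Bucket='mybucket', Key='testkey')

    assert exc_info.value.response['Error']['Code'] == 'NoSuchBucket'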

Localstack

Github repo link

I did some research on Localstack at the beginning of 2019, see my tutorial here. What you’d need is a function with a Boto3 connection pointed to your Localstack S3 endpoint, like:

http://localhost:4572

So an S3 put object example could be somewhere along these lines:

import boto3

session = boto3.session.Session()

def test_s3_put_object():
    s3_client = session.client(
        service_name='s3',
        aws_access_key_id='aaa',
        aws_secret_access_key='bbb',
        endpoint_url='http://localhost:4572',
    )
    # the bucket needs to exist in your Localstack S3 first
    s3_client.create_bucket(Bucket='mybucket')

    expected_put_object_status_code = 200
    s3_client_put_object_resp = s3_client.put_object(Bucket='mybucket', Key="testFile", Body="testContent")
    s3_client_put_object_resp_status_code = s3_client_put_object_resp.get('ResponseMetadata').get('HTTPStatusCode')

    # assert the boto3 put_object response status code is the one you expect
    assert s3_client_put_object_resp_status_code == expected_put_object_status_code


One last thing here: if your AWS S3 testing requires a dummy test file, I recently released my dummy_file_generator project to PyPI. See here.


Dummy File Generator released to PyPI

dfg_logo

I recently spent some time extending the “dummy file generator” Python project I wrote last year, so that it is not only a command line tool, but also a package you can import into your projects, integrate with your codebase and use to generate dummy text files during your code runtime. See here: Pypi project homepage

The most common usage scenario is load / stress / performance testing of file-processing data tools, where you can now generate dummy text files during the test fixtures / setup.

My next thought regarding this project is to utilize the CI / CD provided by Github Actions to automate the PyPI release process.

Flask, MVC, Github integration example project finished

I just finished a fun and challenging project in Python. It’s a project based on the Flask web microframework, allowing the user to work with branches and files of their Github repository. This functionality is accessible via a locally running web application, but the same functionality is also covered by REST API endpoints. These API endpoints are also used as endpoints for tests.

The purpose of this Flask project is to demonstrate:

  1. how to deliver the same features via both API endpoints and static HTML forms in one Flask project
  2. how Flask can be used with the MVC design pattern, with the controllers imported as Flask blueprints (see the sketch after this list)
  3. how Flask can deal with Github integration using the Github API v3 (used as the model “persistence” layer in the MVC design pattern)
  4. API endpoint pytest testing using the app factory import pattern
  5. the ability of such a web application to gracefully fail on Github integration exceptions
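To give a rough idea of points 2 and 4, here is a minimal sketch of a blueprint acting as a controller plus an app factory (the blueprint, endpoint and data below are made up and do not mirror the project’s actual layout):

# Minimal sketch of a Flask blueprint "controller" and an app factory
# (blueprint, endpoint and data are made up for illustration).
from flask import Blueprint, Flask, jsonify

# controller: a blueprint grouping the branch-related endpoints
branches_controller = Blueprint("branches", __name__, url_prefix="/api/branches")


@branches_controller.route("/", methods=["GET"])
def list_branches():
    # in the real project this would call the Github API v3 "model" layer
    return jsonify(["master", "develop"])


def create_app():
    # app factory: tests can build a fresh app instance per test
    app = Flask(__name__)
    app.register_blueprint(branches_controller)
    return app


# pytest usage sketch (app factory import pattern):
# def test_list_branches():
#     client = create_app().test_client()
#     assert client.get("/api/branches/").status_code == 200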

Have a look here: https://github.com/datahappy1/flask_mvc_github_example_project

The basic idea behind this example project is this:

diagram

A few screenshots from the app: