Pytest with Hypothesis test framework

A few weeks ago, I wrote a post on the Missing Integer Python exercise. This time, we’re going to give the function I wrote for that exercise a seriously hard time using the Hypothesis test framework. Hypothesis is a Python library for creating unit tests that are simpler to write and more powerful when run, finding edge cases in your code you wouldn’t have thought to look for. Here is the official documentation.

Basically, all you need to do to get set up here is make sure you’ve got the Pytest package installed, and of course install Hypothesis using pip.

pip3 install pytest

pip3 install hypothesis

The function calculating the missing integer will be stored in a file called main.py and will look like this:

def function_a(input_list):
    if not isinstance(input_list, list) or len(input_list) <= 1:
        return 1
    input_list.append(0)
    sorted_input_list = sorted(input_list)
    sorted_input_list_iterator = iter(sorted_input_list)
    next(sorted_input_list_iterator)

    for item in sorted_input_list:
        item_plus_1 = item + 1
        try:
            next_item = next(sorted_input_list_iterator)
            if item <= 0 and next_item > 1:
                return 1
            if next_item not in (item, item_plus_1) and next_item > 0:
                return item_plus_1
        except StopIteration:
            return item_plus_1

The Pytest test file for testing the function_a function will live in this project’s tests/ subfolder, will be named test_with_hypothesis.py and will look like this:

from hypothesis import given, reject
from hypothesis.strategies import integers, lists

from main import function_a


@given(lists(integers()))
def test_some_stuff(input):
    try:
        result = function_a(input)
    except Exception:
        reject()

    assert result
    assert isinstance(result, int)

This is how the sample project looks in PyCharm:

Before we run the Pytest test runner, let’s make sure we get to see all the input combinations Hypothesis used, simply by adding a print(input, result) statement to the test function. So we modify the function to look like this:

@given(lists(integers()))
def test_some_stuff(input):
    try:
        result = function_a(input)
    except Exception:
        reject()
    print(input, result)
    assert result
    assert isinstance(result, int)
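
To actually see the print output, Pytest’s output capturing needs to be disabled. Assuming main.py sits in the project root and the test file in tests/, a run like the one below can be reproduced with something along these lines (-s disables output capturing, -v prints the verbose PASSED line, and invoking via python -m pytest puts the current directory on sys.path so the main import resolves):

python3 -m pytest -v -s tests/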

And once we run Pytest, for every input generated by Hypothesis, the 2 assertion statements are executed. These are the various test inputs Hypothesis generated in this test run:

PASSED [100%]
[] 1
[10] 1
[15294, 531, 0] 1
[0] 1
[26695, -22054, 0] 1
[0] 1
[0] 1
[0] 1
[0] 1
[0] 1
[] 1
[116, 106269151377797082934974390172316169794, 31424, -31, 50, 0] 1
[-4863, -1667, 0] 1
[-16157, 94, -23611, -1, 78, -1014034003, 11426, -6491, 0] 1
[-16157, -23611, -23611, -1, 78, -1014034003, 11426, -6491, 0] 1
[738, -14781, 69, 22090, 1, 29916, -49250859699495267074789931210936954004, -73, -2532, 99, 26325, 65, 11363, 3521813628522614813, -32587, 44, -43, 0] 2
[6650071126684864406, -18406, 1342450230, 82, 63971691232398135368522975465193238990, 0] 1
[-21690, -112, 28400, 0] 1
[1332272385158754537, -19666, 79, 42, 27240, -4035472154411183264, 0] 1
[-3007, -17202, -106, 5948, 10666, 0] 1
[-3007, -17202, -106, 5948, 10666, 0] 1
[86, -66, -1137478758, -41, -23100, 0] 1
[-9379, 27594, 13088, 0] 1
[-9379, 27594, 13088, 0] 1
[13088, 27594, 13088, 0] 1
[-31209, -123, 357237431, -15139, 0] 1
[-31209, -123, 357237431, -15139, 0] 1
[-61, 2310027603164367674, -17914, 105, 111, -12391, -15144, 1731392719, -3872260813340282230, 0] 1
[-25836, 75655317988143660925603206215277718983, -9609, 750173150, 31681, 98, 0] 1
[256, -58851512, 26308, 0] 1
[26825] 1
[26825, 26825, 26850, 0] 1
[26825, 26850, 26850, 0] 1
[26825, 26850, 26850, 0] 1
[15803, -29698, -29505, -7, -39, 822791089, 69, -20596, -1658, -93, 27573, 0] 1
[-72, -82, 30093, 0] 1
[-72] 1
[20506, -40, 0] 1
[-16964, 24527, 22686, 88, 80, 9291, -71, 23320, -10751, 0] 1
[84800811309668880829811978997916166776, -72, -1540788812, 0] 1
[-63] 1
[-2246, 52, -87, -259, 884110608, -33, 0] 1
[-28] 1
[-23969, 1594091873301085806, 0] 1
[531261904, -22115, 25045, 21939, -283, -71823575265068420995799035873944096879, 428551993317954714, -20680, -14239, 16344, -9261, 1097744801, 4489, 20184, 55, 26, -9, -2561702789346947892, 1795642808, 27611, 935014051, 26094, -28, 785159553338897973, -23540, 19652, -6444212787438133303, -4815, 162979844072243057210062703177497391349, -97, 0] 1
[13215] 1
[13215, 13215, -30735, 5437043399508957559, -665758433, 0] 1
[13215, -665758433, -30735, 5437043399508957559, -665758433, 0] 1
[-665758433, -665758433, -30735, 5437043399508957559, -665758433, 0] 1
[1352133560, 85, -15290, 14067, -27071, 5442723013430277027, -127, 17773, 7368170948219587529, -14731, 89, 64, 50022797994927627053737428777938319704, 62, -28497, 0] 1
[0] 1
[-8359914706234088707] 1
[-8359914706234088707, -8359914706234088707, -26, 0] 1
[-8359914706234088707, -8359914706234088707, -1909620066562526127, -8005, 101, 4753, -24324, 0] 1
[-8359914706234088707, -1909620066562526127, -1909620066562526127, -8005, 101, 4753, -24324, 0] 1
[-8359914706234088707, -5038, 73447561143034809623942017177250973719, 18, -21385, -29883, 5327, 22, 1571, -18811, -47, -12420, -3198671763051887895, 0] 1
[-8359914706234088707, 22, 73447561143034809623942017177250973719, 18, -21385, -29883, 5327, 22, 1571, -18811, -47, -12420, -3198671763051887895, 0] 1
[-5926811867032019664, -17444, -5814788238586876888, 0] 1
[-5926811867032019664, 2, 0] 1
[21056, 8389120, 0] 1
[21056, 8389120, 0] 1
[-5926737672302264448] 1
[-5926737672302264448, -5926737672302264448, 0] 1
[-25825, 1902, -29, -2790, 0] 1
[1902, 1902, -29, -2790, 0] 1
[-24659, 28894, -352105788, -94, 2520187554308692200, 16122, -69, -22, -4609402082944272128, 0] 1
[-24659, 28894, -352105788, -94, 2520187554308692200, 16122, -69, -22, -4609402082944272128, 0] 1
[23226, 463278862, -28060, 99, -84, -23181, -5910842997000785862, 23925, 109, 32532, 30878, -31876, 0] 1
[23226, 463278862, -28060, 99, -84, -23181, -5910842997000785862, 23925, 109, 32532, 30878, -31876, 0] 1
[10845, -72, -128891632, -23, 0] 1
[10845, 1, 0] 2
[10845, 1, 58, 0] 2
[10845, 58, 58, 0] 1
[256, -1, -4652328488159891767070554708652156414, 0] 1
[256] 1
[31253, -6588299761822078497, 104, -119, 835, 11823, -1885221123, 5993, 15292, 20335, 19257, 0] 1
[452313367, 4734, 21998, 1265607759790469680, -3446, -25231, -782650966, -14685, 0] 1
[21010, 31459, 19172, 86, 0] 1
[210, -122, -74, 0] 1
[-1190, -3388, 0] 1
[-3388, -3388, 0] 1
[-3388, 13, 31962, 99, -25147, 31928, -1836987774, 24439, 0] 1
[-3388, -3328, 0] 1
[512, -3, -100, -90, 0] 1
[512, -100, -100, -90, 0] 1
[-2842741521326321115391965608576135341] 1
[-2842741521326321115391965608576135341, -12637, -7013452532555885562, -97, 31511, 0] 1
[-32538, -7214, -6662, 1072, 0] 1
[-119389789869000676161737041481982141754, -55, -17743, -110, 90, -18157, -33, -5421, 0] 1
[6184606076545796580, 3787879747147418944, -79, 1105, 0] 1
[6184606076545796580, 3787879747147418944, -79, 1105, 0] 1
[3787879747147418944, 3787879747147418944, -79, 1105, 0] 1
[74, 3524868262113989158, -27605, 23, -60738829120480306099612123430163860640, -6022, 1143426831, -52, 2, -1075777159704476303, 13943, 30488, 32055, -23503, 0] 1
[-22368, 67498080928536154904002613360106515583, 0] 1
[67498080928536154904002613360106515583, 67498080928536154904002613360106515583, 0] 1
[67498080928536154904002613360106515583] 1
[23400] 1
[-623804689, 20689, 0] 1
[-8552579] 1
[-230274] 1

AWS SQS message polling to Elastic DSL query filter automation

When you are the engineer on call, automation can make your life much easier. Today I’d like to share an automation script I wrote that saves a lot of clicking and helps the team move faster while investigating an issue. The way our distributed system is set up, whenever a message fails to be processed successfully, it ends up in an AWS SQS dead letter queue. That gives the on-call engineer enough time to see what went wrong and react. Here is an introduction to AWS SQS dead letter queues.

This script allows the user to poll for all the messages in this queue and returns an Elastic DSL filter JSON, which you can use in a Kibana search to instantly see the logs for all the messages from the dead letter queue at once. It saves you time and clicking in the sense that you no longer need to check message by message in your monitoring platform for the logs. You need to define a key that is present both in the message body and in the Elasticsearch index.
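
To give an idea of the shape of the approach, here is a minimal sketch; the queue URL, the correlation_id key in the message body and the matching Elasticsearch field are illustrative assumptions, not the repository’s actual values:

import json

import boto3

# illustrative sketch only: the queue URL and the "correlation_id" key
# are placeholders, not the real values used by the script
QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/my-dead-letter-queue"

sqs = boto3.client("sqs")
correlation_ids = []

# keep polling until the queue returns no more messages
while True:
    response = sqs.receive_message(QueueUrl=QUEUE_URL,
                                   MaxNumberOfMessages=10,
                                   WaitTimeSeconds=1)
    messages = response.get("Messages", [])
    if not messages:
        break
    for message in messages:
        body = json.loads(message["Body"])
        correlation_ids.append(body["correlation_id"])

# Elastic DSL filter you can paste into Kibana
query_filter = {
    "query": {
        "bool": {
            "filter": [
                {"terms": {"correlation_id": correlation_ids}}
            ]
        }
    }
}
print(json.dumps(query_filter, indent=2))

Note that this sketch only reads the messages, it does not delete them, so they become visible in the queue again once the visibility timeout expires.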

Here is the Github repository: https://github.com/datahappy1/sqs_to_kibana_query_filter_helper

Codility Missing Integer (Python exercise)

Let’s take a look at one Codility challenge called the Missing Integer exercise. The problem is to find the minimal positive integer not occurring in a given sequence. You have to write an efficient algorithm for the following assumptions: given an array A of N integers, return the smallest positive integer (greater than 0) that does not occur in A.

  • N is an integer within the range [1..100,000]
  • each element of the array A is an integer within the range [−1,000,000..1,000,000].

Examples
For example, given A= [1, 3, 6, 4, 1, 2], the function should return 5.
Given A= [1, 2, 3], the function should return 4.
Given A= [−1, −3], the function should return 1.

You should also aim for the best possible time-space complexity. Here is a great primer on time-space complexity.

And below is my solution to the problem, written in Python. The solution could be further improved by making it run in constant space.

def function_a(input_list):
    if not isinstance(input_list, list) or len(input_list) <= 1:
        return 1
    input_list.append(0)
    sorted_input_list = sorted(input_list)
    sorted_input_list_iterator = iter(sorted_input_list)
    next(sorted_input_list_iterator)

    for item in sorted_input_list:
        item_plus_1 = item + 1
        try:
            next_item = next(sorted_input_list_iterator)
            if item <= 0 and next_item > 1:
                return 1
            if next_item not in (item, item_plus_1) and next_item > 0:
                return item_plus_1
        except StopIteration:
            return item_plus_1

A = [1, 3, 6, 4, 1, 2] # sample input list

print(f'output: {function_a(A)}')
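
To illustrate what that constant-space improvement could look like, here is a hypothetical variant (just a sketch, not Codility’s reference solution): it sorts the list in place instead of building a sorted copy with an appended sentinel, then scans for the smallest positive integer the sorted values skip over:

def function_a_constant_space(input_list):
    # hypothetical variant for illustration: sort in place, then look
    # for the smallest positive integer not present in the sorted list
    if not isinstance(input_list, list) or not input_list:
        return 1
    input_list.sort()
    candidate = 1
    for item in input_list:
        if item == candidate:
            candidate += 1
        elif item > candidate:
            break
    return candidate


print(function_a_constant_space([1, 3, 6, 4, 1, 2]))  # prints 5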

csv file validation using Python

I figured out I need a csv file validation tool in my toolbelt. I wasn’t really impressed with the options out there: some people suggest using MS Excel for this, others recommend free online services (some of these services, like https://csvlint.io/validation, actually store links to download the validated files, and that’s a no-go for me). I also found some decent tools on Github, but in the end I’ve written my own csv file validation framework. The source code is located here in my Github.

When it comes to the implementation, I typically think along these lines: is the problem I’m dealing with complex or not? The answer should be reflected in the complexity of the implementation. I believe that validating csv files is a simple problem: there is only one way a csv file can be valid, and there should be no side-effects or surprises. Therefore the implementation is really simple. By the way, there is only one third-party dependency besides pytest, and that is the python-dateutil library, which seems really solid when it comes to validating date/time/datetime columns. What I focused on more in this project is the test coverage; it is 79%, and that seems like a pretty decent value to me.
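
Just to illustrate the idea behind the date/time validation (this is a hypothetical helper, not the framework’s actual code): python-dateutil raises a ValueError for strings it cannot parse, so a column check can be as simple as this:

from dateutil import parser


def is_valid_datetime(value):
    # hypothetical helper for illustration; python-dateutil raises
    # ValueError (or OverflowError) for values it cannot parse
    try:
        parser.parse(value)
        return True
    except (ValueError, OverflowError):
        return False


print(is_valid_datetime("2021-02-28 13:45:00"))  # True
print(is_valid_datetime("2021-02-30"))           # False, no such date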

Happy coding!

Redshift server-side cursor with Psycopg2 Python adapter for better query performance

When it comes to extracting data out of AWS Redshift using a Python client, you’d typically end up using the Psycopg2 Postgres adapter because it’s the most common one. Today I’d like to share some of my thoughts on how to boost performance when querying large datasets. The main topic is going to be AWS Redshift server-side vs. client-side cursors. However, when it comes to querying large datasets from Redshift, you have, as far as I know, at least 3 options.

#1: Option number one is to unload the queried data directly to a file in AWS S3. See the documentation for this option here. That is the best option when it comes to performance, but it might not be usable every time (for instance when you need to post-process the data in the extracted files; this option also does some implicit file splitting per node slice, which might be undesired behavior, etc.).
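
For a rough idea of what option #1 looks like from Python, here is a minimal sketch; the connection details, the bucket path and the IAM role ARN are placeholders, not values from this post:

import psycopg2

# placeholders only; substitute your own cluster, bucket and IAM role
conn = psycopg2.connect(host="my-cluster.example.redshift.amazonaws.com",
                        port="5439", dbname="mydb",
                        user="myuser", password="mypassword")

unload_sql = """
    UNLOAD ('SELECT * FROM my_schema.my_table')
    TO 's3://my-bucket/exports/my_table_'
    IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftUnloadRole'
    GZIP
    ALLOWOVERWRITE;
"""

with conn.cursor() as cur:
    cur.execute(unload_sql)
conn.commit()
conn.close()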

#2: Option number two is the second best performing alternative, and that is to use a server-side Redshift cursor. You get one when you pass the “name” argument while initializing a Postgres cursor, like I do below:

self.cur = self.conn.cursor(name=self.cursor_name)

I found server-side cursors to have a huge positive performance impact for datasets from >1M all the way up to 10M rows (that is the maximum amount of rows I’ve been testing with, but it can be much more). However, be careful, as AWS recommends not using them for very large datasets: “Because cursors materialize the entire result set on the leader node before beginning to return results to the client, using cursors with very large result sets can have a negative impact on performance.”

It needs to be pointed out that server-side cursors can only be created when the query is a plain SELECT or a SELECT with CTEs. Basically, usage of DDL and DML statements in your queries blocks creating a server-side cursor, so it’s handled in the example below by declaring a list of the blocking statements and checking that the query does not contain any of them.

#3: Option number three is using a plain client-side cursor. The difference in implementation is basically that you do not pass the name argument while initializing the Postgres cursor, see:

self.cur = self.conn.cursor()

The performance is typically OK-ish for datasets having thousands or tens of thousands of rows.

For these 2 cursor types, I’d highly recommend fetching many rows at a time using a Python generator yielding the rows, instead of the cursor’s fetchall method. Fetchall worked fine for me for small datasets (tested on ~10 columns, ~30k rows), but in general I don’t recommend it. I’ve seen the fetchall method combined with a client-side cursor raise a MemoryError on a 10-column, 300k-row dataset in AWS Batch with 2GB of memory set in its job definition.

Don’t forget to set the fetch_row_size parameter to suit your needs; 1000 is probably a small size and will result in many remote DB round trips. A size of 10000 would be a good starting point in my opinion.

Here’s a quick example tied to a public Postgres database connection I prepared for sandboxing purposes (which should be good enough for illustration), so you can see how it’s possible to utilize a server-side cursor in your codebase:

import psycopg2


class CustomException(Exception):
    pass


class CNX:
    def __init__(self):
        self.conn = None
        self.cur = None
        self.fetch_row_size = 1000
        # here I set the query limit for the performance
        # comparison chart below
        self.query = "SELECT * from rna LIMIT 1000;"
        self.cursor_name = 'my_cursor'

    def init_connection(self):
        """
        Initiate a DB connection and store it in self.conn
        :return: None
        """
        try:
            self.conn = psycopg2.connect(database="pfmegrnargs", user="reader",
                                         password="NWDMCE5xdipIjRrp",
                                         host="hh-pgsql-public.ebi.ac.uk", port="5432")

        except psycopg2.Error as postgres_exc:
            raise CustomException("Psycopg2 connect exception: {}".format(postgres_exc))

        return

    def connect_to_data_source(self):
        with self.conn:
            # AWS Redshift Psycopg2 data source DOES support named
            # server-side cursors and setting its itersize for
            # single-select queries and select queries with CTEs
            _stmts_blocking_named_server_side_cursor = ["CREATE", "INSERT", "DELETE", "UPDATE",
                                                        "DROP", "TRUNCATE", "CALL"]

            if not any(x in self.query.upper() for x in _stmts_blocking_named_server_side_cursor):
                print('Named server side cursor used for query execution')
                self.cur = self.conn.cursor(name=self.cursor_name)
                self.cur.itersize = self.fetch_row_size

            # AWS Redshift Psycopg2 data source DOES NOT support
            # server-side cursors for queries using temp tables to store temporary results, DDL etc.
            else:
                print('Simple cursor used for query execution')
                self.cur = self.conn.cursor()

            return

    def execute(self):
        """
        Execute the query stored in self.query
        :return: None
        """
        try:
            self.cur.execute(self.query)

        except psycopg2.Error as postgres_exc:
            raise CustomException("Psycopg2 execute query exception: {}".format(postgres_exc))

        return

    def fetch_from_db(self):
        print("fetching rows START")
        while True:
            results = self.cur.fetchmany(size=self.fetch_row_size)
            if not results:
                print("fetching rows END")
                break
            print("fetching rows...")
            yield results


# usage of fetched rows example:
cnx_obj = CNX()
cnx_obj.init_connection()
cnx_obj.connect_to_data_source()
cnx_obj.execute()

for batch_rows in cnx_obj.fetch_from_db():
    print(batch_rows)

 

Let’s evaluate the query performance by comparing the execution duration of the code snippet I shared above. I think the chart is pretty self-explanatory in this case: server-side cursor performance wins massively over client-side as the amount of rows returned from the query increases.

[chart: server-side vs. client-side cursor execution duration comparison]

 

Flask and Chart.js reports with drilldown

I really enjoy the integration between a Flask web application backend, Bootstrap for styling and the Chart.js library for data visualisation on the browser “frontend”. However, I was missing a way to drill down through a data point on the chart into a detail report page. So I decided to give back to the community, do some research and wrap up the solution myself.

See the demo web application “animation”:

[demo animation GIF]

You can clone the project from here: https://github.com/datahappy1/flask_chartjs_drilldown_example_project

Enjoy!

Heroku vs. Pythonanywhere and the free-tier comparison for Flask web app hosting

I’m finally getting to deploy my data science web application for Czech language sentiment analysis to some free Python web hosting. This is just for the initial dev and QA rounds of work before I migrate it over to a paid environment. The two biggest players in this field seem to be Pythonanywhere and Heroku. Let me quickly loop through my thoughts on the experience I’ve had with both of these environments so far.

Let’s start by looking at the resources you get in your free-tier account. Considering I need to build up a virtual environment with the Sklearn package import (dragging along NumPy, SciPy and others), I definitely need more than the 500MB of drive space Pythonanywhere is offering me. Heroku wins this round: they provide you with a hard limit of 1GB maximum Git repo size, turning into a “slug” size limit of 500MB (after compression), which seems to be sufficient for my needs. There is a caveat to this, though. Pythonanywhere provides you with 500MB of resilient drive space, whereas Heroku provides you with ephemeral storage, which means that every 24 hours your app “dyno” is drained and brought back up in a process called cycling, and you cannot persist your data through this phase. Thinking about this limitation, it’s probably OK, because we want to be developing modern stateless apps with persistence layers built for scaling out anyway, right? So in the end, this limitation made me realize I’ll need to remove the Sqlite local storage, and it forced me to start thinking about a more robust and scalable data persistence solution for the final version.

Let’s continue with the deployment experience. Pythonanywhere needs you to run an embedded bash console from within their web interface, and you need to run Git commands manually to get the latest codebase version from your Github repository. Heroku, after you set up the Github integration with your repository, allows you to instantly run deployment pipelines. I must admit this is impressive, and you really can set up a simple CD pipeline in a few minutes. This round has got to go to Heroku as well.

When it comes to debugging, I feel Pythonanywhere gives you full control and easier access to the application error logs. You can loop through the log files for application access logs, error logs and server logs. You can also go through the log archives. Heroku has CLI access to the logs, but the logs are merged together for the whole application, and overall this feels to me like a selling point for Pythonanywhere.

Wrapping this up, both of these work really well. I’m also still learning to work with both, so if I got something wrong, please feel free to comment below. Pythonanywhere gives you much more control, but also much more overhead in configuring your app. My personal preference is less configuring and more time spent on coding, so in the end I went with Heroku, but I can see the selling points of both environments. In a nutshell, I’m really glad both of these are available out there for free, giving you so much support in the early stages of your application development.

 

 

 

Czech sentiment analyzer project aka “czester”

Hey! I just finished my latest project, the Czech language sentiment analyzer. The official name of this tool is “CZESTER”. I spent a lot of my time on this project over the last couple of months, but I feel it was definitely worth it.

I learned a lot about multi-threaded web scraping, supervised machine learning, natural language processing, Flask and Chart.js integration (which I used for the site statistics page), and also about deploying Flask machine learning web applications to Pythonanywhere and Heroku. This was probably the coolest project I’ve worked on in a while.

It’s currently deployed to czester.herokuapp.com and I’d love to hear what you think about it. The details describing this project are located here: czester.herokuapp.com/methodology

Converting csv to parquet files

Recently a colleague of mine asked me how to convert csv files to the Parquet columnar file format without all the overhead of locally spinning up Spark, etc.

Considering I like to play around with Pandas, my answer was … Pandas to the action!

And in this post I’m sharing the result: a super simple csv to parquet (and vice versa) file converter written in Python. It can work with files on your local machine, but it also allows you to save / load files using an AWS S3 bucket. The ability to build tools like this during a few lunch breaks makes me understand why Python is so popular in the data engineering world these days.

https://github.com/datahappy1/csv_to_parquet_converter
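
At its core, the conversion boils down to a couple of Pandas calls; this is just a minimal sketch of the idea, not the converter’s actual code (to_parquet needs a Parquet engine such as pyarrow installed):

import pandas as pd

# minimal sketch, not the converter's actual code
df = pd.read_csv("input.csv")
df.to_parquet("output.parquet", index=False)

# and the other direction
df = pd.read_parquet("output.parquet")
df.to_csv("roundtrip.csv", index=False)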

In a future version of this tool, I plan to have a look at https://github.com/modin-project/modin to speed up Pandas with parallel DataFrames, but so far I’ve had no luck on a Windows machine.

Flask, Server-sent events, JavaScript example project == “done”

I just finished a super fun example project with a Flask web back-end, HTML5 Server-sent events and a bit of JavaScript.

Check it out here: https://github.com/datahappy1/flask_sse_example_project

So, what’s it all about? Flask, Waitress and JavaScript are used for 1-N Server-sent event streams to enable auto-refresh of long-running job states, visualized in an HTML table in the browser. You could use this to make the AWS EMR or Batch consoles auto-refresh, for instance, but basically any long-running task state can be “auto-refreshed” in this way.
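
To show the gist of the Server-sent events part (a stripped-down hypothetical sketch, not the example project’s actual code), a Flask endpoint simply keeps yielding data: lines with the text/event-stream mimetype:

from flask import Flask, Response

app = Flask(__name__)


def job_status_stream():
    # hypothetical hard-coded job states; the real project would pull
    # these from the long-running jobs it monitors
    for state in ("SUBMITTED", "RUNNING", "SUCCEEDED"):
        # each SSE message is a "data: ..." line followed by a blank line
        yield "data: {}\n\n".format(state)


@app.route("/stream")
def stream():
    return Response(job_status_stream(), mimetype="text/event-stream")

On the browser side, an EventSource pointed at the /stream endpoint receives these messages, and a JavaScript handler updates the HTML table.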

Here’s a high level diagram of the project architecture:

[high-level architecture diagram]

And a .gif “screenshot” from the web app running at your localhost:

[demo GIF]

More details are in the README.MD in the Github repository.