Saturday, 12 February 2022

This week 2/2022 - Testing a PySpark application

In this post I'd like to share my remarks on writing Python tests. I have just started writing tests in Python for an Apache Spark application. Before that I had done it in Java and Scala, so this time I was mostly focused on testing in Python using dependency injection.

This time I broke the TDD rule: first I wrote a simple application which downloads some data from an external resource and stores it in a Parquet file on HDFS, and only then I wrote the tests.
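To give a bit of context, the idea of the application can be reduced to something like the sketch below (the function name, URL handling and paths are only examples, not the real project code):

import urllib.request

from pyspark.sql import SparkSession


def load_and_store(url: str, target_path: str) -> None:
    # Download the raw data from the external resource to a local temporary file.
    local_file, _ = urllib.request.urlretrieve(url)
    spark = SparkSession.builder.appName("loader-sketch").getOrCreate()
    # Parse the file into a DataFrame and store it as Parquet (e.g. on HDFS).
    df = spark.read.csv(local_file, header=True)
    df.write.mode("overwrite").parquet(target_path)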

To build this application I used a DI framework [1], which helped me decompose the application into exchangeable elements and avoid monkey patching.
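Based on the names used later in the tests, the container could be structured roughly as follows. This is only a hedged sketch: the placeholder classes stand in for the real project code, and the real container also has a core sub-container with resources such as logging.

from dependency_injector import containers, providers
from pyspark.sql import SparkSession


# Placeholders standing in for the real project classes used later in the tests.
class FileDownloader:
    def load_file(self, data_type, url, target): ...


class HDFSFilePublisher:
    def __init__(self, spark_session):
        self.spark_session = spark_session

    def publish(self, df, path):
        df.write.mode("overwrite").parquet(path)


class Gateways(containers.DeclarativeContainer):
    # The Spark session is created lazily, only when a dependency asks for it.
    spark_session_provider = providers.Singleton(
        lambda: SparkSession.builder.appName("loader").getOrCreate())


class Repositories(containers.DeclarativeContainer):
    gateways = providers.DependenciesContainer()
    file_downloader = providers.Singleton(FileDownloader)
    parquet_file_data_frame_repository = providers.Singleton(
        HDFSFilePublisher, spark_session=gateways.spark_session_provider)


class Application(containers.DeclarativeContainer):
    config = providers.Configuration()
    gateways = providers.Container(Gateways)
    repositories = providers.Container(Repositories, gateways=gateways)

Each exchangeable element is exposed as a provider, which is exactly what the tests override later.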

I created a simple matcher for a DataFrame:

from pyspark.sql import DataFrame


class DataFrameWithSchema(object):
    """
    A helper object that compares a Spark DataFrame schema.
    The schema parameter contains a list of column names and types, e.g. ['col_name: type'].
    """
    def __init__(self, schema: list, any_order: bool = True):
        self.schema = schema
        self.any_order = any_order

    def __eq__(self, other: DataFrame):
        # Build the same 'name: type' representation from the compared DataFrame.
        other_schema = [dtype[0] + ': ' + dtype[1] for dtype in other.dtypes]
        if self.any_order:
            return set(self.schema) == set(other_schema)
        return self.schema == other_schema

    def __ne__(self, other: DataFrame):
        return not self.__eq__(other)

    def __repr__(self):
        return 'DataFrame[' + ', '.join(self.schema) + ']'

It compares the names and types of the columns in a DataFrame.
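To illustrate how the matcher behaves on its own (a throwaway example, not part of the project):

from datetime import date

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("matcher-demo").getOrCreate()
df = spark.createDataFrame([("abc", date(2020, 12, 31))], "name string, data_date date")

# The matcher goes on the left-hand side so that its __eq__ is the one being called.
assert DataFrameWithSchema(['data_date: date', 'name: string']) == df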

Then I prepared a dependency injection context using a pytest fixture:

import sys

import pytest

# Application is the project's DI container (its import is omitted here).


@pytest.fixture(scope='module', autouse=True)
def app_context():
    """
    Setup function which creates the DI context and loads the config from a yaml file.
    """
    application = Application()
    application.config.from_yaml("../config.yml", required=True)
    application.core.init_resources()
    application.wire(modules=[sys.modules[__name__]])
    yield application

In this fixture I load the config file, wire the dependencies and yield the container to the test context.
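The wiring is what makes the injection markers in the production code resolve against this container. The entry point called later in the test might look roughly like this (an illustration only, not the actual loader_runner code):

from dependency_injector.wiring import Provide, inject


@inject
def main(data_date: str,
         downloader=Provide[Application.repositories.file_downloader],
         publisher=Provide[Application.repositories.parquet_file_data_frame_repository]):
    # Whatever the container currently provides is injected here:
    # the real implementations in production, the mocks in the tests.
    ...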

Then I wrote a test.

def test_load_data(app_context):
    """
    Test function which takes a DI context prepared by fixture function.
    """

    # given
    def load_data(data_type, *_):
        """
        Loads data from the test directory depending on the data type.
        The extra arguments passed by the production code are ignored here.
        """
        if data_type == '1':
            return Data(data_type, open("data1.csv", "rb"))
        return Data(data_type, open("data2.csv", "rb"))

    # Mocked downloader with a prepared response function.
    file_downloader_mock = MagicMock(spec=FileDownloader, load_file=MagicMock(side_effect=load_data))
    # Replace the real implementation registered in the container.
    app_context.repositories.file_downloader.override(file_downloader_mock)

    parquet_file_data_frame_repository_mock = MagicMock(spec=HDFSFilePublisher)
    app_context.repositories.parquet_file_data_frame_repository.override(parquet_file_data_frame_repository_mock)

    data_date = '2020-12-31'

    # when
    loader_runner.main(data_date)

    # then
    # Verification of the method call. Importantly, 'file_downloader_mock.load_file' must itself be a mock;
    # in our case it is a mock with a side effect.
    file_downloader_mock.load_file.assert_has_calls([call('1', f'https://example.com/{data_date}', ANY)],
                                                    any_order=True)

    parquet_file_data_frame_repository_mock.publish.assert_has_calls(
        [call(DataFrameWithSchema(['name: string', 'data_date: date']), '/tmpfile')],
        any_order=True)

Having a Java background and being a beginner in Python testing, I ended up with the code above.

In the test code there is nothing about creating a Spark context; it is created in the production code by a DI provider when the application's dependencies are wired and it is actually needed. As you can see above, I created two mocks (the input and the output of the flow) which take the place of the real implementations (the override method on the provider object).
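The override call swaps the provider for the whole container, so every place that asks for that dependency receives the mock. For example, inside the test one could check:

    # The provider now returns the mock instead of the real downloader.
    assert app_context.repositories.file_downloader() is file_downloader_mock
    # After the test, reset_override() would bring the real implementation back.
    app_context.repositories.file_downloader.reset_override()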

In the "then" section I assert on the method calls without caring about the order or about every single parameter (the ANY matcher).

What is important, assert_has_calls is a method of a mock, so load_file itself must be a mock. That is why I created a MagicMock whose load_file attribute is another MagicMock, this time with a side effect.
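The mechanism itself is easy to see on a toy example: a MagicMock with a side_effect delegates every call to the given function while still recording it, so it can be asserted on afterwards.

from unittest.mock import MagicMock

loader = MagicMock(side_effect=lambda data_type: f"payload-{data_type}")
assert loader('1') == 'payload-1'    # the side_effect function produced the value
loader.assert_called_once_with('1')  # ...and the call was still recorded by the mock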

In the case when we need to mock only one method of a real object, it is possible to write the following code:

    # A real repository object, constructed with the Spark session taken from the DI context...
    pandas_repository_spy = OtherRepository(app_context.gateways.spark_session_provider())
    # ...with only one method replaced by a mock, then registered in the container.
    pandas_repository_spy.publish = MagicMock()
    app_context.repositories.pandas_repository.override(pandas_repository_spy)

In this code I manually created a real object, initialized with an object retrieved from the DI context. Then, by monkey patching, I overrode the behaviour of a single method.
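With such a partial mock the rest of the object still executes the real code, while the replaced method only records its calls, so it can be verified like any other mock (a hypothetical assertion):

    pandas_repository_spy.publish.assert_called_once_with(
        DataFrameWithSchema(['name: string', 'data_date: date']), ANY)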

Summary

Comparing this integration test with those I had written in Java/Scala, in Python I didn't need to care about multi-threaded processing. Maybe that is because I didn't explicitly define the number of workers, so everything may run in a single thread. I still have to check what happens when I define them.
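If I wanted to experiment with that, the Spark session used by the tests could be pointed at a multi-threaded local master, for example:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[4]")   # four worker threads instead of the implicit default
         .appName("loader-tests")
         .getOrCreate())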

In Python it is required to deliver the Java and Spark binaries and additionally configure environment variables. In my case I also need to generate an egg with the Python sources and define its path so that it is added to the Spark context.
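A minimal sketch of that setup could look as follows (all paths and the egg name are only examples):

import os

from pyspark.sql import SparkSession

# Hypothetical locations of the Java and Spark binaries required by PySpark.
os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-11-openjdk")
os.environ.setdefault("SPARK_HOME", "/opt/spark-3.1.2")

spark = SparkSession.builder.appName("loader-tests").getOrCreate()
# Ship the packaged application sources to the Spark context.
spark.sparkContext.addPyFile("dist/loader-0.1.0-py3.8.egg")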

My code was prepared to work with Apache Spark 3.1.x, the Dependency Injector framework 4.38.x, pytest 7.0.x and Python 3.8.x.

Resources:

[1] - Python Dependency Injector

[2] - pytest

[3] - unittest.mock

[4] - PySpark
