Saturday, 12 February 2022

This week 2/2022 - Testing PySpark application

In this post I'd like to present my remarks on writing Python tests. I have just started writing tests in Python for an Apache Spark application. Before that I had done it in Java and Scala, so here I was mostly focused on testing in Python using dependency injection.

This time I broke the TDD rule: first I wrote a simple application which downloads some data from an external resource and stores it in a Parquet file on HDFS, and only then I wrote the tests.

To build this application I used the Dependency Injector framework [1], which helped me decompose the application into exchangeable elements and avoid monkey patching.

I created a simple matcher for DataFrames:

from pyspark.sql import DataFrame


class DataFrameWithSchema(object):
    """
    A helper object that compares a Spark DataFrame schema.
    The schema parameter contains a list of column names and types, e.g. ['col_name: type'].
    """
    def __init__(self, schema: list, any_order: bool = True):
        self.schema = schema
        self.any_order = any_order

    def __eq__(self, other: DataFrame):
        # Build 'name: type' strings from the DataFrame's (name, type) pairs.
        other_schema = [dtype[0] + ': ' + dtype[1] for dtype in other.dtypes]
        if self.any_order:
            return set(self.schema) == set(other_schema)
        return self.schema == other_schema

    def __ne__(self, other: DataFrame):
        return not self.__eq__(other)

    def __repr__(self):
        return 'DataFrame[' + ', '.join(self.schema) + ']'

It compares the names and types of the columns in a DataFrame.
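
A minimal usage sketch (assuming an active SparkSession named spark):

from datetime import date

df = spark.createDataFrame([("a", date(2020, 12, 31))], "name: string, data_date: date")

assert DataFrameWithSchema(['data_date: date', 'name: string']) == df  # column order ignored by default
assert DataFrameWithSchema(['name: string', 'data_date: date'], any_order=False) == df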

Then I prepared a dependency-injection context using a pytest fixture:

import sys

import pytest

# Application is the production DI container (its import path depends on the project layout).


@pytest.fixture(scope='module', autouse=True)
def app_context():
    """
    Setup fixture which creates the DI context and loads the config from a YAML file.
    """
    application = Application()
    application.config.from_yaml("../config.yml", required=True)
    application.core.init_resources()
    # Wire the container into this test module so injected dependencies resolve.
    application.wire(modules=[sys.modules[__name__]])
    yield application

In this fixture I load the config file, wire the dependencies and yield the container to the test context.
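
For context, the Application container itself lives in the production code. A rough sketch of its shape, with provider names matching the test below (build_spark_session and the exact config layout are hypothetical):

import logging

from dependency_injector import containers, providers


class Core(containers.DeclarativeContainer):
    config = providers.Configuration()
    # Resources are started by init_resources() in the fixture above.
    logging_resource = providers.Resource(logging.basicConfig)


class Gateways(containers.DeclarativeContainer):
    config = providers.Configuration()
    spark_session_provider = providers.Singleton(build_spark_session, config)  # hypothetical factory


class Repositories(containers.DeclarativeContainer):
    config = providers.Configuration()
    gateways = providers.DependenciesContainer()
    file_downloader = providers.Singleton(FileDownloader, config)
    parquet_file_data_frame_repository = providers.Singleton(
        HDFSFilePublisher, gateways.spark_session_provider)


class Application(containers.DeclarativeContainer):
    config = providers.Configuration()
    core = providers.Container(Core, config=config.core)
    gateways = providers.Container(Gateways, config=config.gateways)
    repositories = providers.Container(
        Repositories, config=config.repositories, gateways=gateways)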

Then I wrote a test.

from unittest.mock import ANY, MagicMock, call

# FileDownloader, HDFSFilePublisher, Data and loader_runner come from the production code.


def test_load_data(app_context):
    """
    Test function which takes the DI context prepared by the fixture.
    """

    # given
    def load_data(data_type):
        """
        Loads data from the test directory depending on the data type.
        """
        if data_type == '1':
            return Data(data_type, open("data1.csv", "rb"))
        return Data(data_type, open("data2.csv", "rb"))

    # Mocked downloader class with a prepared response function.
    file_downloader_mock = MagicMock(spec=FileDownloader, load_file=MagicMock(side_effect=load_data))
    # Replace the real implementation in the container.
    app_context.repositories.file_downloader.override(file_downloader_mock)

    parquet_file_data_frame_repository_mock = MagicMock(spec=HDFSFilePublisher)
    app_context.repositories.parquet_file_data_frame_repository.override(parquet_file_data_frame_repository_mock)

    data_date = '2020-12-31'

    # when
    loader_runner.main(data_date)

    # then
    # Verification of the method call. Importantly, 'file_downloader_mock.load_file' must itself
    # be a mock; in our case it is a mock with a side effect.
    file_downloader_mock.load_file.assert_has_calls([call('1', f'https://example.com/{data_date}', ANY)],
                                                    any_order=True)

    parquet_file_data_frame_repository_mock.publish.assert_has_calls(
        [call(DataFrameWithSchema(['name: string', 'data_date: date']), '/tmpfile')],
        any_order=True)

With my Java background, and as a beginner in Python testing, this is the code I came up with.

There is nothing in the test code about creating a Spark context; it is created in the production code by a DI provider when the application's dependencies are wired. As you can see above, I created two mocks (the input and the output of the flow) which take the place of the real implementations (the override method on the provider object).

In the "then" section I assert the method calls without caring about their order or about every parameter (the ANY matcher).

What is important: assert_has_calls is a method of a mock, so load_file itself must be a mock. That is why, when creating file_downloader_mock, I defined load_file as another MagicMock, this time with a side effect.

When we need to mock only one method of a real object, the following code is enough:

# A real repository, built from a Spark session taken out of the DI context:
pandas_repository_spy = OtherRepository(app_context.gateways.spark_session_provider())
# Monkey patch a single method so it can be asserted like a mock:
pandas_repository_spy.publish = MagicMock()
app_context.repositories.pandas_repository.override(pandas_repository_spy)

Here I manually created a real object, initialized with an object retrieved from the DI context. Then, by monkey patching, I replaced the behaviour of a single method.
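
The patched method can then be verified in the "then" section like any other mock:

# 'publish' is now a MagicMock, so the standard assertions apply:
pandas_repository_spy.publish.assert_called_once()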

Summary

Comparing this integration test with those I wrote in Java/Scala, in Python I didn't need to care about multi-threaded processing. Maybe that is because I didn't explicitly define the number of workers, so everything may run in a single thread. I still have to check what happens when I define them.

In Python it is required to deliver the Java and Spark binaries and to configure additional environment variables. In my case I also needed to generate an egg with the Python sources and register its path in the Spark context.
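
For illustration, that extra setup could look like this (all paths are examples from my environment, and spark is an active SparkSession):

import os

# Where the JVM and the Spark distribution live:
os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")
os.environ.setdefault("SPARK_HOME", "/opt/spark-3.1.2")

# Ship the packaged Python sources to the executors:
spark.sparkContext.addPyFile("dist/loader-0.1.0-py3.8.egg")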

My code was prepared to work with: Apache Spark 3.1.x, Dependency Injector framework 4.38.x, Pytest 7.0.x, Python 3.8.x.

Resources:

[1] - Python Dependency Injector

[2] - pytest

[3] - unittest.mock

[4] - PySpark

 

Saturday, 29 January 2022

This week 1/2022 - Spring WS client

In this post I'd like to present my reflections after implementing a web service client in Spring WS. Some time ago, when I implemented a client or a service, I did it in Apache CXF, but this time, to follow organisational standards for new implementations, I used Spring WS.

Compared to Apache CXF, from my perspective Spring WS is easier. It only requires generating model classes from a WSDL or XSD file. This is important because Spring WS is a contract-first implementation (it only transforms XSD/WSDL into classes and does not support the other direction) in order to avoid incompatibilities between different languages and implementations.

I was a bit surprised that I didn't have to create any web service stub for my client, only the model classes.

Below is the bean configuration: a wsClient bean into which I injected the marshaller and on which I defined the interceptors. In my case I created an interceptor to sign and encrypt the message.

The whole interceptor configuration lives in the signatureInterceptor method.

@Bean
public Client wsClient(final Jaxb2Marshaller marshaller) throws Exception {
    final Client client = new Client("https://example.com/webservice_uri");
    client.setMarshaller(marshaller);
    client.setUnmarshaller(marshaller);
    client.setInterceptors(new ClientInterceptor[]{signatureInterceptor()});
    return client;
}

private ClientInterceptor signatureInterceptor() throws Exception {
    Wss4jSecurityInterceptor wss4jSecurityInterceptor = new Wss4jSecurityInterceptor();
    wss4jSecurityInterceptor.setEnableSignatureConfirmation(true);
    // WS-Security actions applied to outgoing messages:
    wss4jSecurityInterceptor.setSecurementActions("Signature Encrypt");
    wss4jSecurityInterceptor.setSecurementUsername("securementUsername");
    wss4jSecurityInterceptor.setSecurementPassword("securementPassword");
    wss4jSecurityInterceptor.setSecurementSignatureCrypto(getCryptoFactoryBean().getObject());

    wss4jSecurityInterceptor.setSecurementEncryptionUser("encryption-user");
    // Encrypt only the content of the given element ({Content}{namespace}localName):
    wss4jSecurityInterceptor.setSecurementEncryptionParts("{Content}{https://example.com/service}getResponse");
    wss4jSecurityInterceptor.setSecurementEncryptionCrypto(getCryptoFactoryBean().getObject());

    return wss4jSecurityInterceptor;
}

@Bean
public CryptoFactoryBean getCryptoFactoryBean() throws IOException {
    CryptoFactoryBean cryptoFactoryBean = new CryptoFactoryBean();
    cryptoFactoryBean.setKeyStorePassword("testKeyStorePassword");
    cryptoFactoryBean.setKeyStoreLocation(new ClassPathResource("some.jks"));
    return cryptoFactoryBean;
}

@Bean
public Jaxb2Marshaller marshaller() {
    final Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setClassesToBeBound(Request.class, Response.class);
    return marshaller;
}


The client implementation extends WebServiceGatewaySupport and adds my custom method "service". In that method I call the web service template with the URI and the request object.

public class Client extends WebServiceGatewaySupport {

    private final String uri;

    public Client(String uri) {
        this.uri = uri;
    }

    public Response service() {
        return (Response)getWebServiceTemplate()
                .marshalSendAndReceive(uri, new Request());
    }
}

The response comes back as a plain Object, which I had to cast to the expected type.
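
Using the client is then just a matter of injecting the configured bean and calling the method (a minimal sketch; the caller class is illustrative):

@Service
public class SomeCaller {

    private final Client wsClient;

    public SomeCaller(final Client wsClient) {
        this.wsClient = wsClient; // the bean configured above
    }

    public void call() {
        final Response response = wsClient.service();
        // work with the unmarshalled response model here
    }
}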

To use the model on JDK 17 I generated it with the Maven plugin org.jvnet.jaxb2.maven2:maven-jaxb2-plugin, but it is possible to use others.

Below is an example plugin configuration that generates the model from a WSDL file, with optional additional annotations defined in the WSDL or in a binding file.

<plugin>
    <groupId>org.jvnet.jaxb2.maven2</groupId>
    <artifactId>maven-jaxb2-plugin</artifactId>
    <version>0.14.0</version>
    <dependencies>
        <dependency>
            <groupId>org.glassfish.jaxb</groupId>
            <artifactId>jaxb-runtime</artifactId>
            <version>2.3.6</version>
        </dependency>
    </dependencies>
    <executions>
        <execution>
            <goals>
                <goal>generate</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <schemaLanguage>WSDL</schemaLanguage>
        <schemaDirectory>${project.basedir}/src/main/resources/wsdl</schemaDirectory>
        <schemaIncludes>
            <include>*.wsdl</include>
        </schemaIncludes>
        <args>
            <arg>-Xannotate</arg>
         </args>
        <plugins>
            <plugin>
                <groupId>org.jvnet.jaxb2_commons</groupId>
                <artifactId>jaxb2-basics</artifactId>
                <version>1.11.1</version>
            </plugin>
            <plugin>
                <groupId>org.jvnet.jaxb2_commons</groupId>
                <artifactId>jaxb2-basics-annotate</artifactId>
                <version>1.11.0</version>
            </plugin>
        </plugins>
    </configuration>
</plugin>
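
For completeness, a hypothetical binding file showing what the -Xannotate argument enables (the schema location, the XPath and the annotation are illustrative):

<jaxb:bindings xmlns:jaxb="http://java.sun.com/xml/ns/jaxb"
               xmlns:xs="http://www.w3.org/2001/XMLSchema"
               xmlns:annox="http://annox.dev.java.net"
               jaxb:extensionBindingPrefixes="annox"
               version="2.1">
    <jaxb:bindings schemaLocation="service.xsd" node="/xs:schema">
        <jaxb:bindings node="//xs:complexType[@name='Request']">
            <!-- Adds the given Java annotation to the generated Request class -->
            <annox:annotate target="class">@java.lang.SuppressWarnings("all")</annox:annotate>
        </jaxb:bindings>
    </jaxb:bindings>
</jaxb:bindings>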

And here are the possible dependencies (some of them required):

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
        <version>5.3.15</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.ws</groupId>
        <artifactId>spring-ws-core</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.ws</groupId>
        <artifactId>spring-ws-security</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.ws</groupId>
        <artifactId>spring-ws-support</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>wsdl4j</groupId>
        <artifactId>wsdl4j</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.jvnet.jaxb2_commons</groupId>
        <artifactId>jaxb2-basics-runtime</artifactId>
        <version>1.11.1</version>
    </dependency>
</dependencies>


Resources:

[1] - Wss4jSecurityInterceptor Javadoc

[2] - Baeldung example

[3] - Spring WS

 

Saturday, 4 December 2021

This week 3/2021 - Spring Security REST basic controller

In this short post I'd like to present a simple configuration of a Spring Boot application serving a stateless service secured with basic authentication.

Below is a web security configurer implementation enabling all the ways to define annotation-based access rules (pre/post processing and the JSR-250 annotations):

@EnableGlobalMethodSecurity(
        prePostEnabled = true,
        securedEnabled = true,
        jsr250Enabled = true)
public class SecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(final HttpSecurity http) {
        // Compose the configuration steps; unchecked() converts to a plain Consumer.
        sessionSettings()
                .andThen(this::headersSecurity)
                .andThen(this::accessRules)
                .unchecked()
                .accept(http);
    }

    private void headersSecurity(final HttpSecurity http) throws Exception {
        http.csrf()
            .disable()
            .httpBasic()
            .realmName("App");
    }

    private CheckedConsumer<HttpSecurity> sessionSettings() {
        return http -> http
                .sessionManagement()
                .sessionCreationPolicy(STATELESS);
    }

    @Override
    protected void configure(final AuthenticationManagerBuilder auth) throws Exception {
        auth.inMemoryAuthentication()
                .withUser("client")
                .password(passwordEncoder().encode("admin"))
                .roles("CLIENT");
    }
...
}

The important part of this code is defining the stateless session creation policy.
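
The configuration above also calls a passwordEncoder() method that is not shown; a typical definition (a sketch using the standard BCrypt encoder) could be:

@Bean
public PasswordEncoder passwordEncoder() {
    // BCrypt is a common default; any PasswordEncoder implementation will do.
    return new BCryptPasswordEncoder();
}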

Then it is possible to implement a standard resource:

@RestController
@RequestMapping("/service")
public class Endpoint {

    @PostMapping("/endpoint")
    @Secured("ROLE_CLIENT")
    public String endpoint(@RequestBody final String req) {
        return "test";
    }
}
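
A quick smoke test with the in-memory user defined above (host and port depend on your setup):

curl -u client:admin \
     -H "Content-Type: text/plain" \
     -d 'request body' \
     http://localhost:8080/service/endpoint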

Resources:

[1] - Spring Security

Monday, 30 August 2021

This week 2/2021 - Testing Spark Application

Last year my adventure with Big Data and the Hadoop ecosystem began. I created my first simple Java application for Spark. It worked, but I felt something was missing: where are the tests? I looked around and couldn't find any good example of module testing. I was looking for a test example for a hexagonal application, this time in Spark, that is: mocks on the input and output, and a call to the facade's main method to check the behaviour.

Finally I found a solution. As in a standard business application, I extracted the code which is the point of contact with the external world and mocked it. In my case I was testing a simple application which consumes messages from Kafka, transforms them and stores the result in one of two tables. The class dependencies of my application are described below.

The KafkaStream and HadoopRepository classes are the input and output of my application. On top of them there are services and a facade, where the transformation and the creation of the DAG model are implemented.
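
Since the original diagram is not reproduced here, below is a minimal sketch of those boundary abstractions, with signatures inferred from the test code that follows (the commit parameters in particular are an assumption):

import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetCommitCallback}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.kafka010.OffsetRange

// Input boundary: wraps creation of the Kafka direct stream and offset commits.
trait KafkaStream {
  def createDirectStream(ssc: JavaStreamingContext): JavaInputDStream[ConsumerRecord[String, ReceivedMsg]]
  def commit(offsets: Array[OffsetRange], callback: OffsetCommitCallback): Unit // assumed signature
}

// Output boundary: persists a batch of records of the given type.
trait HadoopRepository[T] {
  def save(records: JavaRDD[T]): Unit
}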
 
To increase the difficulty of this task, I wrote my first test in Scala.
 
class TransformationSpec extends AnyFlatSpec
                            with BeforeAndAfter
                            with BeforeAndAfterEach
                            with should.Matchers
                            with GivenWhenThen
                            with MockitoSugar {

  private val master = "local[3]"
  private val appName = "test-app"
  private val batchDuration = Seconds(1)

  // Mocks for the input and output boundaries of the application.
  private val kafkaStream = mock[KafkaStream]
  private val rowRecordRepositoryMock = mock[HadoopRepository[RowRecord]]
  private val errorRecordRepositoryMock = mock[HadoopRepository[ErrorRecord]]
  // Real services wired together with the mocks - the application under test.
  private val serviceA: ServiceA = new ServiceA(rowRecordRepositoryMock, errorRecordRepositoryMock)
  private val serviceB: ServiceB = new ServiceB(kafkaStream)
  private val underTest: TransferDataService = new TransferDataService(serviceA, serviceB)

  private var sc: SparkContext = _
  private var ssc: StreamingContext = _
  // Queue of RDDs used to simulate messages arriving from Kafka.
  private val lines = mutable.Queue[RDD[ConsumerRecord[String, ReceivedMsg]]]()

  before {
    Mockito.reset(kafkaStream, rowRecordRepositoryMock, errorRecordRepositoryMock)
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    ssc = new StreamingContext(conf, batchDuration)
    sc = ssc.sparkContext
    mockKafkaStream
  }

  after {
    if (ssc != null) {
      ssc.stop()
    }
  }
...
 
In the example I am building the application and injecting its dependencies (the mock and service declarations at the top of the class). Then I define class variables for the Spark context and the Spark Streaming context, and finally a queue to simulate message arrival.
 
In the "before" method, run before each test, the state of all mocks is reset and the Spark configuration, the contexts and the KafkaStream mock with its specific behaviour are created. Below is shown how messages are added to the queue and how KafkaStream is mocked using the MockitoSugar library.
 
  // Enqueue one message as a single-record RDD, as if it had just arrived from Kafka.
  private def receivedMessage(msg: ReceivedMsg) = {
    lines += sc.makeRDD(Seq(new ConsumerRecord("test", 0, 0, "", msg)))
  }

  // Whatever the service asks for, return a stream backed by the test queue.
  private def mockKafkaStream = {
    MockitoSugar.doAnswer(() => {
      new JavaInputDStream[ConsumerRecord[String, ReceivedMsg]](
        ssc.queueStream(lines, true)
      )
    }).when(kafkaStream).createDirectStream(any())
  }
 
Now everything is ready to write the first test case. I chose the FlatSpec style. In the "given" section I fill the message queue. In the "when" section I call the facade to build the DAG model, start the Spark Streaming context and run the processing.
In the "then" section I assert on mocks and captors. The application uses many threads, so I couldn't just call the assertions directly. To solve this problem ScalaTest provides the "eventually" method, which re-checks the conditions at an interval until they match or a timeout is reached.
 
"Deserialized message" should "be stored in target table" in {

  Given("deserialized message")
  receivedMessage(new ReceivedMsg())

  When("Processing")
  underTest.process(new JavaStreamingContext(ssc))
  ssc.start()

  Then("message is stored by n records")
  // Streaming runs on background threads, so retry the assertions until the timeout.
  eventually(timeout(2.seconds)) {
    val captor: ArgumentCaptor[JavaRDD[RowRecord]] = ArgumentCaptor.forClass(classOf[JavaRDD[RowRecord]])
    verify(rowRecordRepositoryMock)
      .save(captor.capture())

    val records = captor.getValue.collect()
    records should have size (4)

    verify(errorRecordRepositoryMock, never())
      .save(any())

    verify(kafkaStream)
      .commit(any(), any())
  }
}
 
For me, as a Java developer, it was a new experience to write a test in Scala and for Spark. With a few good sources at hand it wasn't a big challenge.
In parallel I am writing some simple scripts in Python. This is a new area for me, so the next step will be testing Python scripts for Spark.

To prepare this example I used Scala 2.12, Spark 3.0.2, mockito-scala 1.12.6 and ScalaTest 3.2.7.


Saturday, 28 August 2021

This week 1/2021 - RxJava 3.x - back pressure

I took a deep dive into an RxJava tutorial and exercised a lot of possible cases. The most interesting feature for me is backpressure.

Going through the mechanics of RxJava in this case: at the moment of subscription the generator is initialized (the Flowable.generate call below), then the subscriber's onStart method is called and the first elements are generated (depending on the buffer size).

In the onStart method, "request" should be called to define how many elements should be generated at the beginning. If the "request" call is missing, nothing further happens.
In the following example it makes no difference if we ask for more than one element at a time: the generator always waits until the buffer is 75% empty (the level at which the buffer is refilled is defined in BaseObserveOnSubscriber) and only then starts generating again.
Calling onComplete or onError interrupts the flow.


import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;

public void backpressure() {
    // Stands in for "do some producing task" - here the producer simply counts up.
    final AtomicInteger product = new AtomicInteger();

    final Flowable<Integer> generate = Flowable.generate(
            () -> "INITIALIZED", // initial state
            (state, emitter) -> { // current state
                // do some producing task
                emitter.onNext(product.incrementAndGet());
                return "RUNNING"; // next state
            }
    );

    final DefaultSubscriber<Integer> subscriber = new DefaultSubscriber<Integer>() {
        @Override
        protected void onStart() {
            // Without this initial request nothing is ever emitted.
            request(1);
        }

        @Override
        public void onNext(final Integer o) {
            // Ask for the next element, then do some consumer work.
            request(1);
        }

        @Override
        public void onError(final Throwable throwable) {
            // The flow is interrupted; nothing to clean up in this example.
        }

        @Override
        public void onComplete() {
        }
    };

    int bufferSize = 6;
    // observeOn moves consumption to another thread with a small buffer,
    // which makes the backpressure behaviour visible.
    generate.observeOn(Schedulers.newThread(), false, bufferSize)
            .subscribe(subscriber);
}
 
I was practising on RxJava version 3.0.11.


Sunday, 27 December 2020

This week 7/2020 - Julia

This post describes my first impressions after completing a basic Julia course.

I am an experienced Java developer, but I have also had contact with C/C++, Python and Octave. For me, Julia has something from all of those languages.

Linear Algebra support:

Octave:

A = eye(1,2)
%Diagonal Matrix
%
%   1   0

B = eye(3,2)
%Diagonal Matrix
%
%   1   0
%   0   1
%   0   0
C = [A
B]
%C =
%
%   1   0
%   1   0
%   0   1
%   0   0

Julia:

using LinearAlgebra

A = 1 * Matrix(I, 1, 2)
# 1×2 Array{Int64,2}:
#  1  0

B = 1 * Matrix(I, 3, 2)
# 3×2 Array{Int64,2}:
#  1  0
#  0  1
#  0  0

C = [A
     B]
# 4×2 Array{Int64,2}:
#  1  0
#  1  0
#  0  1
#  0  0

There is also a similarity in how matrices are multiplied. Julia supports, for example, both f(A) = A*A (the matrix product) and the broadcast form f.(A), where f is applied elementwise, giving A[i,j] * A[i,j].
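
A short illustration of the difference between the two calls:

A = [1 2; 3 4]
f(x) = x * x

f(A)    # matrix product A*A:    [7 10; 15 22]
f.(A)   # elementwise squaring:  [1 4; 9 16]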

Syntax similarity to Python:

Julia:

# for loops
for i in 1:10, j in 1:20
    println("Hi $i , $j")
end

for item in items
    println("Hi $item")
end

# function definition
function power(x)
    # the value of the last expression is returned implicitly
    x^2
end

# other ways to define a function
power(x) = x^2

power = x -> x^2

# non-mutating sort - returns a sorted copy
sort(x)

# mutating sort - the ! suffix marks functions that modify their argument
sort!(x)

Operator overloading:

Python:

class String:
    def __init__(self, x=""):
        self.x = x

    # '+' deliberately performs a comparison here, to mirror the Julia example below
    def __add__(self, other):
        return self.x == other.x


p1 = String("test1")
p2 = String("test1")

print(p1 + p2)  # True

Julia:

import Base: +

+(x::String, y::String) = x == y

x = "test1"
y = "test1"

# returns a boolean value
x + y

Performance:

The benchmarks I saw in the course [3] show that Julia's performance is similar to, or even a little better than, C code, and about two orders of magnitude faster than Python.
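
To verify such claims on your own machine, the BenchmarkTools.jl package is the usual tool (a minimal sketch; it assumes the package is installed):

using BenchmarkTools

f(x) = x^2
data = rand(10^6)

@btime f.($data);   # elementwise f over a million Floats, timed after warm-up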


Resources:

[1] - https://julialang.org/

[2] - Introduction to Julia (for programmers)

[3] - Parallel Computing