Monday, 30 August 2021

This week 2/2021 - Testing Spark Application

Last year my adventure with Big Data and the Hadoop ecosystem began. I created my first simple Java application for Spark. It worked, but something was missing: where were the tests? I looked around and couldn't find any good example of module testing. I was looking for a test example for a hexagonal application, this time in Spark: mocks on the input and output, and a call to the facade's main method to check the behaviour.

Finally I found a solution. As in a standard business application, I extracted the code that is the point of contact with the external world and mocked it. In my case I was testing a simple application that consumes messages from Kafka, transforms them and stores the result in one of two tables.
Below I present a simple class diagram that corresponds to my application.

The KafkaStream and HadoopRepository are the input and output of my application. On top of them sit the services and the facade, where the transformation and the DAG model creation are implemented.
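
As a rough sketch, this is the shape of the application the test exercises. The class names are taken from the test below; the method signatures and the placeholder domain types are my assumptions, not the original code.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaStreamingContext}

// Placeholder domain types; the real ones live in the application.
case class ReceivedMsg()
case class RowRecord()
case class ErrorRecord()

// Ports: the only points of contact with the external world.
trait KafkaStream {
  // Builds the input stream the application reads from (normally a real Kafka direct stream).
  def createDirectStream(jssc: JavaStreamingContext): JavaInputDStream[ConsumerRecord[String, ReceivedMsg]]
  // Commits processed offsets back to Kafka; the exact parameters are unknown here.
  def commit(stream: AnyRef, offsets: AnyRef): Unit
}

trait HadoopRepository[T] {
  // Persists a batch of records into the target table.
  def save(records: JavaRDD[T]): Unit
}

// Services and facade: transformation logic and DAG model creation, testable with mocked ports.
class ServiceA(rows: HadoopRepository[RowRecord], errors: HadoopRepository[ErrorRecord])
class ServiceB(kafka: KafkaStream)
class TransferDataService(serviceA: ServiceA, serviceB: ServiceB) {
  def process(jssc: JavaStreamingContext): Unit = ??? // wires the streaming DAG together
}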
 
To increase the difficulty of this task, I wrote my first test in Scala.
 
class TransformationSpec extends AnyFlatSpec
                            with BeforeAndAfter
                            with BeforeAndAfterEach
                            with should.Matchers
                            with GivenWhenThen
                            with Eventually
                            with MockitoSugar {

  private val master = "local[3]"
  private val appName = "test-app"
  private val batchDuration = Seconds(1)

  // Mocks for the ports: the Kafka input and the two Hadoop output tables.
  private val kafkaStream = mock[KafkaStream]
  private val rowRecordRepositoryMock = mock[HadoopRepository[RowRecord]]
  private val errorRecordRepositoryMock = mock[HadoopRepository[ErrorRecord]]

  // Real services and the facade under test, wired with the mocked dependencies.
  private val serviceA: ServiceA = new ServiceA(rowRecordRepositoryMock, errorRecordRepositoryMock)
  private val serviceB: ServiceB = new ServiceB(kafkaStream)
  private val underTest: TransferDataService = new TransferDataService(serviceA, serviceB)

  private var sc: SparkContext = _
  private var ssc: StreamingContext = _
  // Queue of RDDs used to simulate messages arriving from Kafka.
  private val lines = mutable.Queue[RDD[ConsumerRecord[String, ReceivedMsg]]]()

  before {
    // Reset all mocks and build a fresh Spark configuration, contexts and Kafka stub for every test.
    Mockito.reset(kafkaStream, rowRecordRepositoryMock, errorRecordRepositoryMock)
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    ssc = new StreamingContext(conf, batchDuration)
    sc = ssc.sparkContext
    mockKafkaStream
  }

  after {
    if (ssc != null) {
      ssc.stop()
    }
  }
...
 
In the example I build the application by injecting its dependencies: the mocked KafkaStream and repositories go into the services, and the services into the facade under test. Then I define class variables for the Spark context and the Spark Streaming context, and finally a queue of RDDs to simulate incoming messages.
 
In "before" method for each test state of all mocks are reset, Spark configuration, context and mock KafkaStream with specific behaviour are created. Below is shown how messages are added to queue and how KafkaStream is mocked using MockitoSugar library.
 
  // Pushes a single message into the queue that backs the mocked Kafka stream.
  private def receivedMessage(msg: ReceivedMsg) = {
    lines += sc.makeRDD(Seq(new ConsumerRecord("test", 0, 0, "", msg)))
  }

  // When the application asks for a direct stream, return a queue-backed
  // input stream instead of a real Kafka connection.
  private def mockKafkaStream = {
    MockitoSugar.doAnswer(() => {
      new JavaInputDStream[ConsumerRecord[String, ReceivedMsg]](
        ssc.queueStream(lines, true)
      )
    }).when(kafkaStream).createDirectStream(any())
  }
 
Now everything is ready to write the first test case. I chose the FlatSpec style. In the "given" section I fill the message queue. In the "when" section I call the facade to build the DAG model, then start the Spark Streaming context and run the processing.
In the "then" section I assert on the mocks and captors. The application uses many threads, so I couldn't call the assertions straight away. To solve this, ScalaTest provides the "eventually" method, which re-checks the conditions at an interval until they pass or a timeout is reached.
 
"Deserialized message" should "be stored in target table" in {

  Given("deserialized message")
  receivedMessage(new ReceivedMsg())

  When("Processing")
  underTest.process(new JavaStreamingContext(ssc));
  ssc.start()

  Then("message is stored by n records")
  eventually(timeout(2 second)) {
    val captor: ArgumentCaptor[JavaRDD[RowRecord]] = ArgumentCaptor.forClass(classOf[JavaRDD[RowRecord]])
    verify(rowRecordRepositoryMock)
      .save(captor.capture())

    val records = captor.getValue.collect()
    records should have size (4)

    verify(errorRecordRepositoryMock, never())
      .save(any())

    verify(kafkaStream)
      .commit(any(), any());
  }
}
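
The original post does not show the test's imports or the required mixins. A plausible set, assuming mockito-scala and the ScalaTest 3.2 API, would look like this:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{never, verify}
import org.mockito.{ArgumentCaptor, Mockito, MockitoSugar}
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should
import org.scalatest.time.SpanSugar._
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach, GivenWhenThen}

import scala.collection.mutable

The Eventually mixin provides both "eventually" and "timeout", and SpanSugar enables the "2 second" syntax used in the assertion block.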
 
For me, as a Java developer, writing a test in Scala and for Spark was a new experience. It wasn't a big challenge once I had worked through the first three sources below.
In parallel I am writing some simple scripts in Python. This is a new area for me, so the next step will be testing Python scripts for Spark.

To prepare this example I used Scala 2.12, Spark 3.0.2, mockito-scala 1.12.6 and ScalaTest 3.2.7.
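
For completeness, a minimal build.sbt matching those versions might look roughly like this; the exact module list is my assumption (in particular the Kafka integration artifact), not something from the original post:

// Hypothetical build.sbt sketch for the versions mentioned above.
scalaVersion := "2.12.13" // any Scala 2.12.x

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "3.0.2",
  "org.apache.spark" %% "spark-streaming"            % "3.0.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.0.2",
  "org.mockito"      %% "mockito-scala"              % "1.12.6" % Test,
  "org.scalatest"    %% "scalatest"                  % "3.2.7"  % Test
)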

Sources: