Integration testing a Flink job

I've written a small Flink application. It takes some input and enriches it with data from an external source. The enrichment is a RichAsyncFunction, and within its open method I construct an HTTP client to be used for the enrichment.

Now I want to write an integration test for my job. But since the HTTP client is created within the open method, I have no way to provide it and mock it in my integration test. I've tried refactoring to pass it in through the constructor, but I always get serialization errors.

This is the example I'm working from: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html

Thanks in advance :)



Solution 1:[1]

This question was posted over a year ago, but I'll post an answer in case anyone stumbles upon it in the future.

The serialization exception you are seeing is likely this:

Exception encountered when invoking run on a nested suite. *** ABORTED *** (610 milliseconds)
  java.lang.NullPointerException:
  at java.util.Objects.requireNonNull(Objects.java:203)
  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
  at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:136)
  at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:77)
  at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
  at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:366)
  at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:165)
...

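The reason is that the operator under test needs to know how to serialize and deserialize its input and output types. The stack trace shows StreamElementSerializer being constructed with a null serializer during AsyncWaitOperator.setup. In a test you have to supply the serializers yourself: the one for the input type when constructing the testHarness, and the one for the output type in the setup() method call.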

So to test the example from the Flink docs you linked, you can do something like this (my implementation is in Scala, but you can adapt it to Java as well):

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
import org.apache.flink.streaming.runtime.tasks.{StreamTaskActionExecutor, TestProcessingTimeService}
import org.apache.flink.streaming.runtime.tasks.mailbox.{MailboxExecutorImpl, TaskMailboxImpl}
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}


/**
 This test case is written using Flink 1.11+.
 Older versions likely have a simpler constructor definition for [[AsyncWaitOperator]] so you might have to remove the last two arguments (processingTimeService and mailboxExecutor)
*/
class AsyncDatabaseRequestSuite extends FunSuite with BeforeAndAfter with Matchers {

  var testHarness: OneInputStreamOperatorTestHarness[String, (String, String)] = _
  val TIMEOUT = 1000
  val CAPACITY = 1000
  val MAILBOX_PRIORITY = 0

  def createTestHarness: Unit = {
    val operator = new AsyncWaitOperator[String, (String, String)](
      new AsyncDatabaseRequest {

        override def open(configuration: Configuration): Unit = {
          client = new MockDatabaseClient(host, port, credentials)  // put your mock DatabaseClient object here
        }
      },
      TIMEOUT,
      CAPACITY,
      OutputMode.UNORDERED,
      new TestProcessingTimeService,
      new MailboxExecutorImpl(
        new TaskMailboxImpl,
        MAILBOX_PRIORITY,
        StreamTaskActionExecutor.IMMEDIATE
      )
    )

    // supply the TypeSerializer for the "input" type of the operator
    testHarness = new OneInputStreamOperatorTestHarness[String, (String, String)](
      operator,
      TypeExtractor.getForClass(classOf[String]).createSerializer(new ExecutionConfig)
    )

    // supply the TypeSerializer for the "output" type of the operator to the setup() call
    testHarness.setup(
      TypeExtractor.getForClass(classOf[(String, String)]).createSerializer(new ExecutionConfig)
    )
    testHarness.open()
  }

  before {
    createTestHarness
  }

  after {
    testHarness.close()
  }

  test("Your test case goes here") {
    // fill in your test case here
  }

}

Solution 2:[2]

Here is the solution in Java (the same approach as Solution 1, using JUnit 5 and Mockito):

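import static org.mockito.Mockito.mock;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.MockitoAnnotations;

class TestingClass {

    private static final long TIMEOUT = 1000;
    private static final int CAPACITY = 1000;
    private static final int MAILBOX_PRIORITY = 0;

    // replace IN, OUT with your async function's input and output types
    private OneInputStreamOperatorTestHarness<IN, OUT> testHarness;

    @InjectMocks
    ClassUnderTest cut;

    Driver driverRef; // handle on the mocked driver, for use in the tests

    public void createTestHarness() throws Exception {

        cut = new ClassUnderTest() {

            @Override
            public void open(Configuration parameters) throws Exception {
                driver = mock(Driver.class); // mock your driver (the external data source) here
                driverRef = driver; // keep an external reference to the mock to refer to in tests
            }
        };

        MailboxExecutorImpl mailboxExecutorImpl = new MailboxExecutorImpl(
                new TaskMailboxImpl(), MAILBOX_PRIORITY, StreamTaskActionExecutor.IMMEDIATE
        );

        // wrap the function under test (cut) in the async operator
        AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
                cut,
                TIMEOUT,
                CAPACITY,
                OutputMode.ORDERED,
                new TestProcessingTimeService(),
                mailboxExecutorImpl
        );

        // supply the serializer for the operator's input type
        testHarness = new OneInputStreamOperatorTestHarness<IN, OUT>(
                operator,
                TypeExtractor.getForClass(IN.class).createSerializer(new ExecutionConfig())
        );

        // supply the serializer for the operator's output type
        testHarness.setup(TypeExtractor.getForClass(OUT.class).createSerializer(new ExecutionConfig()));
        testHarness.open();
    }

    @BeforeEach
    void setUp() throws Exception {
        createTestHarness();
        MockitoAnnotations.openMocks(this);
    }

    @AfterEach
    void tearDown() throws Exception {
        testHarness.close();
    }

    @Test
    public void test_yourTestCase() throws Exception {
        // fill in your test case here
    }
}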
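
A note on the constructor-injection attempt from the question: Flink requires user functions to be Serializable and ships them using Java serialization, so a non-serializable client held in a regular field is a plausible cause of those errors. One possible workaround, sketched below in plain Java without any Flink dependency, is to pass a serializable factory through the constructor and keep the client itself in a transient field that open() fills in. All names here (EnrichmentClient, ClientFactory, FakeClientFactory, EnrichFunction, SerializationSketch) are hypothetical stand-ins, not part of Flink or of the answers above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical stand-in for the HTTP client; deliberately NOT Serializable.
interface EnrichmentClient {
    String lookup(String key);
}

// A Supplier that is also Serializable, so it can travel with the function.
interface ClientFactory extends Supplier<EnrichmentClient>, Serializable {}

// Test factory: it has no fields, so it serializes trivially.
class FakeClientFactory implements ClientFactory {
    private static final long serialVersionUID = 1L;

    @Override
    public EnrichmentClient get() {
        return key -> "fake-" + key;
    }
}

// Stand-in for the RichAsyncFunction: only the factory is serialized.
class EnrichFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ClientFactory factory;
    private transient EnrichmentClient client; // skipped by Java serialization

    EnrichFunction(ClientFactory factory) {
        this.factory = factory;
    }

    // Mirrors open(): the client is built after deserialization.
    void open() {
        client = factory.get();
    }

    String enrich(String input) {
        return input + ":" + client.lookup(input);
    }
}

class SerializationSketch {
    // Round-trips a value through Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serializes the function, restores it, opens it, and enriches one element.
    static String demo() {
        EnrichFunction copy = roundTrip(new EnrichFunction(new FakeClientFactory()));
        copy.open();
        return copy.enrich("a");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a real job the same shape would apply to the RichAsyncFunction: production code would pass a factory that builds the real HTTP client, and the test would pass one that returns a fake. This assumes the errors came from the client field itself, which neither answer confirms.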

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Adam Goldberg