Hey Rion,

We solved this issue by using usual, unbounded streams, and using
awaitility library to express conditions that would end the test - for
example, having particular data in a table.

IMO this type of testing has the advantage that you won't have divergent
behavior from production as you have experienced.

Regards,
Maciej



On Sun, Mar 14, 2021, 05:41 Rion Williams <rionmons...@gmail.com> wrote:

> Following up on this issue, I realized my initial problem was that my test
> case only contained a single message to send through the pipeline. This
> resulted in the earliest offset also being the latest and things didn’t
> exactly work as expected. Once I added several other messages and sent them
> through, the pipeline appeared to run as expected.
>
> However, the use of “bounded” seems to be fickle in terms of test cases.
> Since an experience is thrown once the bound is reached, I can typically
> just wrap my test execution within a try/catch and simply apply my
> assertion afterwards.
>
> This occasionally results in passing tests, but in others, it seems that
> the bound is reached prior to processing the messages it had seen thus far,
> and as a result yields a failing test. I don’t know if this is a bug, or
> intentional, but I’m not aware of a workaround that could “force” the
> pipeline to finish processing all of the messages from the topic once the
> bound is reached. I’ve tried sending through “flush records” to the topic,
> however since there are multiple partitions, it’s not guaranteed that the
> pipeline will read those last.
>
> This is purely a testing problem, as a production job would be streaming
> and unbounded, however I’d love to have a reliable integration test or a
> pattern that I could use to guarantee the processing of a finite set of
> data via a KafkaSource (I.e. send finite records to Kafka, read from topic,
> process all records, apply assertion after processing).
>
> Any ideas/recommendations/workarounds would be greatly welcome and I’d be
> happy to share my specific code / use-cases if needed.
>
> Thanks much,
>
> Rion
>
> On Mar 12, 2021, at 10:19 AM, Rion Williams <rionmons...@gmail.com> wrote:
>
> 
> Hi all,
>
> I've been using the KafkaSource API as opposed to the classic consumer and
> things have been going well. I configured my source such that it could be
> used in either a streaming or bounded mode, with the bounded approach
> specifically aimed at improving testing (unit/integration).
>
> I've noticed that when I attempt to run through a test - it seems that the
> pipeline never acknowledges the "end" of the stream in a bounded context
> and just runs forever and never makes it to my assert.
>
> Does anything look glaringly wrong with how the source is being defined?
>
> object KafkaEventSource {
>
>     fun withParameters(parameters: ParameterTool): KafkaSource<Event> {
>         val schemaRegistryUrl = parameters.getRequired("schema.registry.url")
>
>         val builder = KafkaSource.builder<Event>()
>             .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
>             .setGroupId(parameters.getRequired("group.id"))
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .setProperty("schema.registry.url", schemaRegistryUrl)
>             .setTopics(parameters.getRequired("topic"))
>             .setDeserializer(EventDeserializer(schemaRegistryUrl))
>
>         if (parameters.getBoolean("bounded", false)) {
>             builder.setBounded(OffsetsInitializer.latest())
>         }
>
>         return builder.build()
>     }
> }
>
> I can verify that the generated source has it's boundedness set properly
> and all of the configuration options are correct.
>
> My test itself is fairly simple and can be broken down as follows:
>
>    1. Inject records into a Kafka Topic
>    2. Initialize my Flink job using all of my testing parameters
>    3. Apply my assertion (in this case verifying that a JdbcSink wrote to
>    a specific database)
>
> @Test
> fun `Example `(){
>     // Arrange
>     val events = getTestEvents()
>     sendToKafka(events, parameters)
>
>     // Act
>     EntityIdentificationJob.run(parameters)
>
>     // Assert
>     val users = queryCount("SELECT * FROM users", connection)
>     assertEquals(1, users)
> }
>
> Where my job itself is broken down further and reads from the source,
> performs a process function into multiple side outputs and writes each of
> them to a distinct JdbcSink based on the type:
>
> @JvmStatic
> fun main(args: Array<String>) {
>     val parameters = loadParams(args)
>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>     // Read from Kafka
>     val entities = stream
>        .fromSource(KafkaEventSource.withParameters(parameters), 
> WatermarkStrategy.noWatermarks(), "kafka")
>        .process(IdentifyEntitiesFunction())
>
>     // Write out each tag to its respective sink
>     for (entityType in EntityTypes.all) {
>         entities
>             .getSideOutput(entityType)
>             .addSink(PostgresEntitySink.withEntity(entityType.typeInfo, 
> parameters))
>     }
>
>     stream.execute(parameters.getRequired("application"))
> }
>
> I can verify in the logs that my sink is being executed and writing to the
> appropriate database, however the job itself never finishes. I've tried it
> using a single Kafka partition as well as multiple partitions and even
> commented out the logic related to writing to the database. It still just
> seems to run ... forever.
>
> Any recommendations? Perhaps there's a bad configuration or setting that
> isn't being used as intended?
>
> Thanks,
>
> Rion
>
>

Reply via email to