Hey Rion,

We solved this issue by using regular, unbounded streams and using the Awaitility library to express the conditions that would end the test - for example, having particular data in a table.
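Roughly something like this - just an untested sketch, assuming JUnit 5 plus the sendToKafka/queryCount helpers and the EntityIdentificationJob from your test below:

import java.time.Duration
import kotlin.concurrent.thread
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

@Test
fun `pipeline writes users without bounding the source`() {
    // Arrange: publish the test records, exactly as in your current test
    sendToKafka(getTestEvents(), parameters)

    // Act: run the (still unbounded) job on a background thread so the
    // test thread stays free to poll for the expected side effect
    thread(isDaemon = true) { EntityIdentificationJob.run(parameters) }

    // Assert: keep re-checking the condition until it holds or the timeout
    // is hit - this is what ends the test instead of a bounded source
    await()
        .atMost(Duration.ofSeconds(60))
        .pollInterval(Duration.ofMillis(500))
        .untilAsserted {
            assertEquals(1, queryCount("SELECT * FROM users", connection))
        }
}

The job runs unbounded, exactly as in production; the Awaitility condition (and its timeout) is what decides when the test is done.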
IMO this type of testing has the advantage that you won't have divergent
behavior from production, as you have experienced.

Regards,
Maciej

On Sun, Mar 14, 2021, 05:41 Rion Williams <rionmons...@gmail.com> wrote:

> Following up on this issue, I realized my initial problem was that my test
> case only contained a single message to send through the pipeline. This
> resulted in the earliest offset also being the latest, and things didn’t
> exactly work as expected. Once I added several other messages and sent them
> through, the pipeline appeared to run as expected.
>
> However, the use of “bounded” seems to be fickle in terms of test cases.
> Since an exception is thrown once the bound is reached, I can typically
> just wrap my test execution within a try/catch and simply apply my
> assertion afterwards.
>
> This occasionally results in passing tests, but in others it seems that
> the bound is reached prior to processing the messages it had seen thus far,
> and as a result yields a failing test. I don’t know if this is a bug or
> intentional, but I’m not aware of a workaround that could “force” the
> pipeline to finish processing all of the messages from the topic once the
> bound is reached. I’ve tried sending “flush records” through to the topic,
> however since there are multiple partitions, it’s not guaranteed that the
> pipeline will read those last.
>
> This is purely a testing problem, as a production job would be streaming
> and unbounded; however, I’d love to have a reliable integration test or a
> pattern that I could use to guarantee the processing of a finite set of
> data via a KafkaSource (i.e. send a finite set of records to Kafka, read
> from the topic, process all records, apply the assertion after processing).
>
> Any ideas/recommendations/workarounds would be greatly welcome, and I’d be
> happy to share my specific code / use cases if needed.
>
> Thanks much,
>
> Rion
>
> On Mar 12, 2021, at 10:19 AM, Rion Williams <rionmons...@gmail.com> wrote:
>
> Hi all,
>
> I've been using the KafkaSource API as opposed to the classic consumer, and
> things have been going well. I configured my source such that it could be
> used in either a streaming or a bounded mode, with the bounded approach
> specifically aimed at improving testing (unit/integration).
>
> I've noticed that when I attempt to run through a test, it seems that the
> pipeline never acknowledges the "end" of the stream in a bounded context;
> it just runs forever and never makes it to my assert.
>
> Does anything look glaringly wrong with how the source is being defined?
>
> object KafkaEventSource {
>
>     fun withParameters(parameters: ParameterTool): KafkaSource<Event> {
>         val schemaRegistryUrl = parameters.getRequired("schema.registry.url")
>
>         val builder = KafkaSource.builder<Event>()
>             .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
>             .setGroupId(parameters.getRequired("group.id"))
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .setProperty("schema.registry.url", schemaRegistryUrl)
>             .setTopics(parameters.getRequired("topic"))
>             .setDeserializer(EventDeserializer(schemaRegistryUrl))
>
>         if (parameters.getBoolean("bounded", false)) {
>             builder.setBounded(OffsetsInitializer.latest())
>         }
>
>         return builder.build()
>     }
> }
>
> I can verify that the generated source has its boundedness set properly
> and all of the configuration options are correct.
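A rough, untested variant in case you want to keep the bounded mode for tests: instead of OffsetsInitializer.latest(), pin the stopping point to the end offsets captured right after the test records have been produced (e.g. with a throwaway KafkaConsumer), so that the bound covers exactly the records that were sent. The helper name below is only illustrative:

import java.util.Properties
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Illustrative helper: ask Kafka for the current end offset of every
// partition of the topic; only records produced before this call fall
// inside the bound.
fun currentEndOffsets(topic: String, bootstrapServers: String): Map<TopicPartition, Long> {
    val props = Properties().apply { put("bootstrap.servers", bootstrapServers) }
    KafkaConsumer(props, ByteArrayDeserializer(), ByteArrayDeserializer()).use { consumer ->
        val partitions = consumer.partitionsFor(topic)
            .map { TopicPartition(topic, it.partition()) }
        return consumer.endOffsets(partitions)
    }
}

// ...and in the builder, instead of OffsetsInitializer.latest():
// builder.setBounded(OffsetsInitializer.offsets(currentEndOffsets(topic, bootstrapServers)))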
> My test itself is fairly simple and can be broken down as follows:
>
>    1. Inject records into a Kafka topic
>    2. Initialize my Flink job using all of my testing parameters
>    3. Apply my assertion (in this case, verifying that a JdbcSink wrote to
>       a specific database)
>
> @Test
> fun `Example`() {
>     // Arrange
>     val events = getTestEvents()
>     sendToKafka(events, parameters)
>
>     // Act
>     EntityIdentificationJob.run(parameters)
>
>     // Assert
>     val users = queryCount("SELECT * FROM users", connection)
>     assertEquals(1, users)
> }
>
> My job itself is broken down further: it reads from the source, runs a
> process function that splits records into multiple side outputs, and
> writes each of them to a distinct JdbcSink based on its type:
>
> @JvmStatic
> fun main(args: Array<String>) {
>     val parameters = loadParams(args)
>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>     // Read from Kafka
>     val entities = stream
>         .fromSource(KafkaEventSource.withParameters(parameters),
>             WatermarkStrategy.noWatermarks(), "kafka")
>         .process(IdentifyEntitiesFunction())
>
>     // Write out each tag to its respective sink
>     for (entityType in EntityTypes.all) {
>         entities
>             .getSideOutput(entityType)
>             .addSink(PostgresEntitySink.withEntity(entityType.typeInfo,
>                 parameters))
>     }
>
>     stream.execute(parameters.getRequired("application"))
> }
>
> I can verify in the logs that my sink is being executed and writing to the
> appropriate database; however, the job itself never finishes. I've tried it
> using a single Kafka partition as well as multiple partitions, and even
> commented out the logic related to writing to the database. It still just
> seems to run ... forever.
>
> Any recommendations? Perhaps there's a bad configuration or setting that
> isn't being used as intended?
>
> Thanks,
>
> Rion