Hi Wouter,

> So in short my flow is like this:
> - I have a stateful DoFn, which reads from a PCollection with the
>   configuration of my KafkaConsumer.
> - The stateful DoFn reads this message from the PCollection and starts a
>   KafkaConsumer (in the process method).
> - Rather than yielding messages incoming from the KafkaConsumer, they are
>   added to BagState, with a CombiningValueStateSpec to keep track of the
>   count.
> - When the CombiningValueStateSpec has a count >= MAX_BUFFER_SIZE we set
>   a timer (TimeDomain.REAL_TIME) to be fired in a very small duration
>   (1 ms).
> - The timer method yields all elements in the buffer and clears both the
>   buffer state as well as the count state.
>
> However, these timers are never fired (i.e. the timer methods are never
> executed).
Your guess is correct: the timer is only scheduled; it cannot fire until
the current bundle finishes processing. I would change your KafkaReadDoFn
as follows:

- In process(), initialize your Kafka consumer and output the first N
  records. Once the output exceeds N, set a real-time timer, save the next
  available Kafka offset to ValueState, and stop processing by returning
  from process().
- In onTimer(), retrieve the next offset from ValueState, assign that
  offset to the consumer, and continue reading for the next N elements.
  When you have output N elements, save the offset to ValueState again and
  reset the timer.
- Each time the timer fires, the steps in onTimer() are repeated.

Though this approach should work, it may not perform well, since we don't
do anything special to distribute the work. What is your target runner? If
it's Dataflow, I believe the ExternalTransform should work.

On Mon, May 10, 2021 at 3:13 AM Wouter Zorgdrager <[email protected]> wrote:

> Hi Boyuan,
>
> Thanks for your suggestion; that sounds like a good idea. However, where
> do I open my connection with Kafka and start consuming messages? Is it in
> the process method (turning it into a blocking method)? I currently try
> it like this, but timers never seem to be fired. I have the feeling this
> is because of the blocking nature of the process method.
>
> So in short my flow is like this:
> - I have a stateful DoFn, which reads from a PCollection with the
>   configuration of my KafkaConsumer.
> - The stateful DoFn reads this message from the PCollection and starts a
>   KafkaConsumer (in the process method).
> - Rather than yielding messages incoming from the KafkaConsumer, they are
>   added to BagState, with a CombiningValueStateSpec to keep track of the
>   count.
> - When the CombiningValueStateSpec has a count >= MAX_BUFFER_SIZE we set
>   a timer (TimeDomain.REAL_TIME) to be fired in a very small duration
>   (1 ms).
> - The timer method yields all elements in the buffer and clears both the
>   buffer state as well as the count state.
>
> However, these timers are never fired (i.e. the timer methods are never
> executed). Is there something I'm doing wrong, or do you have any other
> suggestions?
>
> On Thu, 8 Apr 2021 at 19:35, Boyuan Zhang <[email protected]> wrote:
>
>> Hi Wouter,
>>
>> Dataflow fuses your operations into several stages at execution time. A
>> downstream stage only receives input once the upstream stage finishes at
>> least one bundle (a small portion of work that Dataflow gives to a
>> worker). In your case, it seems your _ConsumeKafkaTopic runs forever and
>> never stops. Operations fused into the same stage as _ConsumeKafkaTopic
>> see its output immediately, but downstream stages never receive any
>> input.
>>
>> One workaround I can come up with is to change your _ConsumeKafkaTopic
>> into a stateful DoFn. You can give _ConsumeKafkaTopic a variable that
>> limits how many records it outputs at once; let's say the number is
>> 1000. When your output exceeds 1000 messages, save the current consumer
>> position into ValueState and set a now() + 5s processing-time timer. In
>> the timer callback, output another 1000 records and set the next timer.
>> Does that sound workable for you?
>>
>> On Thu, Apr 8, 2021 at 3:47 AM Wouter Zorgdrager <[email protected]>
>> wrote:
>>
>>> Dear Beam community,
>>>
>>> I'm currently trying to set up a Beam pipeline using the Python SDK. I
>>> need to read from an unbounded Kafka source. My flow looks like this:
>>> [image: flow.png]
>>> It reads events from a Kafka topic; using a (stateless) router the
>>> events get to different PTransforms, and the final results get written
>>> to a Kafka topic again. I use side outputs to 'route' and
>>> beam.Flatten() to merge PCollections again.
>>>
>>> In my first attempt, I used the built-in Kafka IO, which uses an
>>> ExternalTransform [1]. However, no data was being consumed from my
>>> Kafka topic. I think this was also discussed on the mailing list before
>>> [2] and has to do with this issue [3].
>>>
>>> In my second attempt, I used a Kafka connector from an external source
>>> [4]. This connector worked initially, when I had not yet fully
>>> implemented the flow described above. When using this connector, I see
>>> messages get 'stuck' in the dataflow and not being fully processed. I.e.
>>> they get processed up to a certain DoFn in the pipeline, but no
>>> further. Oddly enough, when I get rid of the beam.Flatten() (so that I
>>> can't merge collections and just use a linear pipeline), it does work.
>>> Moreover, when I substitute my Kafka consumer with a simple
>>> beam.Create(), the full pipeline works as well.
>>>
>>> I think the problem is that the Kafka connector I'm using is not a
>>> splittable DoFn and just blocks completely while consuming [5].
>>> However, I'm confused that it does work for a linear pipeline (without
>>> flattening).
>>>
>>> To give some more context: I'm planning to deploy this using the
>>> FlinkRunner. Moreover, I have a 'hard' dependency on Python and Kafka;
>>> in other words, I can't move to another IO framework or programming
>>> language.
>>>
>>> I hope you can help me out with some suggestions or workarounds.
>>>
>>> Thanks in advance!
>>> Regards,
>>> Wouter
>>>
>>> [1] - https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.kafka.html
>>> [2] - https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E
>>> [3] - https://issues.apache.org/jira/browse/BEAM-11998
>>> [4] - https://github.com/mohaseeb/beam-nuggets
>>> [5] - https://github.com/mohaseeb/beam-nuggets/blob/39d2493b161ebbcbff9f4857115d527f6fefda77/beam_nuggets/io/kafkaio.py#L76
