Hi Boyuan,

I understand. I basically have to re-open the Kafka connection for every
'bundle', since my Kafka consumer is not serializable. That might be a bit
inefficient, but this is mostly for testing purposes, so I guess it's OK for
now. I'm using the FlinkRunner and unfortunately Dataflow is not an option.
I'm hoping this issue gets resolved at some point [1] so that I can use the
built-in Beam Kafka connector.
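A minimal sketch of that per-bundle reconnection, assuming the kafka-python client; the topic, servers, and group id below are made-up placeholders, and the batch bound and idle timeout are only there so a bundle can actually finish:

    import apache_beam as beam
    from kafka import KafkaConsumer  # kafka-python client (assumed)


    class ReadKafkaPerBundle(beam.DoFn):
        """Re-opens the (non-serializable) consumer for every bundle."""

        def __init__(self, topic, servers, max_records=500):
            self._topic = topic
            self._servers = servers
            self._max_records = max_records
            self._consumer = None

        def start_bundle(self):
            # The consumer cannot be pickled, so it is created here rather
            # than in __init__, which Beam serializes along with the DoFn.
            self._consumer = KafkaConsumer(
                self._topic,
                bootstrap_servers=self._servers,
                group_id='beam-test',      # hypothetical group id
                consumer_timeout_ms=5000)  # stop iterating when idle

        def process(self, element):
            # Emit a bounded batch so the bundle eventually completes.
            for i, record in enumerate(self._consumer):
                yield record.value
                if i + 1 >= self._max_records:
                    break

        def finish_bundle(self):
            self._consumer.close()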
Thanks for your help!
Wouter

[1] - https://issues.apache.org/jira/browse/BEAM-11998

On Mon, 10 May 2021 at 19:50, Boyuan Zhang <[email protected]> wrote:

> Hi Wouter,
>
>> So in short, my flow is like this:
>> - I have a stateful DoFn, which reads from a PCollection with the
>>   configuration of my KafkaConsumer.
>> - The stateful DoFn reads this message from the PCollection and starts a
>>   KafkaConsumer (in the process method).
>> - Rather than yielding messages incoming from the KafkaConsumer, they are
>>   added to BagState, with a CombiningValueStateSpec to keep track of the
>>   count.
>> - When the CombiningValueStateSpec has a count >= MAX_BUFFER_SIZE, we set
>>   a timer (TimeDomain.REAL_TIME) to be fired after a very small duration
>>   (1 ms).
>> - The timer method yields all elements in the buffer and clears both the
>>   buffer state and the count state.
>>
>> However, these timers are never fired (i.e. the timer methods are never
>> executed).
>
> Your guess is correct. The timer is scheduled only after the current
> bundle finishes processing. I would change your KafkaReadDoFn to:
>
> - In process(), you initialize your Kafka consumer and output the first N
>   records; when the count exceeds N, you set a real-time timer and save
>   the next available Kafka offset to ValueState. Then we stop processing
>   by returning from the method.
> - In onTimer(), you retrieve the next offset to read from ValueState,
>   assign that offset to the consumer, and continue reading the next N
>   elements. When you have output N elements, you save the offset to
>   ValueState again and reset the timer.
> - When the timer fires, all steps in onTimer are repeated.
>
> Though this approach should work, it may not perform well, since we don't
> do anything special around distributing the work.
>
> What is your target runner? If it's Dataflow, I believe ExternalTransform
> should work.
>
> On Mon, May 10, 2021 at 3:13 AM Wouter Zorgdrager <[email protected]> wrote:
>
>> Hi Boyuan,
>>
>> Thanks for your suggestion, that sounds like a good idea. However, where
>> do I open my connection with Kafka and start consuming messages? Is it in
>> the process method (turning it into a blocking method)? I currently try
>> it like this, but the timers never seem to be fired. I have the feeling
>> this is because of the blocking nature of the process method.
>>
>> So in short, my flow is like this:
>> - I have a stateful DoFn, which reads from a PCollection with the
>>   configuration of my KafkaConsumer.
>> - The stateful DoFn reads this message from the PCollection and starts a
>>   KafkaConsumer (in the process method).
>> - Rather than yielding messages incoming from the KafkaConsumer, they are
>>   added to BagState, with a CombiningValueStateSpec to keep track of the
>>   count.
>> - When the CombiningValueStateSpec has a count >= MAX_BUFFER_SIZE, we set
>>   a timer (TimeDomain.REAL_TIME) to be fired after a very small duration
>>   (1 ms).
>> - The timer method yields all elements in the buffer and clears both the
>>   buffer state and the count state.
>>
>> However, these timers are never fired (i.e. the timer methods are never
>> executed).
>> Is there something I'm doing wrong, or do you have any other suggestions?
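A rough sketch of the read-N-then-reschedule pattern Boyuan describes above, using Beam's Python state and timer API; the topic, partition, and bootstrap servers are made-up placeholders, and re-opening a kafka-python consumer for every batch is only to keep the example self-contained:

    import time

    import apache_beam as beam
    from apache_beam.coders import VarIntCoder
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import (
        ReadModifyWriteStateSpec, TimerSpec, on_timer)
    from kafka import KafkaConsumer, TopicPartition  # kafka-python (assumed)

    N = 1000  # records to output per activation


    class KafkaReadDoFn(beam.DoFn):
        # Next offset to read, checkpointed across activations.
        OFFSET = ReadModifyWriteStateSpec('offset', VarIntCoder())
        # Real-time (processing-time) timer that resumes reading.
        RESUME = TimerSpec('resume', TimeDomain.REAL_TIME)

        def _read_batch(self, start):
            # Seek to the saved offset and read at most N records.
            consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
            tp = TopicPartition('my-topic', 0)
            consumer.assign([tp])
            consumer.seek(tp, start)
            polled = consumer.poll(timeout_ms=500, max_records=N)
            consumer.close()
            records = [r.value for batch in polled.values() for r in batch]
            return records, start + len(records)

        def process(self,
                    element,  # (key, consumer config); state needs keyed input
                    offset=beam.DoFn.StateParam(OFFSET),
                    timer=beam.DoFn.TimerParam(RESUME)):
            records, nxt = self._read_batch(offset.read() or 0)
            offset.write(nxt)               # save the next offset first,
            timer.set(time.time() + 0.005)  # then schedule the resume timer,
            yield from records              # then output N records and return.

        @on_timer(RESUME)
        def resume(self,
                   offset=beam.DoFn.StateParam(OFFSET),
                   timer=beam.DoFn.TimerParam(RESUME)):
            # Each firing repeats the read / checkpoint / reschedule cycle in
            # its own bundle, so downstream stages actually receive input.
            records, nxt = self._read_batch(offset.read() or 0)
            offset.write(nxt)
            timer.set(time.time() + 0.005)
            yield from records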
>> On Thu, 8 Apr 2021 at 19:35, Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi Wouter,
>>>
>>> Dataflow fuses your operations into several stages during execution
>>> time. A downstream stage can only receive input once the upstream stage
>>> finishes at least one bundle (a small portion of work that Dataflow
>>> gives to the worker). In your case, it seems like your
>>> _ConsumeKafkaTopic runs forever and never stops. Thus the operations in
>>> the same stage as _ConsumeKafkaTopic can see its output immediately, but
>>> the downstream stages never receive any input.
>>>
>>> One workaround I can come up with is to change your _ConsumeKafkaTopic
>>> into a stateful DoFn. You can give _ConsumeKafkaTopic a variable that
>>> determines how many records it outputs at once. Let's say the number is
>>> 1000. When your output messages exceed 1000, you can save the current
>>> consumer position into ValueState and set a now() + 5s processing-time
>>> timer. In the timer callback, you output another 1000 records and set
>>> the next timer. Does that sound workable for you?
>>>
>>> On Thu, Apr 8, 2021 at 3:47 AM Wouter Zorgdrager <[email protected]> wrote:
>>>
>>>> Dear Beam community,
>>>>
>>>> I'm currently trying to set up a Beam pipeline using the Python SDK. I
>>>> need to read from an unbounded Kafka source. My flow looks like this:
>>>> [image: flow.png]
>>>> It reads events from a Kafka topic; using a (stateless) router, the
>>>> events are sent to different PTransforms, and the final results are
>>>> written to a Kafka topic again. I use side outputs to 'route' and
>>>> beam.Flatten() to merge the PCollections again.
>>>>
>>>> In my first attempt, I used the built-in Kafka IO, which uses an
>>>> ExternalTransform [1]. However, no data was being consumed from my
>>>> Kafka topic. I think this was discussed on the mailing list before [2]
>>>> and has to do with this issue [3].
>>>>
>>>> In my second attempt, I used a Kafka connector from an external source
>>>> [4]. This connector worked initially, when I had not yet fully
>>>> implemented the flow described above. With the full flow, I see
>>>> messages get 'stuck' in the dataflow and they are not processed all the
>>>> way to the end; i.e. they get processed up to a certain DoFn in the
>>>> pipeline, but not any further. Oddly enough, when I get rid of the
>>>> beam.Flatten() (so I can't merge collections and just use a linear
>>>> pipeline), it does work. Moreover, when I substitute my Kafka consumer
>>>> with a simple beam.Create(), the full pipeline works as well.
>>>>
>>>> I think the problem is that the Kafka connector I'm using is not a
>>>> splittable DoFn and just blocks completely while consuming [5].
>>>> However, I'm confused that it does work for a linear pipeline (without
>>>> flattening).
>>>>
>>>> To give some more context: I'm planning to deploy this using the
>>>> FlinkRunner. Moreover, I have a 'hard' dependency on Python and Kafka.
>>>> In other words, I can't move to another IO framework or programming
>>>> language.
>>>>
>>>> I hope you can help me out with some suggestions or workarounds.
>>>>
>>>> Thanks in advance!
>>>> Regards,
>>>> Wouter
>>>>
>>>> [1] - https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.kafka.html
>>>> [2] - https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E
>>>> [3] - https://issues.apache.org/jira/browse/BEAM-11998
>>>> [4] - https://github.com/mohaseeb/beam-nuggets
>>>> [5] - https://github.com/mohaseeb/beam-nuggets/blob/39d2493b161ebbcbff9f4857115d527f6fefda77/beam_nuggets/io/kafkaio.py#L76
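And a rough sketch of the routed topology from the original message above, with beam.Create() standing in for the Kafka source (as in the test Wouter mentions); the routing rule, branch names, and transforms are made up for illustration:

    import apache_beam as beam
    from apache_beam.pvalue import TaggedOutput


    class Route(beam.DoFn):
        """Stateless router: tags each event for one of two branches."""

        def process(self, event):
            if event['kind'] == 'a':  # hypothetical routing rule
                yield TaggedOutput('branch_a', event)
            else:
                yield TaggedOutput('branch_b', event)


    with beam.Pipeline() as p:
        # beam.Create() stands in for the unbounded Kafka source.
        events = p | beam.Create([{'kind': 'a'}, {'kind': 'b'}])
        routed = events | beam.ParDo(Route()).with_outputs(
            'branch_a', 'branch_b')
        a = routed.branch_a | 'TransformA' >> beam.Map(lambda e: {**e, 'by': 'A'})
        b = routed.branch_b | 'TransformB' >> beam.Map(lambda e: {**e, 'by': 'B'})
        # Merge the branches again before writing back out.
        merged = (a, b) | beam.Flatten()
        merged | beam.Map(print)  # stand-in for the Kafka sink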
