Hi Hans,

Thanks a lot for the guidance. I was able to run it on Flink, but it looks like there's an issue with the Kafka consumer:

Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord

On analyzing the fat jar, I found that it contains a KafkaConsumer version older than 2.x, whereas the plugins folder has 2.4.1, which is the version the fat jar should include. It looks like Beam is using an older version of the Kafka consumer.

Thanks
Mono

On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <hans.van.akel...@gmail.com> wrote:

> Hi Mono,
>
> I took a bit of time to set up a test environment on my local system
> because we cannot always know by heart whether something actually works
> (we are working on more tests in combination with spark/flink/dataflow).
> But I can confirm it works with a Flink runner. I do agree that error
> handling is not ideal: it gets stuck in a waiting loop when the kafka
> server is unavailable. The Flink job then never gets published to the
> cluster and you sit there wondering what's going on. When everything is
> configured correctly it works as expected.
>
> I created a sample pipeline using the Beam Kafka consumer and a write to
> text file to see if the data is being received in the correct format.
>
> Pipeline:
>
> Screenshot 2022-04-23 at 14.55.06.png
> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>
> Flink console output:
>
> Screenshot 2022-04-23 at 14.47.34.png
> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>
> Settings I used on the Beam run configuration:
>
> Screenshot 2022-04-23 at 14.53.30.png
> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>
>
> Hope you get everything working.
> If there is anything more I can do please let me know.
>
> Kr,
> Hans
>
> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <monoji...@gmail.com>
> wrote:
>
>> Hi Hans,
>>
>> Yeah I realized that apart from AVRO it supports string messages too. But
>> the issue is the beam consumer doesn't consume any messages from kafka.
>> Even if I put garbage in the topic name, it doesn't throw any errors.
>> The Javadoc says that it's only meant to be run with Beam runners. Does
>> that include the Flink runner?
>>
>> Apart from that everything works like a charm and we even managed to
>> write some custom plugins for our use cases. If we can solve this kafka
>> consumer issue, then we are all set for prime time.
>>
>> Really appreciate your responses so far.
>>
>> Thanks
>> Mono
>>
>> On Fri, Apr 22, 2022, 15:49 Matt Casters <matt.cast...@neo4j.com> wrote:
>>
>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>
>>>
>>> On Fri, 22 Apr 2022 at 17:57, monajit choudhury <monoji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Hans,
>>>>
>>>> Going through the log files I realized it had something to do with
>>>> multithreaded executions. I tried using the Beam Kafka Consumer but the
>>>> issue is it only supports AVRO. I need to consume JSON messages.
>>>>
>>>> Thanks
>>>> Mono
>>>>
>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>> hans.van.akel...@gmail.com> wrote:
>>>>
>>>>> Hi Monajit,
>>>>>
>>>>> This is the auto scaling nature of Flink fighting against the
>>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>>> need to know when messages are finished). When running on Flink the best
>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>
>>>>> Another solution (but not yet tested here so not sure it will work) is
>>>>> to force it to a single thread by setting SINGLE_BEAM in the "number of
>>>>> copies".
>>>>> More information about this can be found on our documentation pages [1]
>>>>>
>>>>> Kind regards,
>>>>> Hans
>>>>>
>>>>> [1]
>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>
>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <monoji...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>>> When I run the pipeline using the local runner, it works fine. But if I
>>>>>> run it using the flink runner I get the following error
>>>>>>
>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>> accept the Kafka messages
>>>>>>
>>>>>> I have tried debugging the Hop code and looks like the root cause is
>>>>>> the initSubPipeline() method being invoked multiple times while using the
>>>>>> Flink runner. That's not the case when I use the local runner. Am I
>>>>>> missing something here?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Monajit Choudhury
>>>>>>
>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>
>>>>>
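
A note on the NoSuchMethodError reported in the most recent message of this thread: Consumer.poll(java.time.Duration) was added in the Kafka 2.0 client, so the error is what one would expect if an older kafka-clients jar wins on the classpath. The following is only a minimal diagnostic sketch, not part of Hop or Beam. The class name KafkaClientCheck and its helper methods are hypothetical, and it assumes you run it with the same classpath as the fat jar. It uses plain reflection to report where the Consumer interface was loaded from and whether poll(Duration) is present.

```java
import java.security.CodeSource;
import java.time.Duration;

// Hypothetical helper class, not part of Apache Hop, Beam, or Kafka.
final class KafkaClientCheck {

    /** Returns true if the class can be loaded and has a public method with the given parameter types. */
    public static boolean hasPublicMethod(String className, String methodName, Class<?>... paramTypes) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> type = Class.forName(className, false, KafkaClientCheck.class.getClassLoader());
            type.getMethod(methodName, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    /** Returns the jar or directory a class was loaded from, or a placeholder when that is unknown. */
    public static String locationOf(String className) {
        try {
            Class<?> type = Class.forName(className, false, KafkaClientCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        String consumer = "org.apache.kafka.clients.consumer.Consumer";
        System.out.println("Consumer loaded from: " + locationOf(consumer));
        System.out.println("poll(Duration) available: "
                + hasPublicMethod(consumer, "poll", Duration.class));
    }
}
```

If the reported location is the fat jar and poll(Duration) is unavailable, that would support the reading that an older Kafka client was bundled instead of the 2.4.1 one from the plugins folder.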