Hi Mono,

I tested against the Kafka image from Bitnami (https://hub.docker.com/r/bitnami/kafka/), which uses Kafka 3.1 on its latest tag, and it worked. That being said, it seems Beam runs its tests against Kafka 1.0.0 up to 2.5.1 (https://github.com/apache/beam/blob/880b10e71b963de6ec20efe614dd866e9a809da4/sdks/java/io/kafka/build.gradle).

It does seem we ship the 1.0.0 lib in our release, which is a really old version. I know Matt is currently working on a Beam upgrade for our next release, so I'll add this to the list to get it sorted out. It will be safe to replace the kafka-clients jar located under plugins/engines/beam/lib with the same jar as the Kafka transform uses under plugins/transforms/kafka, or even to update both to 2.5.1.
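To make the version mismatch concrete: Consumer.poll(java.time.Duration) only exists as of kafka-clients 2.0, so code compiled against a 2.x client that ends up running against the bundled 1.0.0 jar fails at runtime exactly like the NoSuchMethodError quoted below. A minimal sketch (the broker address, group id and topic are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollApiCheck {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "poll-api-check");          // placeholder group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
          // poll(Duration) was added in kafka-clients 2.0; compiled against 2.x but
          // run against a 1.x jar, this line throws NoSuchMethodError.
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
          // The pre-2.0 signature, deprecated in 2.0, was: consumer.poll(1000L);
          records.forEach(r -> System.out.println(r.value()));
        }
      }
    }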
Kind regards,
Hans

On Mon, 25 Apr 2022 at 18:14, monajit choudhury <monoji...@gmail.com> wrote:

> Hi Hans,
>
> Thanks a lot for the guidance. I was able to run it on Flink, but it looks like there's an issue with the Kafka consumer:
>
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord
>
> On analyzing the fat jar I found that the version of the KafkaConsumer is < 2.x, whereas the plugins folder has 2.4.1, which is the version the fat jar should include.
>
> Looks like Beam is using an older version of the Kafka consumer.
>
> Thanks
> Mono
>
> On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <hans.van.akel...@gmail.com> wrote:
>
>> Hi Mono,
>>
>> I took a bit of time to set up a test environment on my local system, because we cannot always know by heart whether something actually works (we are working on more tests in combination with Spark/Flink/Dataflow). I can confirm it works with a Flink runner. I do agree that error handling is not ideal: it gets stuck in a waiting loop when the Kafka server is unavailable, the Flink job then never gets published to the cluster, and you sit there wondering what's going on. When everything is configured correctly it works as expected.
>>
>> I created a sample pipeline using the Beam Kafka consumer and a write to a text file, to see if the data is being received in the correct format.
>>
>> Pipeline:
>> Screenshot 2022-04-23 at 14.55.06.png <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>>
>> Flink console output:
>> Screenshot 2022-04-23 at 14.47.34.png <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>>
>> Settings I used on the Beam run configuration:
>> Screenshot 2022-04-23 at 14.53.30.png <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>>
>> Hope you get everything working. If there is anything more I can do, please let me know.
>>
>> Kr,
>> Hans
>>
>> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <monoji...@gmail.com> wrote:
>>
>>> Hi Hans,
>>>
>>> Yeah, I realized that apart from Avro it supports string messages too. But the issue is that the Beam consumer doesn't consume any messages from Kafka. Even if I put garbage in the topic name, it doesn't throw any errors. The Javadocs say it's only meant to be run with Beam runners; does that include the Flink runner?
>>>
>>> Apart from that, everything works like a charm, and we even managed to write some custom plugins for our use cases. If we can solve this Kafka consumer issue, then we are all set for prime time.
>>>
>>> Really appreciate your responses so far.
>>>
>>> Thanks
>>> Mono
>>>
>>> On Fri, Apr 22, 2022, 15:49 Matt Casters <matt.cast...@neo4j.com> wrote:
>>>
>>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
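>>>> For illustration, a minimal Beam KafkaIO read that treats each message value as a plain string could look roughly like this (a sketch; the broker address, topic and output path are placeholders, and withMaxNumRecords is only there to bound the read for a quick test):
>>>>
>>>>   import org.apache.beam.sdk.Pipeline;
>>>>   import org.apache.beam.sdk.io.TextIO;
>>>>   import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>   import org.apache.beam.sdk.transforms.Values;
>>>>   import org.apache.kafka.common.serialization.StringDeserializer;
>>>>
>>>>   public class KafkaJsonRead {
>>>>     public static void main(String[] args) {
>>>>       Pipeline p = Pipeline.create();
>>>>       p.apply(KafkaIO.<String, String>read()
>>>>               .withBootstrapServers("localhost:9092")       // placeholder broker
>>>>               .withTopic("json-topic")                      // placeholder topic
>>>>               .withKeyDeserializer(StringDeserializer.class)
>>>>               .withValueDeserializer(StringDeserializer.class)
>>>>               .withMaxNumRecords(100)                       // bounded read, test only
>>>>               .withoutMetadata())                           // -> KV<String, String>
>>>>           .apply(Values.create())                           // -> the raw JSON strings
>>>>           .apply(TextIO.write().to("/tmp/kafka-out"));      // placeholder output path
>>>>       p.run().waitUntilFinish();
>>>>     }
>>>>   }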
>>>> On Fri, 22 Apr 2022 at 17:57, monajit choudhury <monoji...@gmail.com> wrote:
>>>>
>>>>> Hi Hans,
>>>>>
>>>>> Going through the log files, I realized it had something to do with multithreaded execution. I tried using the Beam Kafka Consumer, but the issue is that it only supports Avro, and I need to consume JSON messages.
>>>>>
>>>>> Thanks
>>>>> Mono
>>>>>
>>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <hans.van.akel...@gmail.com> wrote:
>>>>>
>>>>>> Hi Monajit,
>>>>>>
>>>>>> This is the auto-scaling nature of Flink fighting against the requirement of having a single-threaded pipeline for Kafka messages (as we need to know when messages are finished). When running on Flink, the best solution would be to use the Beam Kafka Consumer.
>>>>>>
>>>>>> Another solution (not yet tested here, so not sure it will work) is to force it to a single thread by setting SINGLE_BEAM in the "number of copies". More information about this can be found on our documentation pages [1].
>>>>>>
>>>>>> Kind regards,
>>>>>> Hans
>>>>>>
>>>>>> [1] https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>>
>>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <monoji...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to test a simple Kafka consumer using Apache Hop v1.2. When I run the pipeline using the local runner, it works fine. But if I run it using the Flink runner, I get the following error:
>>>>>>>
>>>>>>> You can only have one copy of the injector transform 'output' to accept the Kafka messages
>>>>>>>
>>>>>>> I have tried debugging the Hop code, and it looks like the root cause is the initSubPipeline() method being invoked multiple times when using the Flink runner. That's not the case when I use the local runner. Am I missing something here?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Monajit Choudhury
>>>>>>> LinkedIn <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
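>>>>>> PS: to make the single-thread idea concrete, at the plain Beam level pinning the Flink runner to one copy of everything would look roughly like this (a sketch of Beam's own options, not of how Hop wires SINGLE_BEAM internally):
>>>>>>
>>>>>>   import org.apache.beam.runners.flink.FlinkPipelineOptions;
>>>>>>   import org.apache.beam.runners.flink.FlinkRunner;
>>>>>>   import org.apache.beam.sdk.Pipeline;
>>>>>>   import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>
>>>>>>   public class SingleThreadedFlinkRun {
>>>>>>     public static void main(String[] args) {
>>>>>>       FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>>>>>>       options.setRunner(FlinkRunner.class);
>>>>>>       // One copy of each transform, so the Kafka injector only runs once:
>>>>>>       options.setParallelism(1);
>>>>>>       Pipeline pipeline = Pipeline.create(options);
>>>>>>       // ... build the pipeline here, then:
>>>>>>       pipeline.run().waitUntilFinish();
>>>>>>     }
>>>>>>   }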