Hi Mono,

I tested against the kafka image from bitnami (
https://hub.docker.com/r/bitnami/kafka/) which uses kafka 3.1 on their
latest tag and it worked.
That being said it seems Beam runst tests against kafka 1.0.0 up to 2.5.1 (
https://github.com/apache/beam/blob/880b10e71b963de6ec20efe614dd866e9a809da4/sdks/java/io/kafka/build.gradle
)

It does seem we add the 1.0.0 lib to our release which is a really old
version, I know Matt is currently working on a Beam upgrade for our next
release so I'll add it to the list to get this sorted out.

It will be safe to replace the kafka-clients jar located under
plugins/engines/beam/lib with the same jar as from the kafka transform
located under plugins/transforms/kafka, or even update both to 2.5.1.

Kind regards,
Hans

On Mon, 25 Apr 2022 at 18:14, monajit choudhury <monoji...@gmail.com> wrote:

> Hi Hans,
>
> Thanks a lot for the guidance. I was able to run it on Flink but looks
> like there's a issue with the Kafka Consumer
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> java.lang.NoSuchMethodError:
> org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord
>
>
> On analyzing the Fat jar I found that the version of the KafkaConsumer is
> < 2.x whereas the plugins folder has 2.4.1, which is the version the fat
> jar should include.
>
>
> Looks like Beam is using an older version of the kafka consumer
>
>
>
> Thanks
>
> Mono
>
>
>
>
> On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <
> hans.van.akel...@gmail.com> wrote:
>
>> Hi Mono,
>>
>> I took a bit of time to set up a test environment on my local system
>> because we can not always by heart if something actually works (we are
>> working on more tests in combination with spark/flink/dataflow).
>> But I can confirm it works with a Flink runner. I do agree that error
>> handling is not ideal, it gets stuck in a waiting loop when the kafka
>> server is unavailable. The Flink job then never gets published to the
>> cluster and you sit there wondering what's going on. When everything is
>> configured correctly it works as expected.
>>
>> I created a sample pipeline using the Beam Kafka consumer and a write to
>> text file to see if the data is being received in the correct format.
>>
>> Pipeline:
>>
>>  Screenshot 2022-04-23 at 14.55.06.png
>> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>>
>> Flink console output:
>>
>>  Screenshot 2022-04-23 at 14.47.34.png
>> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>>
>> Settings I used on the Beam run configuration:
>>
>>  Screenshot 2022-04-23 at 14.53.30.png
>> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>>
>>
>> Hope you get everything working.
>> If there is anything more I can do please let me know.
>>
>> Kr,
>> Hans
>>
>> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <monoji...@gmail.com>
>> wrote:
>>
>>> Hi Hans,
>>>
>>> Yeah I realized that apart from AVRO it supports string messages too.
>>> But the issue is the  beam consumer doesn't consume any messages from kafka
>>> . Even if put garbage in the topic name, it doesn't throw any errors.
>>>  The Java docs says that its only mean to be run with beam runners, does
>>> it include the Flink runner ?
>>>
>>> Apart from that everything works like a charm and we even managed to
>>> write some custom plugins for our usecases. If we can solve this kafka
>>> consumer issue,  then we are all set for prime time.
>>>
>>> Really appreciate your responses so far.
>>>
>>> Thanks
>>> Mono
>>>
>>> On Fri, Apr 22, 2022, 15:49 Matt Casters <matt.cast...@neo4j.com> wrote:
>>>
>>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>>
>>>>
>>>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <monoji...@gmail.com
>>>> >:
>>>>
>>>>> Hi Hans,
>>>>>
>>>>> Going through the log files I realized it had something to do with
>>>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>>>> issue is it only supports AVRO. I need to consume json messages
>>>>>
>>>>> Thanks
>>>>> Mono
>>>>>
>>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>>> hans.van.akel...@gmail.com> wrote:
>>>>>
>>>>>> Hi Monajit,
>>>>>>
>>>>>> This is the auto scaling nature of Flink fighting against the
>>>>>> requirement of having a single threaded pipeline for Kafka messages (as 
>>>>>> we
>>>>>> need to know when messages are finished. When running on Flink the best
>>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>>
>>>>>> Another solution (but not yet tested here so not sure it will work)
>>>>>> is to force it to a single thread by setting SINGLE_BEAM in the "number 
>>>>>> of
>>>>>> copies".
>>>>>> More information about this can be found on our documentation pages
>>>>>> [1]
>>>>>>
>>>>>> Kind regards,
>>>>>> Hans
>>>>>>
>>>>>> [1]
>>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>>
>>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <monoji...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>>>> When I run the pipeline using the local runner, it works fine. But if I 
>>>>>>> run
>>>>>>> it using the flink runner I get the following error
>>>>>>>
>>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>>> accept the Kafka messages
>>>>>>>
>>>>>>> I have tried debugging the Hop code and looks like the root cause is
>>>>>>> the initSubPipeline() method being invoked multiple times while using 
>>>>>>> the
>>>>>>> Flink runner. That's not the case when I use the local runner. Am I 
>>>>>>> missing
>>>>>>> something here?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Monajit Choudhury
>>>>>>>
>>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>>
>>>>>>

Reply via email to