I'd like to bump this thread, since I get the same error when trying to
read from Kafka in the Python SDK:

*java.lang.UnsupportedOperationException: The ActiveBundle does not have a
registered bundle checkpoint handler.*

Can someone familiar with cross-language transforms and Flink verify the
problem? I am using the latest Beam master with the following pipeline options:

--runner=FlinkRunner
--parallelism=2
--experiment=beam_fn_api
--environment_type=DOCKER
--environment_cache_millis=10000
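For anyone launching from a script instead of the command line, here is a minimal sketch that builds the same flag list in Python. It assumes the list is then passed to Beam's `PipelineOptions`, which accepts command-line style flags. The helper name `flink_docker_args` is hypothetical, and the sketch only reproduces the options above; it does not address the checkpoint-handler error.

```python
from typing import Dict, List, Optional


def flink_docker_args(overrides: Optional[Dict[str, str]] = None) -> List[str]:
    """Return the Flink/DOCKER flags from this message as a list (sketch).

    Hypothetical helper. The result can be handed to Beam, e.g.
    PipelineOptions(flink_docker_args()).
    """
    # Defaults copied from the options listed in this message.
    options = {
        "runner": "FlinkRunner",
        "parallelism": "2",
        "experiment": "beam_fn_api",
        "environment_type": "DOCKER",
        "environment_cache_millis": "10000",
    }
    # Caller-supplied values replace the defaults that share a key.
    if overrides:
        options.update(overrides)
    return ["--%s=%s" % (key, value) for key, value in options.items()]


if __name__ == "__main__":
    print(" ".join(flink_docker_args()))
```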

These are the same options used in CrossLanguageKafkaIOTest:
https://github.com/apache/beam/blob/master/sdks/python/test-suites/portable/common.gradle#L114
Speaking of which, is there a specific reason why reading from Kafka is not
currently tested by Jenkins?

Thanks,
Kamil

On Thu, Jun 18, 2020 at 11:35 PM Piotr Filipiuk <[email protected]>
wrote:

> Thank you for clarifying.
>
> Would you mind clarifying whether the issues I am experiencing running
> Kafka IO on Flink (or DirectRunner for testing) are specific to my setup,
> or whether this setup is not yet fully functional for the Python SDK?
>
> On Thu, Jun 18, 2020 at 12:03 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> Beam does not have a concept of general availability. Kafka IO is released
>> with Beam, so it is available. Some of the APIs used by Kafka IO are
>> experimental and so are subject to change (though that is less likely at
>> this point). Various runners may offer their own levels of availability
>> for cross-language transforms.
>>
>> Thanks,
>> Cham
>>
>>
>> On Thu, Jun 18, 2020 at 11:26 AM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> I also wanted to clarify whether Kafka IO for the Python SDK is generally
>>> available or still experimental.
>>>
>>> On Fri, Jun 12, 2020 at 2:52 PM Piotr Filipiuk <[email protected]>
>>> wrote:
>>>
>>>> For completeness I am also attaching task manager logs.
>>>>
>>>> On Fri, Jun 12, 2020 at 2:32 PM Piotr Filipiuk <
>>>> [email protected]> wrote:
>>>>
>>>>> Thank you for clarifying.
>>>>>
>>>>> I attempted to use FlinkRunner with 2.22 and I am
>>>>> getting the following error, which I am not sure how to debug:
>>>>>
>>>>> ERROR:root:java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
>>>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
>>>>> Starting pipeline kafka = 192.168.1.219:9092, topic = piotr-test
>>>>> Traceback (most recent call last):
>>>>>   File "apache_beam/examples/streaming_wordcount_kafka.py", line 73, in <module>
>>>>>     run()
>>>>>   File "apache_beam/examples/streaming_wordcount_kafka.py", line 68, in run
>>>>>     | "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 547, in __exit__
>>>>>     self.run().wait_until_finish()
>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 583, in wait_until_finish
>>>>>     raise self._runtime_exception
>>>>> RuntimeError: Pipeline BeamApp-piotr0filipiuk-0612212344-159c632b_45b947f8-bc64-4aad-a96c-6fb2bd834d60 failed in state FAILED: java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
>>>>>
>>>>> My setup is (everything runs locally):
>>>>> Beam Version: 2.22.0.
>>>>> Kafka 2.5.0 (https://kafka.apache.org/quickstart - using default
>>>>> config/server.properties)
>>>>> Flink 1.10 (https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/tutorials/local_setup.html)
>>>>>
>>>>> I run the pipeline using the following command:
>>>>>
>>>>> python apache_beam/examples/streaming_wordcount_kafka.py \
>>>>>   --bootstrap_servers=192.168.1.219:9092 --topic=piotr-test \
>>>>>   --runner=FlinkRunner --flink_version=1.10 \
>>>>>   --flink_master=192.168.1.219:8081 --environment_type=LOOPBACK
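A script like this typically consumes its own flags and forwards the rest to Beam. The sketch below shows that split with `argparse.parse_known_args`. It assumes `--bootstrap_servers` and `--topic` are the script's own arguments (inferred from the command above, not from the script itself), and `split_args` is a hypothetical name.

```python
import argparse
from typing import List, Tuple


def split_args(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
    """Separate script-specific flags from Beam pipeline flags (sketch)."""
    # allow_abbrev=False stops unknown Beam flags being matched as prefixes.
    parser = argparse.ArgumentParser(allow_abbrev=False)
    # Assumed script flags, inferred from the command line above.
    parser.add_argument("--bootstrap_servers", required=True)
    parser.add_argument("--topic", required=True)
    # Unrecognised flags (--runner, --flink_master, ...) are returned
    # untouched so they can be passed on to Beam.
    known, pipeline_args = parser.parse_known_args(argv)
    return known, pipeline_args


if __name__ == "__main__":
    script_args, beam_args = split_args([
        "--bootstrap_servers=192.168.1.219:9092",
        "--topic=piotr-test",
        "--runner=FlinkRunner",
        "--flink_version=1.10",
        "--flink_master=192.168.1.219:8081",
        "--environment_type=LOOPBACK",
    ])
    print(script_args.bootstrap_servers, script_args.topic)
    print(beam_args)
```

The leftover list is what would be handed to `PipelineOptions`, while the parsed values feed the Kafka consumer configuration.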
>>>>>
>>>>> I can see the following error in the logs:
>>>>>
>>>>> ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in the data plane.
>>>>> Traceback (most recent call last):
>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs
>>>>>     for elements in elements_iterator:
>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
>>>>>     return self._next()
>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next
>>>>>     raise self
>>>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
>>>>>         status = StatusCode.UNAVAILABLE
>>>>>         details = "DNS resolution failed"
>>>>>         debug_error_string =
>>>>> "{"created":"@1591997030.613849000","description":"Failed to pick
>>>>> subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1591997030.613847000","description":"Resolver
>>>>> transient
>>>>> failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":262,"referenced_errors":[{"created":"@1591997030.613847000","description":"DNS
>>>>> resolution
>>>>> failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":370,"grpc_status":14,"referenced_errors":[{"created":"@1591997030.613840000","description":"C-ares
>>>>> status is not ARES_SUCCESS: Misformatted domain
>>>>> name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244,"referenced_errors":[{"created":"@1591997030.613728000","description":"C-ares
>>>>> status is not ARES_SUCCESS: Misformatted domain
>>>>> name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244}]}]}]}]}"
>>>>> >
>>>>>
>>>>> I thought this might be the culprit; however, it also happens when
>>>>> running the wordcount.py example
>>>>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py>,
>>>>> which succeeds. The error appears only with Flink 1.10, not with Flink 1.9.
>>>>>
>>>>> Full log attached.
>>>>>
>>>>> I would appreciate help and suggestions on how to proceed.
>>>>>
>>>>>
>>>>> On Mon, Jun 8, 2020 at 5:49 PM Heejong Lee <[email protected]> wrote:
>>>>>
>>>>>> DirectRunner is not well tested for xlang transforms, and you need to
>>>>>> specify the jar_packages experimental flag for Java dependencies from the
>>>>>> Python SDK. I'd recommend using 2.22 + FlinkRunner for xlang pipelines.
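A hedged sketch of building that flag from Python. It assumes the experiment is passed through `--experiments` in the form `jar_packages=<jar1>,<jar2>` with comma-separated paths; that format is an assumption to verify against the Beam version in use. The helper names and jar paths are hypothetical.

```python
from typing import List, Sequence


def jar_packages_flag(jar_paths: Sequence[str]) -> str:
    """Return an --experiments flag listing extra Java jars (sketch).

    ASSUMPTION: the jar_packages experiment takes comma-separated paths.
    """
    if not jar_paths:
        raise ValueError("at least one jar path is required")
    for path in jar_paths:
        # A comma inside a path would be ambiguous in a comma-separated list.
        if "," in path:
            raise ValueError("jar path must not contain a comma: %r" % path)
    return "--experiments=jar_packages=" + ",".join(jar_paths)


def with_jars(pipeline_args: List[str], jar_paths: Sequence[str]) -> List[str]:
    """Return a copy of pipeline_args with the jar_packages flag appended."""
    return list(pipeline_args) + [jar_packages_flag(jar_paths)]
```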
>>>>>>
>>>>>> On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> To clarify, the Kafka dependency was already available as an embedded
>>>>>>> dependency in the Java SDK harness, but I'm not sure whether this worked
>>>>>>> for DirectRunner. Starting with 2.22, we'll be staging dependencies from
>>>>>>> the environment during pipeline submission.
>>>>>>>
>>>>>>> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> It seems the Java dependency is not being set up properly when
>>>>>>>> running the cross-language Kafka step. I don't think this was
>>>>>>>> available in Beam 2.21. Can you try with the latest Beam HEAD, or with
>>>>>>>> Beam 2.22 once it's released?
>>>>>>>> +Heejong Lee <[email protected]>
>>>>>>>>
>>>>>>>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Pasting the error inline:
>>>>>>>>>
>>>>>>>>> ERROR:root:severity: ERROR
>>>>>>>>> timestamp {
>>>>>>>>>   seconds: 1591405163
>>>>>>>>>   nanos: 815000000
>>>>>>>>> }
>>>>>>>>> message: "Client failed to dequeue and process the value"
>>>>>>>>> trace: "org.apache.beam.sdk.util.UserCodeException:
>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>> org/springframework/expression/EvaluationContext\n\tat
>>>>>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>>>>>>>> Source)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>>>>>>>> Source)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>>>>>> Source)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>>>>>>>>> java.lang.Thread.run(Thread.java:748)\nCaused by:
>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>> org/springframework/expression/EvaluationContext\n\tat
>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused
>>>>>>>>> by: java.lang.ClassNotFoundException:
>>>>>>>>> org.springframework.expression.EvaluationContext\n\tat
>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat
>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat
>>>>>>>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat
>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>>>>>>>> Source)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>>>>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>>>>>>>> Source)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>>>>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>>>>>> Source)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>>>>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>>>>>>>>> java.lang.Thread.run(Thread.java:748)\n"
>>>>>>>>> log_location:
>>>>>>>>> "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>>>>>>>>>
>>>>>>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you for the suggestions.
>>>>>>>>>>
>>>>>>>>>> Neither Kafka nor Flink runs in a Docker container; both run
>>>>>>>>>> locally. Furthermore, the same issue happens with the DirectRunner.
>>>>>>>>>> That said, changing from localhost:$PORT to $IP_ADDR:$PORT resulted
>>>>>>>>>> in a different error (see attached).
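The localhost-to-IP change can be expressed as a small rewrite of the Kafka `bootstrap.servers` value, which is a comma-separated list of `host:port` pairs. A minimal sketch: `replace_loopback` is a hypothetical helper, and 192.168.1.219 is simply the broker address used elsewhere in this thread.

```python
LOOPBACK_NAMES = {"localhost", "127.0.0.1"}


def replace_loopback(bootstrap_servers: str, host_ip: str) -> str:
    """Swap loopback hosts in a bootstrap.servers string for host_ip.

    A worker in its own network namespace (for example a Docker container)
    resolves "localhost" to itself rather than to the broker's machine.
    """
    rewritten = []
    for entry in bootstrap_servers.split(","):
        # rpartition keeps everything before the last ':' as the host.
        host, sep, port = entry.strip().rpartition(":")
        if not sep:
            raise ValueError("expected host:port, got %r" % entry)
        if host in LOOPBACK_NAMES:
            host = host_ip
        rewritten.append("%s:%s" % (host, port))
    return ",".join(rewritten)


if __name__ == "__main__":
    consumer_conf = {
        "bootstrap.servers": replace_loopback("localhost:9092", "192.168.1.219")
    }
    print(consumer_conf)
```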
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Is Kafka itself running inside another container? If so, inspect
>>>>>>>>>>> that container to see whether it has a network alias, then add that
>>>>>>>>>>> alias to your /etc/hosts file, mapped to 127.0.0.1.
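To check that suggestion, the sketch below scans /etc/hosts-formatted text for an alias mapped to 127.0.0.1 and formats the line to add if it is missing. Both helper names are hypothetical, and the alias itself would come from inspecting the Kafka container.

```python
def maps_to_loopback(hosts_text: str, alias: str) -> bool:
    """Return True if hosts_text maps alias to 127.0.0.1."""
    for line in hosts_text.splitlines():
        # Drop comments, then surrounding whitespace.
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        fields = line.split()
        # First field is the address; the rest are host names and aliases.
        ip, names = fields[0], fields[1:]
        if ip == "127.0.0.1" and alias in names:
            return True
    return False


def hosts_line(alias: str) -> str:
    """Line to append to /etc/hosts so alias resolves to 127.0.0.1."""
    return "127.0.0.1\t%s" % alias
```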
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *From:* Chamikara Jayalath <[email protected]>
>>>>>>>>>>> *Sent:* Friday, June 5, 2020 2:58 PM
>>>>>>>>>>> *To:* Luke Cwik <[email protected]>
>>>>>>>>>>> *Cc:* user <[email protected]>; dev <[email protected]>;
>>>>>>>>>>> Heejong Lee <[email protected]>
>>>>>>>>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while
>>>>>>>>>>> fetching topic metadata
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Is it possible that 'localhost:9092' is not reachable from the
>>>>>>>>>>> Docker environment in which the Flink step is executed? Can you try
>>>>>>>>>>> specifying the actual IP address of the node running the Kafka
>>>>>>>>>>> broker?
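A quick way to answer that question is a plain TCP connect to the broker address from the environment where the step runs. Below is a minimal standard-library sketch (`can_connect` is a hypothetical name). A successful connect only shows that something is listening on that port, not that the Kafka client will be able to fetch topic metadata.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False
```

For example, running `can_connect("192.168.1.219", 9092)` both on the host and inside the SDK harness container would show whether the address is reachable from each.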
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> +dev <[email protected]> +Chamikara Jayalath
>>>>>>>>>>> <[email protected]> +Heejong Lee <[email protected]>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I am unable to read from Kafka and am getting the following
>>>>>>>>>>> warnings and errors when calling kafka.ReadFromKafka() (Python SDK):
>>>>>>>>>>>
>>>>>>>>>>> WARNING:root:severity: WARN
>>>>>>>>>>> timestamp {
>>>>>>>>>>>   seconds: 1591370012
>>>>>>>>>>>   nanos: 523000000
>>>>>>>>>>> }
>>>>>>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available."
>>>>>>>>>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>>>>>>>>>> thread: "18"
>>>>>>>>>>>
>>>>>>>>>>> Finally, the pipeline fails with:
>>>>>>>>>>>
>>>>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>>>>>>>>>>
>>>>>>>>>>> See the more complete log attached.
>>>>>>>>>>>
>>>>>>>>>>> The relevant code snippet:
>>>>>>>>>>>
>>>>>>>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
>>>>>>>>>>> ...
>>>>>>>>>>> kafka.ReadFromKafka(
>>>>>>>>>>>     consumer_config=consumer_conf,
>>>>>>>>>>>     topics=[args.topic],
>>>>>>>>>>> )
>>>>>>>>>>> ...
>>>>>>>>>>>
>>>>>>>>>>> Also see the full Python script attached.
>>>>>>>>>>>
>>>>>>>>>>> I am using Beam 2.21.0 with the DirectRunner. With the FlinkRunner I
>>>>>>>>>>> am also unable to read from the topic.
>>>>>>>>>>>
>>>>>>>>>>> I am using Kafka 2.5.0 and started the broker by following
>>>>>>>>>>> https://kafka.apache.org/quickstart, using the default
>>>>>>>>>>> config/server.properties.
>>>>>>>>>>>
>>>>>>>>>>> Everything runs locally, and I verified that I can publish to and
>>>>>>>>>>> consume from that topic using the confluent_kafka library.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Piotr
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards,
>>>>>>>>>> Piotr
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Piotr
>>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>
