I also wanted to clarify whether Kafka IO for the Python SDK is generally available or still experimental.
On Fri, Jun 12, 2020 at 2:52 PM Piotr Filipiuk <[email protected]> wrote:
> For completeness, I am also attaching the task manager logs.
>
> On Fri, Jun 12, 2020 at 2:32 PM Piotr Filipiuk <[email protected]> wrote:
>
>> Thank you for clarifying.
>>
>> I attempted to use FlinkRunner with 2.22 and I am getting the following error, which I am not sure how to debug:
>>
>> ERROR:root:java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
>> Starting pipeline kafka = 192.168.1.219:9092, topic = piotr-test
>> Traceback (most recent call last):
>>   File "apache_beam/examples/streaming_wordcount_kafka.py", line 73, in <module>
>>     run()
>>   File "apache_beam/examples/streaming_wordcount_kafka.py", line 68, in run
>>     | "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 547, in __exit__
>>     self.run().wait_until_finish()
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 583, in wait_until_finish
>>     raise self._runtime_exception
>> RuntimeError: Pipeline BeamApp-piotr0filipiuk-0612212344-159c632b_45b947f8-bc64-4aad-a96c-6fb2bd834d60 failed in state FAILED: java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
>>
>> My setup is (everything runs locally):
>> Beam Version: 2.22.0.
>> Kafka 2.5.0 (https://kafka.apache.org/quickstart, using the default config/server.properties)
>> Flink 1.10 (https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/tutorials/local_setup.html)
>>
>> I run the pipeline using the following command:
>>
>> python apache_beam/examples/streaming_wordcount_kafka.py --bootstrap_servers=192.168.1.219:9092 --topic=piotr-test --runner=FlinkRunner --flink_version=1.10 --flink_master=192.168.1.219:8081 --environment_type=LOOPBACK
>>
>> I can see the following error in the logs:
>>
>> ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in the data plane.
>> Traceback (most recent call last):
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs
>>     for elements in elements_iterator:
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
>>     return self._next()
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next
>>     raise self
>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
>> status = StatusCode.UNAVAILABLE
>> details = "DNS resolution failed"
>> debug_error_string = "{"created":"@1591997030.613849000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1591997030.613847000","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":262,"referenced_errors":[{"created":"@1591997030.613847000","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":370,"grpc_status":14,"referenced_errors":[{"created":"@1591997030.613840000","description":"C-ares status is not ARES_SUCCESS: Misformatted domain name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244,"referenced_errors":[{"created":"@1591997030.613728000","description":"C-ares status is not ARES_SUCCESS: Misformatted domain name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244}]}]}]}]}"
>> >
>>
>> I thought this might be the culprit; however, it also happens when running the wordcount.py example <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py>, which succeeds. That error appears only with Flink 1.10, not with Flink 1.9.
>>
>> Full log attached.
>>
>> I would appreciate help and suggestions on how to proceed.
>>
>>
>> On Mon, Jun 8, 2020 at 5:49 PM Heejong Lee <[email protected]> wrote:
>>
>>> DirectRunner is not well tested for xlang transforms, and you need to specify the jar_packages experimental flag for Java dependencies from the Python SDK. I'd recommend using 2.22 + FlinkRunner for xlang pipelines.
>>>
>>> On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath <[email protected]> wrote:
>>>
>>>> To clarify, the Kafka dependency was already available as an embedded dependency in the Java SDK harness, but I'm not sure whether this worked for DirectRunner. Starting with 2.22, we'll be staging dependencies from the environment during pipeline submission.
>>>>
>>>> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath <[email protected]> wrote:
>>>>
>>>>> It seems the Java dependency is not being properly set up when running the cross-language Kafka step. I don't think this was available for Beam 2.21. Can you try with the latest Beam HEAD, or Beam 2.22 when it's released?
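The command Piotr ran mixes two kinds of flags. --bootstrap_servers and --topic belong to the script itself, while --runner, --flink_version, --flink_master and --environment_type are Beam pipeline options. The source of streaming_wordcount_kafka.py is not shown in this thread. The sketch below assumes it follows the parse_known_args pattern used by Beam's wordcount.py example: the script keeps its own flags and forwards the rest to PipelineOptions. split_args is a hypothetical helper name.

```python
import argparse


def split_args(argv):
    """Split argv into script-level args and Beam pipeline args.

    Hypothetical helper: assumes the script defines only --bootstrap_servers
    and --topic itself and leaves every other flag to Beam.
    """
    # allow_abbrev=False stops argparse from treating a pipeline flag as an
    # abbreviation of one of the script's own flags.
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--bootstrap_servers", required=True)
    parser.add_argument("--topic", required=True)
    # Flags the parser does not know are returned untouched, in order.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


if __name__ == "__main__":
    # The flags from the command quoted in this thread.
    argv = [
        "--bootstrap_servers=192.168.1.219:9092",
        "--topic=piotr-test",
        "--runner=FlinkRunner",
        "--flink_version=1.10",
        "--flink_master=192.168.1.219:8081",
        "--environment_type=LOOPBACK",
    ]
    known_args, pipeline_args = split_args(argv)
    print(known_args.bootstrap_servers, known_args.topic)
    # In the real script, pipeline_args would be handed to
    # apache_beam.options.pipeline_options.PipelineOptions(pipeline_args).
    print(pipeline_args)
```

Keeping the split explicit makes it easy to print pipeline_args before submission. You can then confirm which address --flink_master actually receives.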
>>>>> +Heejong Lee <[email protected]> >>>>> >>>>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk < >>>>> [email protected]> wrote: >>>>> >>>>>> Pasting the error inline: >>>>>> >>>>>> ERROR:root:severity: ERROR >>>>>> timestamp { >>>>>> seconds: 1591405163 >>>>>> nanos: 815000000 >>>>>> } >>>>>> message: "Client failed to dequeue and process the value" >>>>>> trace: "org.apache.beam.sdk.util.UserCodeException: >>>>>> java.lang.NoClassDefFoundError: >>>>>> org/springframework/expression/EvaluationContext\n\tat >>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat >>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>>>> Source)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>>>> Source)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>>>> 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>>>> Source)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>> 
org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>>>> java.lang.Thread.run(Thread.java:748)\nCaused by: >>>>>> java.lang.NoClassDefFoundError: >>>>>> org/springframework/expression/EvaluationContext\n\tat >>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat >>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused >>>>>> by: java.lang.ClassNotFoundException: >>>>>> org.springframework.expression.EvaluationContext\n\tat >>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat >>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat >>>>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat >>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat >>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat >>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat >>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat >>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat >>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>>>> Source)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat >>>>>> 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat >>>>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>>>> Source)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat >>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat >>>>>> 
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat >>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>>>> Source)\n\tat >>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat >>>>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat >>>>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat >>>>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat >>>>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat >>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat >>>>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat >>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat >>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat >>>>>> java.lang.Thread.run(Thread.java:748)\n"
>>>>>> log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>>>>>>
>>>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <[email protected]> wrote:
>>>>>>
>>>>>>> Thank you for the suggestions.
>>>>>>>
>>>>>>> Neither Kafka nor Flink runs in a Docker container; they all run locally. Furthermore, the same issue happens with DirectRunner. That being said, changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different error; see attached.
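The NoClassDefFoundError trace pasted inline above is long because every Beam harness frame is printed. In a chained Java exception, the last "Caused by:" entry is the innermost one. Here it is java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext, which is consistent with the earlier reading in this thread that a Java dependency was not set up.

Below is a small, hypothetical helper for pulling that entry out of a trace copied from the log. root_cause is not part of Beam. It assumes newlines and tabs appear either as real whitespace or as literal \n and \t escapes, as in the trace: field above.

```python
def root_cause(trace):
    """Return the innermost 'Caused by:' entry of a Java stack trace.

    Hypothetical helper. Falls back to the first non-empty line when the
    trace contains no 'Caused by:' entries.
    """
    # Logs like the one in this thread print newlines and tabs as literal
    # backslash escapes; turn them back into real whitespace first.
    text = trace.replace("\\n", "\n").replace("\\t", "\t")
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    if not lines:
        return ""
    prefix = "Caused by:"
    causes = [line[len(prefix):].strip() for line in lines if line.startswith(prefix)]
    # Java prints the chain outermost first, so the last entry is the root cause.
    return causes[-1] if causes else lines[0]


if __name__ == "__main__":
    # Abbreviated excerpt of the trace quoted in this thread.
    excerpt = (
        "org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: "
        "org/springframework/expression/EvaluationContext\\n\\tat "
        "org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\\n"
        "Caused by: java.lang.NoClassDefFoundError: "
        "org/springframework/expression/EvaluationContext\\n\\tat "
        "org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\\n"
        "Caused by: java.lang.ClassNotFoundException: "
        "org.springframework.expression.EvaluationContext\\n\\tat "
        "java.net.URLClassLoader.findClass(URLClassLoader.java:382)\\n"
    )
    print(root_cause(excerpt))
```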
>>>>>>>
>>>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <[email protected]> wrote:
>>>>>>>
>>>>>>>> Is Kafka itself running inside another container? If so, inspect that container to see whether it has a network alias, then add that alias to your /etc/hosts file and map it to 127.0.0.1.
>>>>>>>>
>>>>>>>> *From:* Chamikara Jayalath <[email protected]>
>>>>>>>> *Sent:* Friday, June 5, 2020 2:58 PM
>>>>>>>> *To:* Luke Cwik <[email protected]>
>>>>>>>> *Cc:* user <[email protected]>; dev <[email protected]>; Heejong Lee <[email protected]>
>>>>>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
>>>>>>>>
>>>>>>>> Is it possible that 'localhost:9092' is not available from the Docker environment where the Flink step is executed? Can you try specifying the actual IP address of the node running the Kafka broker?
>>>>>>>>
>>>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>> +dev <[email protected]> +Chamikara Jayalath <[email protected]> +Heejong Lee <[email protected]>
>>>>>>>>
>>>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <[email protected]> wrote:
>>>>>>>>
>>>>>>>> I am unable to read from Kafka and am getting the following warnings and errors when calling kafka.ReadFromKafka() (Python SDK):
>>>>>>>>
>>>>>>>> WARNING:root:severity: WARN
>>>>>>>> timestamp {
>>>>>>>>   seconds: 1591370012
>>>>>>>>   nanos: 523000000
>>>>>>>> }
>>>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available."
>>>>>>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>>>>>>> thread: "18"
>>>>>>>>
>>>>>>>> Finally, the pipeline fails with:
>>>>>>>>
>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>>>>>>>
>>>>>>>> See the more complete log attached.
>>>>>>>>
>>>>>>>> The relevant code snippet:
>>>>>>>>
>>>>>>>> from apache_beam.io import kafka
>>>>>>>>
>>>>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
>>>>>>>> ...
>>>>>>>> kafka.ReadFromKafka(
>>>>>>>>     consumer_config=consumer_conf,
>>>>>>>>     topics=[args.topic],
>>>>>>>> )
>>>>>>>> ...
>>>>>>>>
>>>>>>>> Also see the full Python script attached.
>>>>>>>>
>>>>>>>> I am using Beam version 2.21.0 and DirectRunner. With FlinkRunner, I am also not able to read from the topic.
>>>>>>>>
>>>>>>>> I am using Kafka 2.5.0 and started the broker by following https://kafka.apache.org/quickstart, using the default config/server.properties.
>>>>>>>>
>>>>>>>> Everything runs locally, and I verified that I can publish to and consume from that topic using the confluent_kafka library.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Piotr
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Piotr
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>
>>
>> --
>> Best regards,
>> Piotr
>>
>
>
> --
> Best regards,
> Piotr
>
--
Best regards,
Piotr
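Chamikara asked in this thread whether localhost:9092 is reachable from where the Kafka step actually runs. That can be checked before launching the pipeline. In the warning above, node -1 is how the Kafka client labels the first bootstrap server, so that failure happens before any topic metadata is fetched.

The following is a minimal sketch using only the Python standard library. parse_bootstrap_servers and broker_reachable are hypothetical helpers, not Beam or Kafka APIs. Two caveats apply:

- The check only says something about the pipeline if it runs on the host, or inside the container, where the Java SDK harness executes.
- A successful TCP connect only proves that something is listening. After bootstrapping, Kafka clients reconnect to the addresses the broker advertises (advertised.listeners), so a wrong advertised address can still break the consumer.

```python
import socket


def parse_bootstrap_servers(value):
    """Split a Kafka bootstrap.servers string such as "h1:9092,h2:9093"
    into (host, port) tuples.

    Hypothetical helper: assumes every entry has host:port form and does
    not handle bracketed IPv6 literals.
    """
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        servers.append((host, int(port)))
    return servers


def broker_reachable(host, port, timeout=2.0):
    """Return True if a plain TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the name and tries each resolved address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


if __name__ == "__main__":
    # The two broker addresses that appear in this thread.
    for conf in ({"bootstrap.servers": "localhost:9092"},
                 {"bootstrap.servers": "192.168.1.219:9092"}):
        for host, port in parse_bootstrap_servers(conf["bootstrap.servers"]):
            print(f"{host}:{port} reachable: {broker_reachable(host, port)}")
```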
