Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-07-14 Thread Kamil Wasilewski
DoFnRunner.java:1335)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
    at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
    at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)
Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
    at org.apache.beam.fn.harness.Fn
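
The "Caused by" chain above shows the underlying failure: org.springframework.expression.EvaluationContext is missing from the harness classpath, and KafkaIO tries to load it while constructing its reader. One quick way to check whether a given jar bundles that package (a minimal sketch in Python; the jar name is hypothetical):

import zipfile

# Hypothetical jar name -- substitute the jar actually staged on the worker.
jar = "beam-sdks-java-io-expansion-service.jar"
with zipfile.ZipFile(jar) as zf:
    hits = [name for name in zf.namelist()
            if name.startswith("org/springframework/expression/")]
    print("spring-expression bundled" if hits else "spring-expression missing")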

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-07-13 Thread Kamil Wasilewski

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-18 Thread Piotr Filipiuk

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-18 Thread Chamikara Jayalath


Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-18 Thread Piotr Filipiuk


Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-12 Thread Piotr Filipiuk

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-08 Thread Heejong Lee


Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-08 Thread Chamikara Jayalath


Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-08 Thread Chamikara Jayalath

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-08 Thread Piotr Filipiuk

Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-05 Thread Piotr Filipiuk
Thank you for the suggestions.

Neither Kafka nor Flink runs in a Docker container; everything runs locally.
Furthermore, the same issue happens with the DirectRunner. That said,
changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different
error; see attached.


-- 
Best regards,
Piotr


beam01.log
Description: Binary data


RE: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-05 Thread Venkat Muthuswamy
Is Kafka itself running inside another container? If so, inspect that container
to see whether it has a network alias; if it does, add that alias to your
/etc/hosts file and map it to 127.0.0.1.
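
A quick way to check whether such an alias resolves locally (a minimal sketch; the alias name "kafka" is hypothetical and should match whatever the broker advertises):

import socket

try:
    # Fails with gaierror if neither /etc/hosts nor DNS knows the alias.
    print(socket.gethostbyname("kafka"))
except socket.gaierror:
    print("alias does not resolve; add e.g. '127.0.0.1 kafka' to /etc/hosts")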

 


Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-05 Thread Chamikara Jayalath
Is it possible that 'localhost:9092' is not available from the Docker
environment where the Flink step is executed? Can you try specifying
the actual IP address of the node running the Kafka broker?
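
One way to look up a routable address for the host instead of hardcoding localhost (a minimal sketch; the 8.8.8.8 target is arbitrary, and connecting a UDP socket sends no packets):

import socket

def host_ip():
    # Connecting a UDP socket toward an external address reveals which
    # local interface the OS would route through; nothing is transmitted.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]
    finally:
        s.close()

consumer_conf = {"bootstrap.servers": f"{host_ip()}:9092"}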



Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-05 Thread Luke Cwik
+dev, +Chamikara Jayalath, +Heejong Lee



Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-05 Thread Piotr Filipiuk
I am unable to read from Kafka, and I am getting the following warnings &
errors when calling kafka.ReadFromKafka() (Python SDK):

WARNING:root:severity: WARN
timestamp {
  seconds: 1591370012
  nanos: 52300
}
message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1
could not be established. Broker may not be available."
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "18"

Finally the pipeline fails with:

RuntimeError: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata

See more complete log attached.

The relevant code snippet:

consumer_conf = {"bootstrap.servers": 'localhost:9092'}
...
kafka.ReadFromKafka(
    consumer_config=consumer_conf,
    topics=[args.topic],
)
...

Also see full python script attached.

I am using Beam version 2.21.0 and the DirectRunner. With the FlinkRunner I am
also not able to read from the topic.

I am using kafka 2.5.0 and started the broker by following
https://kafka.apache.org/quickstart - using default
config/server.properties.

Everything runs locally, and I verified that I can publish to
that topic using the confluent_kafka library.
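
The verification was roughly along these lines (a minimal sketch, not the exact code; the topic name is illustrative):

from confluent_kafka import Producer

# Publish a single test message and block until delivery completes.
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("inputs", value=b"hello")
producer.flush()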

-- 
Best regards,
Piotr


beam.log
Description: Binary data
"""
Example:

# DirectRunner
python pipeline.py --bootstrap_servers=localhost:9092 --topic=inputs

# FlinkRunner
python batch.py --bootstrap_servers=localhost:9092 --topic=inputs \
  --runner=FlinkRunner --flink_version=1.9 \
  --flink_master=localhost:8081 --environment_type=LOOPBACK

"""
import argparse
import logging

import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def format_results(elem):
(msg, sum_value) = elem
return f"message: {msg}, sum: {sum_value}"

def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the pipeline."""
parser = argparse.ArgumentParser()

parser.add_argument("--bootstrap_servers", type=str,
help="Kafka Broker address")
parser.add_argument("--topic", type=str, help="Kafka topic to read from")
parser.add_argument("--output", type=str, default="/tmp/kafka-output",
help="Output filepath")

args, pipeline_args = parser.parse_known_args(argv)

if args.topic is None or args.bootstrap_servers is None:
parser.print_usage()
print(f"{sys.argv[0]}: error: both --topic and --bootstrap_servers are required")
sys.exit(1)

options = PipelineOptions(pipeline_args)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options.view_as(SetupOptions).save_main_session = save_main_session

# Enforce that this pipeline is always run in streaming mode
options.view_as(StandardOptions).streaming = True

consumer_conf = {"bootstrap.servers": args.bootstrap_servers}
print(f"Starting pipeline "
  f"kafka = {args.bootstrap_servers}, topic = {args.topic}")
with beam.Pipeline(options=options) as p:
events = (
p
| "ReadFromKafka" >> kafka.ReadFromKafka(
consumer_config=consumer_conf,
topics=[args.topic],
)
| "WindowIntoFixedWindows" >> beam.WindowInto(
beam.window.FixedWindows(60))
| "AddOnes" >> beam.Map(lambda msg: (msg, 1))
| "SumByKey" >> beam.CombinePerKey(sum)
| "FormatResults" >> beam.Map(format_results)
| "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
)

if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()