After syncing to:

commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master, origin/master, origin/HEAD)
Author: Ning Kang
Date:   Fri Apr 24 10:58:07 2020 -0700

The new error is:

RuntimeError: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
        at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
        at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
        at org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class incompatible: stream classdesc serialVersionUID = 7311199418509482705, local class serialVersionUID = 5488866827627794770
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
        at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
        ... 18 more

I am not sure whether it is related to
https://issues.apache.org/jira/browse/BEAM-9745.
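The `InvalidClassException` above means that the `Read$UnboundedSourceAsSDFWrapperFn` class that serialized the DoFn and the one loaded by the SDK harness report different `serialVersionUID`s, i.e. the two class definitions differ. One plausible cause, not confirmed in this thread, is a locally built `apache/beam_java_sdk:2.21.0.dev` image that predates the sync to 24361d1. A minimal sketch, assuming the Docker CLI is on PATH, that prints when the local image was created so it can be compared with the commit date:

```python
import shutil
import subprocess
from typing import Optional

# Image tag as it appears in the runner's `docker run` command in this thread.
IMAGE = "apache/beam_java_sdk:2.21.0.dev"


def image_created(image: str) -> Optional[str]:
    """Return the creation timestamp of a local Docker image, or None.

    None means the docker CLI is missing, the daemon is unreachable,
    or the image does not exist locally.
    """
    if shutil.which("docker") is None:
        return None
    result = subprocess.run(
        ["docker", "image", "inspect", "--format", "{{.Created}}", image],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,  # decode output as text (works on Python 3.6)
    )
    if result.returncode != 0:
        return None
    return result.stdout.strip()


if __name__ == "__main__":
    print(image_created(IMAGE))
```

Comparing this timestamp with `git log -1 --format=%cd` in the Beam checkout shows whether the image was rebuilt after the sync; if it was not, rebuilding it with `./gradlew :sdks:java:container:docker` would be a reasonable first step.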

On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk wrote:

> Here is the error I get when using the DirectRunner:
>
> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for the bundle bundle_1 to finish.
> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: [('bundle_1', 1)]
> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
> Traceback (most recent call last):
>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>     run()
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>     self.run().wait_until_finish()
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>     bundle_context_manager,
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 508, in _run_stage
>     bundle_manager)
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 546, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 927, in process_bundle
>     timer_inputs)):
>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
>     yield fs.pop().result()
>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 432, in result
>     return self.__get_result()
>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
>     raise self._exception
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py", line 44, in run
>     self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 923, in execute
>     dry_run)
>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 862, in process_bundle
>     raise RuntimeError(result.error)
> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>         at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>         at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>         at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>         at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>         at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
>         at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>         at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
> Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>         at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>         at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>         at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>         at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
>         at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>         at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
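The root cause in this trace is a `ClassNotFoundException` for `org.springframework.expression.EvaluationContext`, raised while `KafkaUnboundedReader` is being constructed, which suggests that the jar on the worker's classpath does not bundle the Spring Expression classes. A minimal sketch for checking whether a given jar contains that class: jars are ZIP archives, so the standard `zipfile` module can list their entries. The jar path is a placeholder you supply, and if the dependency were shaded under a relocated package the entry name would differ.

```python
import sys
import zipfile

# Class the worker failed to load (taken from the trace above).
MISSING_CLASS = "org.springframework.expression.EvaluationContext"


def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar (a ZIP archive) has an entry for class_name."""
    # Convert the dotted class name to its path inside the archive.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


if __name__ == "__main__":
    # Usage: python check_jar.py /path/to/your.jar
    print(jar_contains_class(sys.argv[1], MISSING_CLASS))
```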
>
> On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver wrote:
>
>> It should just work without any other changes. If it doesn't, let us know.
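The log further down shows the runner first trying to pull `apache/beam_java_sdk:2.21.0.dev` (which fails, since that dev tag is not on Docker Hub) and then `docker run` looking for the same tag locally, so a locally built image carrying exactly that tag should be the one that gets used. A minimal sketch, assuming the Docker CLI is installed and that the Gradle build tags the image with that repository and tag, which lists the local tags so you can confirm before launching the pipeline:

```python
import shutil
import subprocess
from typing import List

# Repository and tag as they appear in the runner's `docker run` command in this thread.
REPOSITORY = "apache/beam_java_sdk"
TAG = "2.21.0.dev"


def local_tags(repository: str) -> List[str]:
    """List the tags of `repository` present in the local Docker image store.

    Returns an empty list if the docker CLI is missing or the command fails
    (for example, when the Docker daemon is not running).
    """
    if shutil.which("docker") is None:
        return []
    result = subprocess.run(
        ["docker", "images", "--format", "{{.Tag}}", repository],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    if result.returncode != 0:
        return []
    return [line for line in result.stdout.splitlines() if line]


if __name__ == "__main__":
    tags = local_tags(REPOSITORY)
    print(TAG in tags, tags)
```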
>>
>> On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk wrote:
>>
>>> Thanks. Once the Docker image is built, how do I make sure it is used when
>>> I run the Beam pipeline, as opposed to being pulled from Docker Hub?
>>>
>>> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver wrote:
>>>
>>>> You can build the Java SDK image from source by running the following
>>>> command: ./gradlew :sdks:java:container:docker
>>>>
>>>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk wrote:
>>>>
>>>>> Thanks for the quick response.
>>>>>
>>>>> Since Beam 2.21.0 is not yet available via pip
>>>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it
>>>>> from HEAD. After creating a fresh virtual environment, my steps were:
>>>>> pip install -r build-requirements.txt
>>>>> python setup.py build
>>>>> python setup.py install
>>>>>
>>>>> Then I encountered the following error when running the script from the
>>>>> original message:
>>>>>
>>>>> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown
>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable to pull image apache/beam_java_sdk:2.21.0.dev
>>>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>>>>> docker: Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown.
>>>>> See 'docker run --help'.
>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: []
>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
>>>>> Traceback (most recent call last):
>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>>>>>     "__main__", mod_spec)
>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>>>>>     exec(code, run_globals)
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>>>>>     run()
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>>>>>     self.run().wait_until_finish()
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>>>>>     return runner.run_pipeline(pipeline, options)
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>>>>>     pipeline.to_runner_api(default_environment=self._default_environment))
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>>>>>     return self.run_stages(stage_context, stages)
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>>>>>     bundle_context_manager,
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 470, in _run_stage
>>>>>     bundle_context_manager.extract_bundle_inputs_and_outputs())
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 530, in extract_bundle_inputs_and_outputs
>>>>>     data_api_service_descriptor = self.data_api_service_descriptor()
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 451, in data_api_service_descriptor
>>>>>     return self.worker_handlers[0].data_api_service_descriptor()
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 445, in worker_handlers
>>>>>     self.stage.environment, self.num_workers))
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 854, in get_worker_handlers
>>>>>     worker_handler.start_worker()
>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 734, in start_worker
>>>>>     '--provision_endpoint=%s' % self.control_address,
>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 336, in check_output
>>>>>     **kwargs).stdout
>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 418, in run
>>>>>     output=stdout, stderr=stderr)
>>>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d', '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', '--logging_endpoint=host.docker.internal:50453', '--control_endpoint=host.docker.internal:50450', '--artifact_endpoint=host.docker.internal:50450', '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit status 125.
>>>>>
>>>>> Since the apache/beam_java_sdk:2.21.0.dev image is not publicly available,
>>>>> I cannot pull it. Is it possible to get access to dev images?
>>>>> Alternatively, are there any instructions on how to build the beam_java_sdk
>>>>> image locally and then use that local image when running a job?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I would like to know whether it is possible to run a streaming
>>>>>>> pipeline that reads from (or writes to) Kafka using the DirectRunner. If so,
>>>>>>> what should the expansion_service parameter in
>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>>>>>> point to?
>>>>>>
>>>>>>
>>>>>>> Also, when using the FlinkRunner, what should the value of
>>>>>>> expansion_service be? Based on
>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>>>>>> Flink should start an expansion service on port 8097. I am running Flink using
>>>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>>>>> and the expansion service is not being started. The same holds when using
>>>>>>> the Docker image https://hub.docker.com/_/flink. I cannot find the
>>>>>>> expansion service mentioned in the Flink documentation, unless it is
>>>>>>> called by a different name.
>>>>>>>
>>>>>>
>>>>>> From Beam 2.21.0 onward you should not have to specify anything. You only
>>>>>> need to make sure that Java is installed on the system (so that the 'java'
>>>>>> command is available), and kafka.py will download the correct jar.
>>>>>>
>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
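Since this relies on the `java` command being available, a quick pre-flight check can save a confusing failure later. A minimal sketch using only the standard library; it treats a non-zero exit (for example, a launcher stub with no runtime installed) as "no Java":

```python
import shutil
import subprocess
from typing import Optional


def java_version() -> Optional[str]:
    """Return the first line of `java -version`, or None if Java is unusable."""
    if shutil.which("java") is None:
        return None
    result = subprocess.run(
        ["java", "-version"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    if result.returncode != 0:
        return None
    # `java -version` prints its banner to stderr, not stdout.
    lines = result.stderr.strip().splitlines()
    return lines[0] if lines else None


if __name__ == "__main__":
    print(java_version() or "java not found on PATH")
```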
>>>>>>
>>>>>> For HEAD (today), as long as you are in a Beam repo clone, you should
>>>>>> not have to specify anything either, since kafka.py will try to build the
>>>>>> dependency locally.
>>>>>>
>>>>>> For previous versions, you have to start up an expansion service
>>>>>> manually. I'm not familiar with the expansion service embedded in the Flink
>>>>>> Job Server.
>>>>>>
>>>>>>
>>>>>>> Flink Version: flink-1.9.2
>>>>>>> Apache-beam Version: 2.19.0
>>>>>>>
>>>>>>> Full Details:
>>>>>>>
>>>>>>> The pipeline looks as follows:
>>>>>>>
>>>>>>> ```
>>>>>>> import argparse
>>>>>>> import logging
>>>>>>> import sys
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.external import kafka
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>>>
>>>>>>>
>>>>>>> def format_results(elem):
>>>>>>>     (msg, sum_value) = elem
>>>>>>>     print(type(msg))
>>>>>>>     print(type(sum_value))
>>>>>>>     return f"message: {msg}, sum: {sum_value}"
>>>>>>>
>>>>>>>
>>>>>>> def run(argv=None, save_main_session=True):
>>>>>>>     """Main entry point; defines and runs the pipeline."""
>>>>>>>     parser = argparse.ArgumentParser()
>>>>>>>
>>>>>>>     parser.add_argument('--bootstrap_servers', type=str,
>>>>>>>                         help='Kafka Broker address')
>>>>>>>     parser.add_argument('--topic', type=str,
>>>>>>>                         help='Kafka topic to read from')
>>>>>>>     parser.add_argument('--output', type=str, default="/tmp/kafka-output",
>>>>>>>                         help='Output filepath')
>>>>>>>
>>>>>>>     args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>
>>>>>>>     if args.topic is None or args.bootstrap_servers is None:
>>>>>>>         parser.print_usage()
>>>>>>>         print(sys.argv[0] + ': error: both --topic and '
>>>>>>>               '--bootstrap_servers are required')
>>>>>>>         sys.exit(1)
>>>>>>>
>>>>>>>     options = PipelineOptions(pipeline_args)
>>>>>>>     # We use the save_main_session option because one or more DoFns
>>>>>>>     # in this workflow rely on global context (e.g., a module imported
>>>>>>>     # at module level).
>>>>>>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>>>>>>
>>>>>>>     # Enforce that this pipeline is always run in streaming mode
>>>>>>>     options.view_as(StandardOptions).streaming = True
>>>>>>>
>>>>>>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>>>
>>>>>>>     with beam.Pipeline(options=options) as p:
>>>>>>>         events = (
>>>>>>>             p
>>>>>>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>>>                 consumer_config=consumer_conf,
>>>>>>>                 topics=[args.topic],
>>>>>>>             )
>>>>>>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>>>                 beam.window.FixedWindows(60))
>>>>>>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>>>             | 'FormatResults' >> beam.Map(format_results)
>>>>>>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>         )
>>>>>>>
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>     logging.getLogger().setLevel(logging.DEBUG)
>>>>>>>     run()
>>>>>>> ```
>>>>>>>
>>>>>>> I run it using:
>>>>>>>
>>>>>>> ```
>>>>>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>>>>>> ```
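With argparse, writing `--bootstrap_servers==...` (a doubled `=`) is not rejected: argparse splits a long option on the first `=` only, so the value silently keeps a leading `=`, and the Kafka consumer config would receive `=192.168.1.219:32779` as its bootstrap address. A small sketch reproducing this with the same option definition as the script:

```python
import argparse

# Same option definition as in the pipeline script.
parser = argparse.ArgumentParser()
parser.add_argument('--bootstrap_servers', type=str, help='Kafka Broker address')

# argparse splits on the first '=' only, so the second '=' ends up in the value.
doubled, _ = parser.parse_known_args(['--bootstrap_servers==192.168.1.219:32779'])
single, _ = parser.parse_known_args(['--bootstrap_servers=192.168.1.219:32779'])

print(repr(doubled.bootstrap_servers))  # '=192.168.1.219:32779'
print(repr(single.bootstrap_servers))   # '192.168.1.219:32779'
```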
>>>>>>>
>>>>>>> The Kafka brokers are reachable at the --bootstrap_servers address, and the
>>>>>>> --topic exists.
>>>>>>>
>>>>>>> The error I am getting:
>>>>>>>
>>>>>>> ```
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "streaming.py", line 74, in <module>
>>>>>>>     run()
>>>>>>>   File "streaming.py", line 69, in run
>>>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
>>>>>>>     return self.transform.__ror__(pvalueish, self.label)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
>>>>>>>     result = p.apply(self, pvalueish, label)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
>>>>>>>     return self.apply(transform, pvalueish)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
>>>>>>>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
>>>>>>>     return m(transform, input, options)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
>>>>>>>     return transform.expand(input)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
>>>>>>>     channel).Expand(request)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>>>>>>>     return _end_unary_response_blocking(state, call, False, None)
>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>>>>>>>     raise _InactiveRpcError(state)
>>>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>>>>>>>         status = StatusCode.UNAVAILABLE
>>>>>>>         details = "Trying to connect an http1.x server"
>>>>>>>         debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
>>>>>>> ```
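The error reports that the gRPC client reached `[::1]:8081` and found an HTTP/1.x server there. Port 8081 is the default port of Flink's REST API and web UI, so one likely reading (not confirmed in this thread) is that the expansion request went to Flink itself rather than to an expansion service. A rough heuristic sketch, using only the standard library, that checks whether a port answers a plain HTTP/1.0 request with an HTTP/1.x status line; the ports in the usage block are illustrative:

```python
import socket


def answers_http1(host: str, port: int, timeout: float = 2.0) -> bool:
    """Heuristic: True if host:port replies to a plain HTTP/1.0 request
    with an HTTP/1.x status line.

    A gRPC endpoint such as an expansion service speaks HTTP/2 and is not
    expected to answer this way; a web server such as Flink's REST
    endpoint is.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            request = b"GET / HTTP/1.0\r\nHost: " + host.encode() + b"\r\n\r\n"
            sock.sendall(request)
            reply = sock.recv(16)
    except OSError:
        # Refused, timed out, or reset: no evidence of an HTTP/1.x server.
        return False
    return reply.startswith(b"HTTP/1.")


if __name__ == "__main__":
    for port in (8081, 8097):
        print(port, answers_http1("localhost", port))
```

A `True` for the port passed as `expansion_service` would point at a web server rather than an expansion service; a `False` only means no HTTP/1.x reply was seen.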
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>


-- 
Best regards,
Piotr
