After syncing to:
commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master,
origin/master, origin/HEAD)
Author: Ning Kang <[email protected]>
Date: Fri Apr 24 10:58:07 2020 -0700
The new error is:
RuntimeError: java.lang.IllegalArgumentException: unable to deserialize
Custom DoFn With Execution Info
at
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
at
org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
at
org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
at
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
at
org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException:
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class
incompatible: stream classdesc serialVersionUID = 7311199418509482705,
local class serialVersionUID = 5488866827627794770
at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
at
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
... 18 more
I am not sure whether it is related to
https://issues.apache.org/jira/browse/BEAM-9745.
On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk <[email protected]>
wrote:
> Here is an error I am getting when using DirectRunner:
>
> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for the
> bundle bundle_1 to finish.
> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
> Requests sent by runner: [('bundle_1', 1)]
> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
> Requests multiplexing info: []
> Traceback (most recent call last):
> File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
> 193, in _run_module_as_main
> "__main__", mod_spec)
> File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
> 85, in _run_code
> exec(code, run_globals)
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
> line 74, in <module>
> run()
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
> line 69, in run
> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in
> __exit__
> self.run().wait_until_finish()
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in
> run
> return self.runner.run_pipeline(self, self._options)
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
> line 130, in run_pipeline
> return runner.run_pipeline(pipeline, options)
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 173, in run_pipeline
> pipeline.to_runner_api(default_environment=self._default_environment))
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 183, in run_via_runner_api
> return self.run_stages(stage_context, stages)
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 331, in run_stages
> bundle_context_manager,
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 508, in _run_stage
> bundle_manager)
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 546, in _run_bundle
> data_input, data_output, input_timers, expected_timer_output)
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 927, in process_bundle
> timer_inputs)):
> File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py",
> line 586, in result_iterator
> yield fs.pop().result()
> File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py",
> line 432, in result
> return self.__get_result()
> File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py",
> line 384, in __get_result
> raise self._exception
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py",
> line 44, in run
> self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 923, in execute
> dry_run)
> File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 862, in process_bundle
> raise RuntimeError(result.error)
> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
> java.lang.NoClassDefFoundError:
> org/springframework/expression/EvaluationContext
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
> at
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
> Source)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
> at
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
> at
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
> at
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
> Source)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
> at
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
> at
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
> at
> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
> at
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
> at
> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoClassDefFoundError:
> org/springframework/expression/EvaluationContext
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
> at
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
> Caused by: java.lang.ClassNotFoundException:
> org.springframework.expression.EvaluationContext
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
> at
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
> at
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
> Source)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
> at
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
> at
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
> at
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
> Source)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
> at
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
> at
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
> at
> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
> at
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
> at
> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver <[email protected]> wrote:
>
>> It should just work without any other changes. If it doesn't, let us know.
>>
>> On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> Thanks. Once the Docker image is built, how do I make sure it is used when
>>> I run a Beam pipeline, as opposed to pulling from Docker Hub?
>>>
>>> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> wrote:
>>>
>>>> You can build the Java SDK image from source by running the following
>>>> command: ./gradlew :sdks:java:container:docker
>>>>
>>>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <
>>>> [email protected]> wrote:
>>>>
>>>>> Thanks for the quick response.
>>>>>
>>>>> Since Beam 2.21.0 is not yet available via pip
>>>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it
>>>>> from HEAD. After creating a fresh virtual environment, my steps were:
>>>>> pip install -r build-requirements.txt
>>>>> python setup.py build
>>>>> python setup.py install
>>>>>
>>>>> Then I encountered the following error when running the script from the
>>>>> original message:
>>>>>
>>>>> Error response from daemon: manifest for apache/beam_java_sdk:
>>>>> 2.21.0.dev not found: manifest unknown: manifest unknown
>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable
>>>>> to pull image apache/beam_java_sdk:2.21.0.dev
>>>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>>>>> docker: Error response from daemon: manifest for apache/beam_java_sdk:
>>>>> 2.21.0.dev not found: manifest unknown: manifest unknown.
>>>>> See 'docker run --help'.
>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
>>>>> Requests sent by runner: []
>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
>>>>> Requests multiplexing info: []
>>>>> Traceback (most recent call last):
>>>>> File
>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
>>>>> 193, in _run_module_as_main
>>>>> "__main__", mod_spec)
>>>>> File
>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
>>>>> 85, in _run_code
>>>>> exec(code, run_globals)
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
>>>>> line 74, in <module>
>>>>> run()
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
>>>>> line 69, in run
>>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line
>>>>> 528, in __exit__
>>>>> self.run().wait_until_finish()
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line
>>>>> 514, in run
>>>>> return self.runner.run_pipeline(self, self._options)
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
>>>>> line 130, in run_pipeline
>>>>> return runner.run_pipeline(pipeline, options)
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>>>> line 173, in run_pipeline
>>>>>
>>>>> pipeline.to_runner_api(default_environment=self._default_environment))
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>>>> line 183, in run_via_runner_api
>>>>> return self.run_stages(stage_context, stages)
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>>>> line 331, in run_stages
>>>>> bundle_context_manager,
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>>>> line 470, in _run_stage
>>>>> bundle_context_manager.extract_bundle_inputs_and_outputs())
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>>>> line 530, in extract_bundle_inputs_and_outputs
>>>>> data_api_service_descriptor = self.data_api_service_descriptor()
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>>>> line 451, in data_api_service_descriptor
>>>>> return self.worker_handlers[0].data_api_service_descriptor()
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>>>> line 445, in worker_handlers
>>>>> self.stage.environment, self.num_workers))
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>>>>> line 854, in get_worker_handlers
>>>>> worker_handler.start_worker()
>>>>> File "/Users/piotr.filipiuk/src/
>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>>>>> line 734, in start_worker
>>>>> '--provision_endpoint=%s' % self.control_address,
>>>>> File
>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py",
>>>>> line 336, in check_output
>>>>> **kwargs).stdout
>>>>> File
>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py",
>>>>> line 418, in run
>>>>> output=stdout, stderr=stderr)
>>>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d',
>>>>> '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1',
>>>>> '--logging_endpoint=host.docker.internal:50453',
>>>>> '--control_endpoint=host.docker.internal:50450',
>>>>> '--artifact_endpoint=host.docker.internal:50450',
>>>>> '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit
>>>>> status 125.
>>>>>
>>>>> Since the apache/beam_java_sdk:2.21.0.dev image is not publicly available,
>>>>> I cannot pull it. Is it possible to get access to dev images?
>>>>> Alternatively, are there any instructions on how to build
>>>>> beam_java_sdk
>>>>> locally and then use the local image when running a job?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I would like to know whether it is possible to run a streaming
>>>>>>> pipeline that reads from (or writes to) Kafka using DirectRunner? If so,
>>>>>>> what should the expansion_service point to:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>>>>>> ?
>>>>>>
>>>>>>
>>>>>>> Also, when using FlinkRunner, what should be the value of
>>>>>>> expansion_service? Based on
>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>>>>>> Flink should start an expansion service on port 8097. I am running Flink
>>>>>>> using
>>>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>>>>> and the expansion service is not being started. The same holds when using
>>>>>>> the Docker image: https://hub.docker.com/_/flink. I cannot find the
>>>>>>> expansion service mentioned in the Flink documentation, unless it is
>>>>>>> called by a different name.
>>>>>>>
>>>>>>
>>>>>> After Beam 2.21.0 you should not have to specify anything. You only
>>>>>> need to make sure that Java is installed on the system (so that the
>>>>>> 'java' command is available) and kafka.py will download the correct jar.
>>>>>>
>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
>>>>>>
>>>>>> For HEAD (today), as long as you are in a Beam repo clone, you should
>>>>>> not have to specify anything either, since kafka.py will try to build the
>>>>>> dependency locally.
>>>>>>
>>>>>> For previous versions, you have to start up an expansion service
>>>>>> manually. I'm not familiar with the expansion service embedded in the
>>>>>> Flink Job Server.
>>>>>>
>>>>>>
>>>>>>> Flink Version: flink-1.9.2
>>>>>>> Apache-beam Version: 2.19.0
>>>>>>>
>>>>>>> Full Details:
>>>>>>>
>>>>>>> The pipeline looks as follows:
>>>>>>>
>>>>>>> ```
>>>>>>> import argparse
>>>>>>> import logging
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.external import kafka
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>>>
>>>>>>>
>>>>>>> def format_results(elem):
>>>>>>> (msg, sum_value) = elem
>>>>>>> print(type(msg))
>>>>>>> print(type(sum_value))
>>>>>>> return f"message: {msg}, sum: {sum_value}"
>>>>>>>
>>>>>>> def run(argv=None, save_main_session=True):
>>>>>>> """Main entry point; defines and runs the pipeline."""
>>>>>>> parser = argparse.ArgumentParser()
>>>>>>>
>>>>>>> parser.add_argument('--bootstrap_servers', type=str,
>>>>>>> help='Kafka Broker address')
>>>>>>> parser.add_argument('--topic', type=str, help='Kafka topic to
>>>>>>> read from')
>>>>>>> parser.add_argument('--output', type=str,
>>>>>>> default="/tmp/kafka-output",
>>>>>>> help='Output filepath')
>>>>>>>
>>>>>>> args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>
>>>>>>> if args.topic is None or args.bootstrap_servers is None:
>>>>>>> parser.print_usage()
>>>>>>> print(sys.argv[0] + ': error: both --topic and
>>>>>>> --bootstrap_servers are required')
>>>>>>> sys.exit(1)
>>>>>>>
>>>>>>> options = PipelineOptions(pipeline_args)
>>>>>>> # We use the save_main_session option because one or more DoFn's
>>>>>>> in this
>>>>>>> # workflow rely on global context (e.g., a module imported at
>>>>>>> module level).
>>>>>>> options.view_as(SetupOptions).save_main_session =
>>>>>>> save_main_session
>>>>>>>
>>>>>>> # Enforce that this pipeline is always run in streaming mode
>>>>>>> options.view_as(StandardOptions).streaming = True
>>>>>>>
>>>>>>> consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>>>
>>>>>>> with beam.Pipeline(options=options) as p:
>>>>>>> events = (
>>>>>>> p
>>>>>>> | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>>> consumer_config=consumer_conf,
>>>>>>> topics=[args.topic],
>>>>>>> )
>>>>>>> | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>>> beam.window.FixedWindows(60))
>>>>>>> | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>>> | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>>> | 'FormatResults' >> beam.Map(format_results)
>>>>>>> | 'WriteUserScoreSums' >>
>>>>>>> beam.io.WriteToText(args.output)
>>>>>>> )
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>> logging.getLogger().setLevel(logging.DEBUG)
>>>>>>> run()
>>>>>>> ```
>>>>>>>
>>>>>>> I run it using:
>>>>>>>
>>>>>>> ```
>>>>>>> python streaming.py --topic=inputs --bootstrap_servers==
>>>>>>> 192.168.1.219:32779
>>>>>>> ```
>>>>>>>
>>>>>>> The Kafka brokers are available at the --bootstrap_servers address, and
>>>>>>> the --topic exists.
>>>>>>>
>>>>>>> The error I am getting:
>>>>>>>
>>>>>>> ```
>>>>>>> Traceback (most recent call last):
>>>>>>> File "streaming.py", line 74, in <module>
>>>>>>> run()
>>>>>>> File "streaming.py", line 69, in run
>>>>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>>>>>> line 989, in __ror__
>>>>>>> return self.transform.__ror__(pvalueish, self.label)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>>>>>> line 549, in __ror__
>>>>>>> result = p.apply(self, pvalueish, label)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>>>>>> line 536, in apply
>>>>>>> return self.apply(transform, pvalueish)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>>>>>> line 577, in apply
>>>>>>> pvalueish_result = self.runner.apply(transform, pvalueish,
>>>>>>> self._options)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
>>>>>>> line 195, in apply
>>>>>>> return m(transform, input, options)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
>>>>>>> line 225, in apply_PTransform
>>>>>>> return transform.expand(input)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py",
>>>>>>> line 327, in expand
>>>>>>> channel).Expand(request)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>>>>>> line 826, in __call__
>>>>>>> return _end_unary_response_blocking(state, call, False, None)
>>>>>>> File
>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>>>>>> line 729, in _end_unary_response_blocking
>>>>>>> raise _InactiveRpcError(state)
>>>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that
>>>>>>> terminated with:
>>>>>>> status = StatusCode.UNAVAILABLE
>>>>>>> details = "Trying to connect an http1.x server"
>>>>>>> debug_error_string =
>>>>>>> "{"created":"@1587496938.983698000","description":"Error received from
>>>>>>> peer
>>>>>>> ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying
>>>>>>> to connect an http1.x server","grpc_status":14}"
>>>>>>> ```
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>
--
Best regards,
Piotr