Java dependencies are not yet fully propagated over the expansion service,
which might be what you're running into. I'm actually in the process of
putting together a PR to fix this; I'll let you know when it's ready.
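One way to check whether a class such as org.springframework.expression.EvaluationContext is available to the Java side is to look inside the jar you expect to provide it. The sketch below assumes you know that jar's path. `jar_contains_class` is a hypothetical helper, not part of Beam. A jar is a zip archive, so Python's zipfile module can list its entries:

```python
import zipfile


def jar_contains_class(jar, class_name: str) -> bool:
    """Return True if the jar contains the compiled class `class_name`.

    `jar` is a path or a binary file-like object; `class_name` uses dotted
    Java notation, e.g. 'org.springframework.expression.EvaluationContext'.
    """
    # Classes are stored as slash-separated paths ending in '.class'.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar) as archive:
        return entry in archive.namelist()
```

For example, `jar_contains_class("/path/to/expansion-service.jar", "org.springframework.expression.EvaluationContext")` returning False would suggest the class has to come from somewhere other than that jar. The path is a placeholder.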

On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver <[email protected]> wrote:

> I'm not sure about the org.springframework.expression.EvaluationContext
> issue, but "local class incompatible" usually happens when using Beam
> components built from different sources. Make sure to rebuild everything
> from the same commit.
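A "local class incompatible" error means the JVM that deserializes the DoFn loaded a class with a different serialVersionUID from the one the serializing JVM recorded. You would expect this when the two sides were built from different sources. The sketch below pulls the class name and both UIDs out of such a message. `parse_uid_mismatch` is a hypothetical helper, and it expects the exception text without email line-wrapping or quote markers:

```python
import re
from typing import NamedTuple, Optional


class UidMismatch(NamedTuple):
    class_name: str
    stream_uid: int  # UID recorded by the side that serialized the object
    local_uid: int   # UID of the class loaded by the side deserializing it


_PATTERN = re.compile(
    r"InvalidClassException:\s*(?P<cls>[\w.$]+);\s*local class\s+incompatible:"
    r"\s*stream classdesc serialVersionUID = (?P<stream>-?\d+),"
    r"\s*local class serialVersionUID = (?P<local>-?\d+)"
)


def parse_uid_mismatch(message: str) -> Optional[UidMismatch]:
    """Extract the mismatching class and its two serialVersionUIDs, if present."""
    match = _PATTERN.search(message)
    if match is None:
        return None
    return UidMismatch(match.group("cls"),
                       int(match.group("stream")),
                       int(match.group("local")))
```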
>
> On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk <[email protected]> wrote:
>
>> After syncing to:
>>
>> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master, origin/master, origin/HEAD)
>> Author: Ning Kang <[email protected]>
>> Date:   Fri Apr 24 10:58:07 2020 -0700
>>
>> The new error is:
>>
>> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>         at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
>>         at org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.InvalidClassException: org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class incompatible: stream classdesc serialVersionUID = 7311199418509482705, local class serialVersionUID = 5488866827627794770
>>         at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
>>         ... 18 more
>>
>> I am not sure whether it is related to
>> https://issues.apache.org/jira/browse/BEAM-9745.
>>
>> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk <[email protected]> wrote:
>>
>>> Here is the error I am getting when using the DirectRunner:
>>>
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for the bundle bundle_1 to finish.
>>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: [('bundle_1', 1)]
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
>>> Traceback (most recent call last):
>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>>>     "__main__", mod_spec)
>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>>>     exec(code, run_globals)
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>>>     run()
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>>>     self.run().wait_until_finish()
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>>>     return self.runner.run_pipeline(self, self._options)
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>>>     return runner.run_pipeline(pipeline, options)
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>>>     pipeline.to_runner_api(default_environment=self._default_environment))
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>>>     return self.run_stages(stage_context, stages)
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>>>     bundle_context_manager,
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 508, in _run_stage
>>>     bundle_manager)
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 546, in _run_bundle
>>>     data_input, data_output, input_timers, expected_timer_output)
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 927, in process_bundle
>>>     timer_inputs)):
>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
>>>     yield fs.pop().result()
>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 432, in result
>>>     return self.__get_result()
>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
>>>     raise self._exception
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py", line 44, in run
>>>     self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 923, in execute
>>>     dry_run)
>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 862, in process_bundle
>>>     raise RuntimeError(result.error)
>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>>         at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>         at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>         at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>         at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>>         at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
>>> Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext
>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>         at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>         at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>         at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>>         at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>         at java.lang.Thread.run(Thread.java:748)
>>>
>>> On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver <[email protected]> wrote:
>>>
>>>> It should just work without any other changes. If it doesn't, let us
>>>> know.
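`docker run` uses a locally present image with the requested tag and only needs a registry when that tag is missing locally. That is why a locally built image should be picked up without other changes. To confirm the image is present before running the pipeline, you can ask Docker directly: `docker image inspect` exits with a non-zero status when the image is not found. The sketch below uses a hypothetical helper named `local_image_exists`. Its `run` parameter exists only so the Docker call can be swapped out in tests:

```python
import subprocess
from typing import Callable


def local_image_exists(
    image: str,
    run: Callable[..., subprocess.CompletedProcess] = subprocess.run,
) -> bool:
    """Return True if `image` (e.g. 'apache/beam_java_sdk:2.21.0.dev') exists locally."""
    try:
        result = run(
            ["docker", "image", "inspect", image],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # The docker CLI itself is not installed or not on PATH.
        return False
    # `docker image inspect` exits non-zero when the image is absent.
    return result.returncode == 0
```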
>>>>
>>>> On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk <[email protected]> wrote:
>>>>
>>>>> Thanks. Once the Docker image is built, how do I make sure it is used
>>>>> when I run the Beam pipeline, as opposed to pulling it from Docker Hub?
>>>>>
>>>>> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> wrote:
>>>>>
>>>>>> You can build the Java SDK image from source by running the following
>>>>>> command: ./gradlew :sdks:java:container:docker
>>>>>>
>>>>>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks for the quick response.
>>>>>>>
>>>>>>> Since Beam 2.21.0 is not yet available via pip
>>>>>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it
>>>>>>> from HEAD. After creating a fresh virtual environment, my steps were:
>>>>>>> pip install -r build-requirements.txt
>>>>>>> python setup.py build
>>>>>>> python setup.py install
>>>>>>>
>>>>>>> Then I encountered the following error when running the script from
>>>>>>> the original message:
>>>>>>>
>>>>>>> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown
>>>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable to pull image apache/beam_java_sdk:2.21.0.dev
>>>>>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>>>>>>> docker: Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown.
>>>>>>> See 'docker run --help'.
>>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: []
>>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>>>>>>>     "__main__", mod_spec)
>>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>>>>>>>     exec(code, run_globals)
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>>>>>>>     run()
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>>>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>>>>>>>     self.run().wait_until_finish()
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>>>>>>>     return runner.run_pipeline(pipeline, options)
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>>>>>>>     pipeline.to_runner_api(default_environment=self._default_environment))
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>>>>>>>     return self.run_stages(stage_context, stages)
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>>>>>>>     bundle_context_manager,
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 470, in _run_stage
>>>>>>>     bundle_context_manager.extract_bundle_inputs_and_outputs())
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 530, in extract_bundle_inputs_and_outputs
>>>>>>>     data_api_service_descriptor = self.data_api_service_descriptor()
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 451, in data_api_service_descriptor
>>>>>>>     return self.worker_handlers[0].data_api_service_descriptor()
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 445, in worker_handlers
>>>>>>>     self.stage.environment, self.num_workers))
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 854, in get_worker_handlers
>>>>>>>     worker_handler.start_worker()
>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 734, in start_worker
>>>>>>>     '--provision_endpoint=%s' % self.control_address,
>>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 336, in check_output
>>>>>>>     **kwargs).stdout
>>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 418, in run
>>>>>>>     output=stdout, stderr=stderr)
>>>>>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d', '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', '--logging_endpoint=host.docker.internal:50453', '--control_endpoint=host.docker.internal:50450', '--artifact_endpoint=host.docker.internal:50450', '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit status 125.
>>>>>>>
>>>>>>> Since apache/beam_java_sdk:2.21.0.dev is not publicly available, I
>>>>>>> cannot pull the image. Is it possible to get access to dev images?
>>>>>>> Alternatively, are there any instructions on how to build
>>>>>>> beam_java_sdk locally and then use the local image when running a job?
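The log above shows the DirectRunner asking for `apache/beam_java_sdk:<Python SDK version>`, and a `.dev` version built from HEAD has no published image on Docker Hub ("manifest unknown"). The sketch below derives the tag the same way and flags when the image has to be built locally, for example with `./gradlew :sdks:java:container:docker`. Two assumptions apply: the tag rule is inferred from this log rather than taken from Beam's source, and both function names are hypothetical:

```python
def java_sdk_image(python_sdk_version: str) -> str:
    """Image the runner appears to request, judging from the log above."""
    # Assumption: the image tag mirrors the Python SDK version string.
    return f"apache/beam_java_sdk:{python_sdk_version}"


def must_build_locally(python_sdk_version: str) -> bool:
    """True for development versions such as '2.21.0.dev'.

    Per the "manifest unknown" error above, these are not on Docker Hub.
    """
    return ".dev" in python_sdk_version
```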
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I would like to know whether it is possible to run a streaming
>>>>>>>>> pipeline that reads from (or writes to) Kafka using the DirectRunner.
>>>>>>>>> If so, what should the expansion_service argument point to? See
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>>>>>>>
>>>>>>>>
>>>>>>>>> Also, when using the FlinkRunner, what should the value of
>>>>>>>>> expansion_service be? Based on
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>>>>>>>> Flink should start an expansion service on port 8097. I am running
>>>>>>>>> Flink using
>>>>>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>>>>>>> and the expansion service is not being started. The same holds when
>>>>>>>>> using the Docker image: https://hub.docker.com/_/flink. I cannot find
>>>>>>>>> the expansion service mentioned in the Flink documentation, unless it
>>>>>>>>> is called by a different name.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Starting with Beam 2.21.0, you should not have to specify anything.
>>>>>>>> You only need to make sure that Java is installed on the system (so
>>>>>>>> that the 'java' command is available), and kafka.py will download the
>>>>>>>> correct jar.
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
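The only stated prerequisite is that the `java` command is available, so a quick check from Python can save a round trip. This is a sketch; `java_version` is a hypothetical helper name. It looks `java` up on PATH the way a shell would and reports the first line of `java -version`, which traditionally prints to stderr:

```python
import shutil
import subprocess
from typing import Optional


def java_version() -> Optional[str]:
    """Return the first line of `java -version`, or None if `java` is not on PATH."""
    java = shutil.which("java")
    if java is None:
        return None
    # `java -version` writes its banner to stderr rather than stdout.
    proc = subprocess.run([java, "-version"], stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=True)
    lines = (proc.stderr or proc.stdout).splitlines()
    return lines[0] if lines else ""
```

For example, `print(java_version() or "java not found on PATH")`.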
>>>>>>>>
>>>>>>>> For HEAD (today), as long as you are in a Beam repo clone, you
>>>>>>>> should not have to specify anything either, since kafka.py will try to
>>>>>>>> build the dependency locally.
>>>>>>>>
>>>>>>>> For previous versions, you have to start up an expansion service
>>>>>>>> manually. I'm not familiar with the expansion service embedded in the
>>>>>>>> Flink Job Server.
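For versions before 2.21.0, where the expansion service must be started by hand, the sketch below builds the launch command and the matching address to pass as `expansion_service`. It makes two assumptions. First, you have a Beam Java expansion-service jar locally, and the jar path used below is a placeholder. Second, the jar's main class takes the listening port as its first argument, so check this against your Beam version. Port 8097 is the value mentioned earlier in this thread:

```python
from typing import List, Tuple


def expansion_service_launch(jar_path: str, port: int = 8097) -> Tuple[List[str], str]:
    """Return (command that starts the service, address for expansion_service).

    Assumes the jar's main class accepts the port as its first argument.
    """
    command = ["java", "-jar", jar_path, str(port)]
    address = f"localhost:{port}"
    return command, address
```

You would then start the service with `subprocess.Popen(command)` and pass `expansion_service=address` to `kafka.ReadFromKafka(...)`.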
>>>>>>>>
>>>>>>>>
>>>>>>>>> Flink Version: flink-1.9.2
>>>>>>>>> Apache-beam Version: 2.19.0
>>>>>>>>>
>>>>>>>>> Full Details:
>>>>>>>>>
>>>>>>>>> The pipeline looks as follows:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> import argparse
>>>>>>>>> import logging
>>>>>>>>> import sys
>>>>>>>>>
>>>>>>>>> import apache_beam as beam
>>>>>>>>> from apache_beam.io.external import kafka
>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> def format_results(elem):
>>>>>>>>>     (msg, sum_value) = elem
>>>>>>>>>     print(type(msg))
>>>>>>>>>     print(type(sum_value))
>>>>>>>>>     return f"message: {msg}, sum: {sum_value}"
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> def run(argv=None, save_main_session=True):
>>>>>>>>>     """Main entry point; defines and runs the pipeline."""
>>>>>>>>>     parser = argparse.ArgumentParser()
>>>>>>>>>
>>>>>>>>>     parser.add_argument('--bootstrap_servers', type=str,
>>>>>>>>>                         help='Kafka Broker address')
>>>>>>>>>     parser.add_argument('--topic', type=str,
>>>>>>>>>                         help='Kafka topic to read from')
>>>>>>>>>     parser.add_argument('--output', type=str,
>>>>>>>>>                         default="/tmp/kafka-output",
>>>>>>>>>                         help='Output filepath')
>>>>>>>>>
>>>>>>>>>     args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>>>
>>>>>>>>>     if args.topic is None or args.bootstrap_servers is None:
>>>>>>>>>         parser.print_usage()
>>>>>>>>>         print(sys.argv[0] + ': error: both --topic and '
>>>>>>>>>               '--bootstrap_servers are required')
>>>>>>>>>         sys.exit(1)
>>>>>>>>>
>>>>>>>>>     options = PipelineOptions(pipeline_args)
>>>>>>>>>     # We use the save_main_session option because one or more DoFns
>>>>>>>>>     # in this workflow rely on global context (e.g., a module imported
>>>>>>>>>     # at module level).
>>>>>>>>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>>>>>>>>
>>>>>>>>>     # Enforce that this pipeline is always run in streaming mode
>>>>>>>>>     options.view_as(StandardOptions).streaming = True
>>>>>>>>>
>>>>>>>>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>>>>>
>>>>>>>>>     with beam.Pipeline(options=options) as p:
>>>>>>>>>         events = (
>>>>>>>>>             p
>>>>>>>>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>>>>>                 consumer_config=consumer_conf,
>>>>>>>>>                 topics=[args.topic],
>>>>>>>>>             )
>>>>>>>>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>>>>>                 beam.window.FixedWindows(60))
>>>>>>>>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>>>>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>>>>>             | 'FormatResults' >> beam.Map(format_results)
>>>>>>>>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>>         )
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>     logging.getLogger().setLevel(logging.DEBUG)
>>>>>>>>>     run()
>>>>>>>>> ```
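To see what this pipeline computes without a Kafka cluster, here is a pure-Python sketch of `FixedWindows(60)` followed by `AddOnes` and `CombinePerKey(sum)`. Each element lands in the 60-second window `[t - t % 60, t - t % 60 + 60)`, and the ones are summed per (window, message). It only mimics the grouping semantics: triggers, watermarks and late data are ignored, and the helper names and sample events are made up for illustration:

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

WINDOW_SIZE_SECS = 60  # matches beam.window.FixedWindows(60)


def window_start(timestamp: float, size: int = WINDOW_SIZE_SECS) -> float:
    """Start of the fixed window (zero offset) that contains `timestamp`."""
    return timestamp - timestamp % size


def count_per_window(events: Iterable[Tuple[float, str]]) -> Dict[Tuple[float, str], int]:
    """Emulate AddOnes + SumByKey inside fixed windows: count each message per window."""
    counts = Counter()
    for timestamp, msg in events:
        counts[(window_start(timestamp), msg)] += 1  # (msg, 1), summed per key
    return dict(counts)


def format_result(msg: str, sum_value: int) -> str:
    # Same output format as format_results() in the pipeline above.
    return f"message: {msg}, sum: {sum_value}"
```

For example, events at t=0 and t=10 with the same message fall into one window and sum to 2. An event at t=60 starts a new window.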
>>>>>>>>>
>>>>>>>>> I run it using:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>>>>>>>> ```
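The script hands its own flags to `argparse` and forwards everything else to `PipelineOptions` through `parse_known_args`. The stdlib-only check below shows how a command line is split. It also shows that writing `--bootstrap_servers==...` with a doubled `=` makes argparse keep the extra `=` as part of the value. The `--runner` flag is included only to show an unknown flag being passed through, and `split_args` is a hypothetical helper that mirrors the parser above:

```python
import argparse


def split_args(argv):
    """Mirror the script's parser: known flags go to `args`, the rest to the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', type=str)
    parser.add_argument('--topic', type=str)
    parser.add_argument('--output', type=str, default='/tmp/kafka-output')
    # Unrecognized flags (e.g. --runner) are returned untouched for PipelineOptions.
    return parser.parse_known_args(argv)
```

For example, `split_args(['--topic=inputs', '--bootstrap_servers=192.168.1.219:32779', '--runner=DirectRunner'])` yields `topic='inputs'` and leaves `['--runner=DirectRunner']` for the pipeline.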
>>>>>>>>>
>>>>>>>>> The Kafka brokers are reachable at the --bootstrap_servers address,
>>>>>>>>> and the --topic exists.
>>>>>>>>>
>>>>>>>>> The error I am getting:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "streaming.py", line 74, in <module>
>>>>>>>>>     run()
>>>>>>>>>   File "streaming.py", line 69, in run
>>>>>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
>>>>>>>>>     return self.transform.__ror__(pvalueish, self.label)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
>>>>>>>>>     result = p.apply(self, pvalueish, label)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
>>>>>>>>>     return self.apply(transform, pvalueish)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
>>>>>>>>>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
>>>>>>>>>     return m(transform, input, options)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
>>>>>>>>>     return transform.expand(input)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
>>>>>>>>>     channel).Expand(request)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>>>>>>>>>     return _end_unary_response_blocking(state, call, False, None)
>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>>>>>>>>>     raise _InactiveRpcError(state)
>>>>>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>>>>>>>>>         status = StatusCode.UNAVAILABLE
>>>>>>>>>         details = "Trying to connect an http1.x server"
>>>>>>>>>         debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Piotr
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>>
>> --
>> Best regards,
>> Piotr
>>
>
