https://github.com/apache/beam/pull/11557

On Tue, Apr 28, 2020 at 9:28 AM Robert Bradshaw <[email protected]> wrote:

> Java dependencies are not yet fully propagated over the expansion service,
> which might be what you're running into. I'm actually in the process of
> putting together a PR to fix this; I'll let you know when it's ready.
>
> On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver <[email protected]> wrote:
>
>> I'm not sure about the org.springframework.expression.EvaluationContext
>> issue, but "local class incompatible" usually happens when using Beam
>> components built from different sources. Make sure to rebuild everything
>> from the same commit.
>>
>> On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> After syncing to:
>>>
>>> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master,
>>> origin/master, origin/HEAD)
>>> Author: Ning Kang <[email protected]>
>>> Date:   Fri Apr 24 10:58:07 2020 -0700
>>>
>>> The new error is:
>>>
>>> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
>>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>>         at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
>>>         at org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.InvalidClassException: org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class incompatible: stream classdesc serialVersionUID = 7311199418509482705, local class serialVersionUID = 5488866827627794770
>>>         at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>>>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
>>>         ... 18 more
>>>
>>> I am not sure whether it is related to
>>> https://issues.apache.org/jira/browse/BEAM-9745.
>>>
>>> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk <[email protected]>
>>> wrote:
>>>
>>>> Here is an error I am getting when using DirectRunner:
>>>>
>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for the bundle bundle_1 to finish.
>>>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: [('bundle_1', 1)]
>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
>>>> Traceback (most recent call last):
>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>>>>     "__main__", mod_spec)
>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>>>>     exec(code, run_globals)
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>>>>     run()
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>>>>     self.run().wait_until_finish()
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>>>>     return self.runner.run_pipeline(self, self._options)
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>>>>     return runner.run_pipeline(pipeline, options)
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>>>>     pipeline.to_runner_api(default_environment=self._default_environment))
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>>>>     return self.run_stages(stage_context, stages)
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>>>>     bundle_context_manager,
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 508, in _run_stage
>>>>     bundle_manager)
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 546, in _run_bundle
>>>>     data_input, data_output, input_timers, expected_timer_output)
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 927, in process_bundle
>>>>     timer_inputs)):
>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
>>>>     yield fs.pop().result()
>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 432, in result
>>>>     return self.__get_result()
>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
>>>>     raise self._exception
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py", line 44, in run
>>>>     self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 923, in execute
>>>>     dry_run)
>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 862, in process_bundle
>>>>     raise RuntimeError(result.error)
>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>>>         at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>>>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
>>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>>>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>         at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>>         at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>         at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>>>         at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
>>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
>>>> Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext
>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>>>         at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
>>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>>>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
>>>>         at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>>>         at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>         at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>>         at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
>>>>         at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>         at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>         at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>>>         at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>>>         at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
>>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>>         at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver <[email protected]>
>>>> wrote:
>>>>
>>>>> It should just work without any other changes. If it doesn't, let us
>>>>> know.
>>>>>
>>>>> On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thx. Once the Docker image is built, how do I make sure it is used
>>>>>> when I run a Beam pipeline, as opposed to being pulled from Docker Hub?
>>>>>>
>>>>>> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> You can build the Java SDK image from source by running the
>>>>>>> following command: ./gradlew :sdks:java:container:docker
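`docker run` uses a locally cached image when one with the requested tag exists, so after the Gradle build one way to confirm that the expected tag is present is `docker image inspect`, which exits non-zero when the image is missing. A minimal sketch, assuming the `<repo_root>/beam_java_sdk:<version>` naming seen in the error messages in this thread; `java_sdk_image` and `image_exists_locally` are hypothetical helpers, and `run` is a parameter only so the check can be exercised without a Docker daemon.

```python
import subprocess
from typing import Callable


def java_sdk_image(version: str, repo_root: str = "apache") -> str:
    """Build an image reference such as 'apache/beam_java_sdk:2.21.0.dev'.

    Assumption: the naming scheme is copied from the image name that appears
    in the error messages in this thread, not from Beam documentation.
    """
    return f"{repo_root}/beam_java_sdk:{version}"


def image_exists_locally(
    image: str,
    run: Callable[..., subprocess.CompletedProcess] = subprocess.run,
) -> bool:
    """Return True if `docker image inspect` finds `image` in the local cache."""
    try:
        result = run(
            ["docker", "image", "inspect", image],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # The docker CLI itself is not installed or not on PATH.
        return False
    # docker image inspect exits with status 0 only if the image was found.
    return result.returncode == 0
```

For example, `image_exists_locally(java_sdk_image("2.21.0.dev"))` would be expected to return True once the Gradle task has finished, provided the tag assumption holds for that build.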
>>>>>>>
>>>>>>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks for the quick response.
>>>>>>>>
>>>>>>>> Since Beam 2.21.0 is not yet available via pip
>>>>>>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it
>>>>>>>> from HEAD. After creating a fresh virtual environment, my steps were:
>>>>>>>> pip install -r build-requirements.txt
>>>>>>>> python setup.py build
>>>>>>>> python setup.py install
>>>>>>>>
>>>>>>>> Then I encounter the following error when running the script from
>>>>>>>> the original message:
>>>>>>>>
>>>>>>>> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown
>>>>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable to pull image apache/beam_java_sdk:2.21.0.dev
>>>>>>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>>>>>>>> docker: Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown.
>>>>>>>> See 'docker run --help'.
>>>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: []
>>>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>>>>>>>>     "__main__", mod_spec)
>>>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>>>>>>>>     exec(code, run_globals)
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>>>>>>>>     run()
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>>>>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>>>>>>>>     self.run().wait_until_finish()
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>>>>>>>>     return runner.run_pipeline(pipeline, options)
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>>>>>>>>     pipeline.to_runner_api(default_environment=self._default_environment))
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>>>>>>>>     return self.run_stages(stage_context, stages)
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>>>>>>>>     bundle_context_manager,
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 470, in _run_stage
>>>>>>>>     bundle_context_manager.extract_bundle_inputs_and_outputs())
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 530, in extract_bundle_inputs_and_outputs
>>>>>>>>     data_api_service_descriptor = self.data_api_service_descriptor()
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 451, in data_api_service_descriptor
>>>>>>>>     return self.worker_handlers[0].data_api_service_descriptor()
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 445, in worker_handlers
>>>>>>>>     self.stage.environment, self.num_workers))
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 854, in get_worker_handlers
>>>>>>>>     worker_handler.start_worker()
>>>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 734, in start_worker
>>>>>>>>     '--provision_endpoint=%s' % self.control_address,
>>>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 336, in check_output
>>>>>>>>     **kwargs).stdout
>>>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 418, in run
>>>>>>>>     output=stdout, stderr=stderr)
>>>>>>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d', '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', '--logging_endpoint=host.docker.internal:50453', '--control_endpoint=host.docker.internal:50450', '--artifact_endpoint=host.docker.internal:50450', '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit status 125.
>>>>>>>>
>>>>>>>> Since the apache/beam_java_sdk:2.21.0.dev image is not publicly
>>>>>>>> available, I cannot pull it. Is it possible to get access to dev
>>>>>>>> images? Alternatively, are there any instructions on how to build
>>>>>>>> the beam_java_sdk image locally and then use the local image when
>>>>>>>> running a job?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I would like to know whether it is possible to run a streaming
>>>>>>>>>> pipeline that reads from (or writes to) Kafka using DirectRunner. If
>>>>>>>>>> so, what should the expansion_service argument at
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>>>>>>>>> point to?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Also, when using FlinkRunner, what should the value of
>>>>>>>>>> expansion_service be? Based on
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>>>>>>>>> Flink should start an expansion service on port 8097. I am running
>>>>>>>>>> Flink using
>>>>>>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>>>>>>>> and the expansion service is not being started. The same holds when
>>>>>>>>>> using the Docker image https://hub.docker.com/_/flink. I cannot find
>>>>>>>>>> the expansion service mentioned in the Flink documentation, unless it
>>>>>>>>>> is called by a different name.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> From Beam 2.21.0 onward you should not have to specify anything. You
>>>>>>>>> only need to make sure that Java is installed on the system (so that
>>>>>>>>> the 'java' command is available); kafka.py will then download the
>>>>>>>>> correct jar.
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
>>>>>>>>>
>>>>>>>>> For HEAD (as of today), as long as you are in a Beam repo clone, you
>>>>>>>>> should not have to specify anything either, since kafka.py will try
>>>>>>>>> to build the dependency locally.
>>>>>>>>>
>>>>>>>>> For previous versions, you have to start up an expansion service
>>>>>>>>> manually. I'm not familiar with the expansion service embedded in the
>>>>>>>>> Flink Job Server.
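A script that has to run against both older and newer SDKs could encode this version rule in a small helper. This is a sketch under stated assumptions: the 2.21 cut-off follows the reply above, `localhost:8097` is only a placeholder for wherever a manually started expansion service listens (8097 is the port mentioned in the kafka.py link earlier in the thread), and `expansion_service_for` is a hypothetical name, not part of Beam.

```python
from typing import Optional, Tuple

# Placeholder address of an expansion service started by hand (assumption).
MANUAL_EXPANSION_SERVICE = "localhost:8097"


def parse_beam_version(version: str) -> Tuple[int, int]:
    """Return (major, minor) from strings such as '2.19.0' or '2.21.0.dev'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def expansion_service_for(version: str) -> Optional[str]:
    """Choose the expansion_service value to pass to ReadFromKafka.

    None means "do not pass the argument": from 2.21 (and on a 2.21.0.dev
    HEAD inside a Beam clone) kafka.py is expected to find or build the jar
    itself, which requires a 'java' binary on PATH. Older SDKs need the
    address of an expansion service that was started manually.
    """
    if parse_beam_version(version) >= (2, 21):
        return None
    return MANUAL_EXPANSION_SERVICE
```

When the helper returns an address, it would be passed as `expansion_service=` to `kafka.ReadFromKafka`; when it returns None, the argument can simply be left out.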
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Flink Version: flink-1.9.2
>>>>>>>>>> Apache-beam Version: 2.19.0
>>>>>>>>>>
>>>>>>>>>> Full Details:
>>>>>>>>>>
>>>>>>>>>> The pipeline looks as follows:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> import argparse
>>>>>>>>>> import logging
>>>>>>>>>> import sys
>>>>>>>>>>
>>>>>>>>>> import apache_beam as beam
>>>>>>>>>> from apache_beam.io.external import kafka
>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>>>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> def format_results(elem):
>>>>>>>>>>     (msg, sum_value) = elem
>>>>>>>>>>     # Debug output: show the types of the key and of the summed count.
>>>>>>>>>>     print(type(msg))
>>>>>>>>>>     print(type(sum_value))
>>>>>>>>>>     return f"message: {msg}, sum: {sum_value}"
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> def run(argv=None, save_main_session=True):
>>>>>>>>>>     """Main entry point; defines and runs the pipeline."""
>>>>>>>>>>     parser = argparse.ArgumentParser()
>>>>>>>>>>
>>>>>>>>>>     parser.add_argument('--bootstrap_servers', type=str,
>>>>>>>>>>                         help='Kafka Broker address')
>>>>>>>>>>     parser.add_argument('--topic', type=str,
>>>>>>>>>>                         help='Kafka topic to read from')
>>>>>>>>>>     parser.add_argument('--output', type=str,
>>>>>>>>>>                         default='/tmp/kafka-output',
>>>>>>>>>>                         help='Output filepath')
>>>>>>>>>>
>>>>>>>>>>     # Unrecognised flags are passed on to PipelineOptions.
>>>>>>>>>>     args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>>>>
>>>>>>>>>>     if args.topic is None or args.bootstrap_servers is None:
>>>>>>>>>>         parser.print_usage()
>>>>>>>>>>         print(sys.argv[0] +
>>>>>>>>>>               ': error: both --topic and --bootstrap_servers are required')
>>>>>>>>>>         sys.exit(1)
>>>>>>>>>>
>>>>>>>>>>     options = PipelineOptions(pipeline_args)
>>>>>>>>>>     # We use the save_main_session option because one or more DoFn's in
>>>>>>>>>>     # this workflow rely on global context (e.g., a module imported at
>>>>>>>>>>     # module level).
>>>>>>>>>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>>>>>>>>>
>>>>>>>>>>     # Enforce that this pipeline is always run in streaming mode
>>>>>>>>>>     options.view_as(StandardOptions).streaming = True
>>>>>>>>>>
>>>>>>>>>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>>>>>>
>>>>>>>>>>     with beam.Pipeline(options=options) as p:
>>>>>>>>>>         events = (
>>>>>>>>>>             p
>>>>>>>>>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>>>>>>                 consumer_config=consumer_conf,
>>>>>>>>>>                 topics=[args.topic],
>>>>>>>>>>             )
>>>>>>>>>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>>>>>>                 beam.window.FixedWindows(60))
>>>>>>>>>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>>>>>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>>>>>>             | 'FormatResults' >> beam.Map(format_results)
>>>>>>>>>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>>>         )
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>>     logging.getLogger().setLevel(logging.DEBUG)
>>>>>>>>>>     run()
>>>>>>>>>> ```
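To make the windowing and combining steps of the pipeline above concrete, here is a plain-Python sketch (no Beam) of what `WindowInto(FixedWindows(60))` followed by `Map(lambda msg: (msg, 1))` and `CombinePerKey(sum)` compute over a finite list of timestamped messages. It assumes event timestamps in seconds and a window offset of 0, and it ignores watermarks, triggers and late data, which a real streaming run has to deal with; `window_start` and `count_per_window` are hypothetical names.

```python
from collections import Counter
from typing import Dict, Hashable, Iterable, Tuple

WINDOW_SIZE_SECONDS = 60  # mirrors beam.window.FixedWindows(60)


def window_start(timestamp: float, size: int = WINDOW_SIZE_SECONDS) -> int:
    """Start of the fixed window [start, start + size) containing timestamp."""
    return int(timestamp // size) * size


def count_per_window(
    events: Iterable[Tuple[float, Hashable]],
) -> Dict[Tuple[int, Hashable], int]:
    """Count identical messages per fixed window, like AddOnes + SumByKey."""
    counts: Counter = Counter()
    for timestamp, msg in events:
        # Each (msg, 1) pair is summed per key; the window acts as part of
        # the grouping key, so equal messages in different windows stay apart.
        counts[(window_start(timestamp), msg)] += 1
    return dict(counts)
```

Grouping by `(window, key)` is the reason the pipeline emits one sum per message per minute rather than a single running total.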
>>>>>>>>>>
>>>>>>>>>> I run it using:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>>>>>>>>> ```
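The flag parsing in such a script can be checked in isolation. One pitfall worth knowing: `argparse` splits `--flag=value` at the first `=`, so a doubled `=` (for example `--bootstrap_servers==192.168.1.219:32779`) is accepted silently and the value keeps a leading `=`. The sketch below re-creates only the `--bootstrap_servers`/`--topic` parsing from the pipeline; `parse_kafka_args` and the optional `bootstrap_servers_type` validator are hypothetical additions, not part of the original script.

```python
import argparse
from typing import List, Optional


def bootstrap_servers_type(value: str) -> str:
    """Reject values that still carry a stray leading '=' (hypothetical check)."""
    if value.startswith("="):
        raise argparse.ArgumentTypeError(f"unexpected leading '=' in {value!r}")
    return value


def parse_kafka_args(
    argv: Optional[List[str]] = None, strict: bool = False
) -> argparse.Namespace:
    """Parse only the Kafka flags; strict=True enables the validator above."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers',
                        type=bootstrap_servers_type if strict else str,
                        help='Kafka Broker address')
    parser.add_argument('--topic', type=str, help='Kafka topic to read from')
    # As in the pipeline, unknown flags are left for PipelineOptions.
    args, _pipeline_args = parser.parse_known_args(argv)
    return args
```

With `strict=True`, argparse reports the malformed value as a usage error and exits, instead of handing `=192.168.1.219:32779` to the Kafka consumer configuration.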
>>>>>>>>>>
>>>>>>>>>> The Kafka brokers are reachable at the --bootstrap_servers address,
>>>>>>>>>> and the --topic exists.
>>>>>>>>>>
>>>>>>>>>> The error I am getting:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>>   File "streaming.py", line 74, in <module>
>>>>>>>>>>     run()
>>>>>>>>>>   File "streaming.py", line 69, in run
>>>>>>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
>>>>>>>>>>     return self.transform.__ror__(pvalueish, self.label)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
>>>>>>>>>>     result = p.apply(self, pvalueish, label)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
>>>>>>>>>>     return self.apply(transform, pvalueish)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
>>>>>>>>>>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
>>>>>>>>>>     return m(transform, input, options)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
>>>>>>>>>>     return transform.expand(input)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
>>>>>>>>>>     channel).Expand(request)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>>>>>>>>>>     return _end_unary_response_blocking(state, call, False, None)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>>>>>>>>>>     raise _InactiveRpcError(state)
>>>>>>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>>>>>>>>>>         status = StatusCode.UNAVAILABLE
>>>>>>>>>>         details = "Trying to connect an http1.x server"
>>>>>>>>>>         debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Piotr
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
