https://github.com/apache/beam/pull/11557
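
On the earlier question further down about making sure the locally built image is used: "docker run" only pulls when the tag is missing locally. In the quoted log the "Unable to pull image" line is only INFO, and the actual failure is "docker run" not finding the tag. One quick sanity check after running the gradle task is to ask Docker whether that tag exists in the local image store. A rough sketch, not part of Beam: the helper name image_available_locally is made up for illustration, and the tag is simply the one from the error message.

```python
import subprocess


def image_available_locally(tag, run=subprocess.run):
    """Return True if 'docker image inspect' finds the tag locally.

    'run' is injectable only so the helper can be exercised without Docker;
    this is a hypothetical helper, not a Beam API.
    """
    try:
        result = run(
            ["docker", "image", "inspect", tag],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # The docker CLI itself is not installed or not on PATH.
        return False
    # 'docker image inspect' exits non-zero when the image is unknown.
    return result.returncode == 0
```

Calling image_available_locally("apache/beam_java_sdk:2.21.0.dev") should return True once the image has been built and tagged under that name; if it returns False, check which tag the build actually produced with "docker images".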
On Tue, Apr 28, 2020 at 9:28 AM Robert Bradshaw <[email protected]> wrote: > Java dependencies are not yet fully propagated over the expansion service, > which might be what you're running into. I'm actually in the process of > putting together a PR to fix this; I'll let you know when it's ready. > > On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver <[email protected]> wrote: > >> I'm not sure about the org.springframework.expression.EvaluationContext >> issue, but "local class incompatible" usually happens when using Beam >> components built from different sources. Make sure to rebuild everything >> from the same commit. >> >> On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk <[email protected]> >> wrote: >> >>> After syncing to: >>> >>> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master, >>> origin/master, origin/HEAD) >>> Author: Ning Kang <[email protected]> >>> Date: Fri Apr 24 10:58:07 2020 -0700 >>> >>> The new error is: >>> >>> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize >>> Custom DoFn With Execution Info >>> at >>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74) >>> at >>> org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697) >>> at >>> org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141) >>> at >>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233) >>> at >>> 
org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196) >>> at >>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196) >>> at >>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196) >>> at >>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474) >>> at >>> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271) >>> at >>> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534) >>> at >>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266) >>> at >>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173) >>> at >>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: java.io.InvalidClassException: >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class >>> incompatible: stream classdesc serialVersionUID = 7311199418509482705, >>> local class serialVersionUID = 5488866827627794770 >>> at >>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699) >>> at >>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942) >>> at >>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808) >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099) >>> at >>> 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) >>> at >>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344) >>> at >>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) >>> at >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) >>> at >>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:465) >>> at >>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:423) >>> at >>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71) >>> ... 18 more >>> >>> I am not sure it is related to >>> https://issues.apache.org/jira/browse/BEAM-9745. >>> >>> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk <[email protected]> >>> wrote: >>> >>>> Here is an error I am getting when using DirectRunner: >>>> >>>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for >>>> the bundle bundle_1 to finish. 
>>>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951 >>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>>> Requests sent by runner: [('bundle_1', 1)] >>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>>> Requests multiplexing info: [] >>>> Traceback (most recent call last): >>>> File >>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line >>>> 193, in _run_module_as_main >>>> "__main__", mod_spec) >>>> File >>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line >>>> 85, in _run_code >>>> exec(code, run_globals) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>>> line 74, in <module> >>>> run() >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>>> line 69, in run >>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, >>>> in __exit__ >>>> self.run().wait_until_finish() >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, >>>> in run >>>> return self.runner.run_pipeline(self, self._options) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", >>>> line 130, in run_pipeline >>>> return runner.run_pipeline(pipeline, options) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>> line 173, in run_pipeline >>>> >>>> pipeline.to_runner_api(default_environment=self._default_environment)) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>> line 183, in 
run_via_runner_api >>>> return self.run_stages(stage_context, stages) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>> line 331, in run_stages >>>> bundle_context_manager, >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>> line 508, in _run_stage >>>> bundle_manager) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>> line 546, in _run_bundle >>>> data_input, data_output, input_timers, expected_timer_output) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>> line 927, in process_bundle >>>> timer_inputs)): >>>> File >>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", >>>> line 586, in result_iterator >>>> yield fs.pop().result() >>>> File >>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", >>>> line 432, in result >>>> return self.__get_result() >>>> File >>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", >>>> line 384, in __get_result >>>> raise self._exception >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py", >>>> line 44, in run >>>> self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>> line 923, in execute >>>> dry_run) >>>> File "/Users/piotr.filipiuk/src/ >>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>> line 862, in process_bundle >>>> raise RuntimeError(result.error) >>>> RuntimeError: 
org.apache.beam.sdk.util.UserCodeException: >>>> java.lang.NoClassDefFoundError: >>>> org/springframework/expression/EvaluationContext >>>> at >>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) >>>> at >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>> Source) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406) >>>> at >>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75) >>>> at >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505) >>>> at >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>> Source) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194) >>>> at >>>> 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406) >>>> at >>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75) >>>> at >>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139) >>>> at >>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>> Source) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>>> at >>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177) >>>> at >>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106) >>>> at >>>> 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291) >>>> at >>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173) >>>> at >>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>>> at java.lang.Thread.run(Thread.java:748) >>>> Caused by: java.lang.NoClassDefFoundError: >>>> org/springframework/expression/EvaluationContext >>>> at >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478) >>>> at >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121) >>>> at >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43) >>>> at >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466) >>>> Caused by: java.lang.ClassNotFoundException: >>>> org.springframework.expression.EvaluationContext >>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382) >>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418) >>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) >>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351) >>>> at >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478) >>>> at >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121) >>>> at >>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43) >>>> at >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466) >>>> at >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>>> Source) 
>>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406) >>>> at >>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75) >>>> at >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505) >>>> at >>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>>> Source) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>>> at >>>> 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406) >>>> at >>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75) >>>> at >>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139) >>>> at >>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>>> Source) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122) >>>> at >>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>>> at >>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>>> at >>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177) >>>> at >>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106) >>>> at >>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291) >>>> at >>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173) >>>> at >>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157) >>>> at >>>> 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver <[email protected]>
>>>> wrote:
>>>>
>>>>> It should just work without any other changes. If it doesn't, let us
>>>>> know.
>>>>>
>>>>> On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thx. Once the Docker image is built, how do I make sure it is used
>>>>>> when I run a Beam pipeline, as opposed to pulling from Docker Hub?
>>>>>>
>>>>>> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> You can build the Java SDK image from source by running the
>>>>>>> following command: ./gradlew :sdks:java:container:docker
>>>>>>>
>>>>>>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks for the quick response.
>>>>>>>>
>>>>>>>> Since Beam 2.21.0 is not yet available via pip
>>>>>>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it
>>>>>>>> from HEAD. After creating a fresh virtual environment, my steps were:
>>>>>>>> pip install -r build-requirements.txt
>>>>>>>> python setup.py build
>>>>>>>> python setup.py install
>>>>>>>>
>>>>>>>> Then I encountered the following error when running the script from
>>>>>>>> the original message:
>>>>>>>>
>>>>>>>> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown
>>>>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable to pull image apache/beam_java_sdk:2.21.0.dev
>>>>>>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>>>>>>>> docker: Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown.
>>>>>>>> See 'docker run --help'. >>>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>>>>>>> Requests sent by runner: [] >>>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>>>>>>> Requests multiplexing info: [] >>>>>>>> Traceback (most recent call last): >>>>>>>> File >>>>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", >>>>>>>> line >>>>>>>> 193, in _run_module_as_main >>>>>>>> "__main__", mod_spec) >>>>>>>> File >>>>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", >>>>>>>> line >>>>>>>> 85, in _run_code >>>>>>>> exec(code, run_globals) >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>>>>>>> line 74, in <module> >>>>>>>> run() >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>>>>>>> line 69, in run >>>>>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line >>>>>>>> 528, in __exit__ >>>>>>>> self.run().wait_until_finish() >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line >>>>>>>> 514, in run >>>>>>>> return self.runner.run_pipeline(self, self._options) >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", >>>>>>>> line 130, in run_pipeline >>>>>>>> return runner.run_pipeline(pipeline, options) >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>>>>>> line 173, in run_pipeline >>>>>>>> >>>>>>>> pipeline.to_runner_api(default_environment=self._default_environment)) >>>>>>>> File "/Users/piotr.filipiuk/src/ 
>>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>>>>>> line 183, in run_via_runner_api >>>>>>>> return self.run_stages(stage_context, stages) >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>>>>>> line 331, in run_stages >>>>>>>> bundle_context_manager, >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>>>>>> line 470, in _run_stage >>>>>>>> bundle_context_manager.extract_bundle_inputs_and_outputs()) >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>>>>>>> line 530, in extract_bundle_inputs_and_outputs >>>>>>>> data_api_service_descriptor = self.data_api_service_descriptor() >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>>>>>>> line 451, in data_api_service_descriptor >>>>>>>> return self.worker_handlers[0].data_api_service_descriptor() >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>>>>>>> line 445, in worker_handlers >>>>>>>> self.stage.environment, self.num_workers)) >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", >>>>>>>> line 854, in get_worker_handlers >>>>>>>> worker_handler.start_worker() >>>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", >>>>>>>> line 734, in start_worker >>>>>>>> '--provision_endpoint=%s' % self.control_address, >>>>>>>> File >>>>>>>> 
"/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", >>>>>>>> line 336, in check_output >>>>>>>> **kwargs).stdout >>>>>>>> File >>>>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", >>>>>>>> line 418, in run >>>>>>>> output=stdout, stderr=stderr) >>>>>>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d', >>>>>>>> '--network=host', 'apache/beam_java_sdk:2.21.0.dev', >>>>>>>> '--id=worker_1', '--logging_endpoint=host.docker.internal:50453', >>>>>>>> '--control_endpoint=host.docker.internal:50450', >>>>>>>> '--artifact_endpoint=host.docker.internal:50450', >>>>>>>> '--provision_endpoint=host.docker.internal:50450']' returned non-zero >>>>>>>> exit >>>>>>>> status 125. >>>>>>>> >>>>>>>> Since the apache/beam_java_sdk:2.21.0.dev is not publicly >>>>>>>> available, I cannot pull the image. Is it possible to get access to dev >>>>>>>> images? Alternatively, are there any instructions on how to build >>>>>>>> the beam_java_sdk locally and then use the local image when running a >>>>>>>> job? >>>>>>>> >>>>>>>> Thanks >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> I would like to know whether it is possible to run a streaming >>>>>>>>>> pipeline that reads from (or writes to) Kafka using DirectRunner? If >>>>>>>>>> so, >>>>>>>>>> what should the expansion_service point to: >>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90 >>>>>>>>>> ? >>>>>>>>> >>>>>>>>> >>>>>>>>>> Also, when using FlinkRunner, what should be the value of >>>>>>>>>> expansion_service? Based on >>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24 >>>>>>>>>> Flink should start expansion service on port 8097. 
>>>>>>>>>> I am running Flink using
>>>>>>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>>>>>>>> and the expansion service is not being started. The same holds when
>>>>>>>>>> using the Docker image: https://hub.docker.com/_/flink. I cannot find
>>>>>>>>>> the expansion service mentioned in the Flink documentation, unless it
>>>>>>>>>> is called by a different name.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> From Beam 2.21.0 onward you should not have to specify anything. You
>>>>>>>>> only need to make sure that Java is installed on the system (so that
>>>>>>>>> the 'java' command is available) and kafka.py will download the
>>>>>>>>> correct jar.
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
>>>>>>>>>
>>>>>>>>> For HEAD (today), as long as you are in a Beam repo clone, you
>>>>>>>>> should not have to specify anything either, since kafka.py will try
>>>>>>>>> to build the dependency locally.
>>>>>>>>>
>>>>>>>>> For previous versions, you have to start up an expansion service
>>>>>>>>> manually. I'm not familiar with the expansion service embedded in the
>>>>>>>>> Flink Job Server.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Flink Version: flink-1.9.2
>>>>>>>>>> Apache-beam Version: 2.19.0
>>>>>>>>>>
>>>>>>>>>> Full Details:
>>>>>>>>>>
>>>>>>>>>> The pipeline looks as follows:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> import argparse
>>>>>>>>>> import logging
>>>>>>>>>> import sys
>>>>>>>>>>
>>>>>>>>>> import apache_beam as beam
>>>>>>>>>> from apache_beam.io.external import kafka
>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>>>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> def format_results(elem):
>>>>>>>>>>   (msg, sum_value) = elem
>>>>>>>>>>   print(type(msg))
>>>>>>>>>>   print(type(sum_value))
>>>>>>>>>>   return f"message: {msg}, sum: {sum_value}"
>>>>>>>>>>
>>>>>>>>>> def run(argv=None, save_main_session=True):
>>>>>>>>>>   """Main entry point; defines and runs the pipeline."""
>>>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>>>
>>>>>>>>>>   parser.add_argument('--bootstrap_servers', type=str,
>>>>>>>>>>                       help='Kafka Broker address')
>>>>>>>>>>   parser.add_argument('--topic', type=str,
>>>>>>>>>>                       help='Kafka topic to read from')
>>>>>>>>>>   parser.add_argument('--output', type=str,
>>>>>>>>>>                       default="/tmp/kafka-output",
>>>>>>>>>>                       help='Output filepath')
>>>>>>>>>>
>>>>>>>>>>   args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>>>>
>>>>>>>>>>   if args.topic is None or args.bootstrap_servers is None:
>>>>>>>>>>     parser.print_usage()
>>>>>>>>>>     print(sys.argv[0] +
>>>>>>>>>>           ': error: both --topic and --bootstrap_servers are required')
>>>>>>>>>>     sys.exit(1)
>>>>>>>>>>
>>>>>>>>>>   options = PipelineOptions(pipeline_args)
>>>>>>>>>>   # We use the save_main_session option because one or more DoFn's in this
>>>>>>>>>>   # workflow rely on global context (e.g., a module imported at module level).
>>>>>>>>>>   options.view_as(SetupOptions).save_main_session = save_main_session
>>>>>>>>>>
>>>>>>>>>>   # Enforce that this pipeline is always run in streaming mode
>>>>>>>>>>   options.view_as(StandardOptions).streaming = True
>>>>>>>>>>
>>>>>>>>>>   consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>>>>>>
>>>>>>>>>>   with beam.Pipeline(options=options) as p:
>>>>>>>>>>     events = (
>>>>>>>>>>         p
>>>>>>>>>>         | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>>>>>>             consumer_config=consumer_conf,
>>>>>>>>>>             topics=[args.topic],
>>>>>>>>>>         )
>>>>>>>>>>         | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>>>>>>             beam.window.FixedWindows(60))
>>>>>>>>>>         | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>>>>>>         | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>>>>>>         | 'FormatResults' >> beam.Map(format_results)
>>>>>>>>>>         | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>>>     )
>>>>>>>>>>
>>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>>   logging.getLogger().setLevel(logging.DEBUG)
>>>>>>>>>>   run()
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> I run it using:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> The Kafka brokers are reachable at the --bootstrap_servers address and
>>>>>>>>>> the --topic exists.
>>>>>>>>>>
>>>>>>>>>> The error I am getting:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>>   File "streaming.py", line 74, in <module>
>>>>>>>>>>     run()
>>>>>>>>>>   File "streaming.py", line 69, in run
>>>>>>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
>>>>>>>>>>     return self.transform.__ror__(pvalueish, self.label)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
>>>>>>>>>>     result = p.apply(self, pvalueish, label)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
>>>>>>>>>>     return self.apply(transform, pvalueish)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
>>>>>>>>>>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
>>>>>>>>>>     return m(transform, input, options)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
>>>>>>>>>>     return transform.expand(input)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
>>>>>>>>>>     channel).Expand(request)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>>>>>>>>>>     return _end_unary_response_blocking(state, call, False, None)
>>>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>>>>>>>>>>     raise _InactiveRpcError(state)
>>>>>>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>>>>>>>>>>   status = StatusCode.UNAVAILABLE
>>>>>>>>>>   details = "Trying to connect an http1.x server"
>>>>>>>>>>   debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Piotr
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
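
A note on the original "Trying to connect an http1.x server" error quoted above: the peer in debug_error_string is [::1]:8081, and 8081 is Flink's default REST/web UI port. That suggests the Expand request was sent to Flink's HTTP endpoint rather than to a gRPC expansion service (which the kafka.py comment places on port 8097). Below is a minimal sketch for checking what a given port speaks. It assumes nothing about Beam, and answers_http1 is a made-up helper name. A True result means a plain HTTP/1.x server answered, so the port is not a gRPC endpoint; False is inconclusive (nothing listening, or a non-HTTP/1.x protocol).

```python
import socket


def answers_http1(host, port, timeout=2.0):
    """Return True if the peer replies to a plain HTTP/1.1 GET with an
    HTTP/1.x status line. Hypothetical diagnostic helper, not a Beam API."""
    request = (
        f"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
    ).encode("ascii")
    data = b""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(request)
            # Read just enough bytes to see the start of a status line.
            while len(data) < 7:
                chunk = sock.recv(16)
                if not chunk:
                    break
                data += chunk
    except OSError:
        # Connection refused, timed out, or reset: cannot confirm HTTP/1.x.
        return False
    return data.startswith(b"HTTP/1.")
```

For example, answers_http1("localhost", 8081) returning True would be consistent with the gRPC message above, in which case the expansion_service address should point at a different port.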
