Java dependencies are not yet fully propagated over the expansion service, which might be what you're running into. I'm actually in the process of putting together a PR to fix this; I'll let you know when it's ready.
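
One way to check whether a given jar bundles a class named in a NoClassDefFoundError (for example org.springframework.expression.EvaluationContext in the trace quoted below) is to list the jar's entries. This is only a minimal sketch: the helper name jar_contains_class is made up for illustration, and which jar to inspect (the expansion service jar, or whatever ends up on the SDK harness classpath) is an assumption that depends on your setup.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path has an entry for class_name.

    class_name is a dotted Java binary name, e.g.
    'org.springframework.expression.EvaluationContext'.
    (Hypothetical helper, not part of Beam.)
    """
    # A jar is a zip archive; each class is stored as path/to/Name.class.
    entry = class_name.replace('.', '/') + '.class'
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

Calling jar_contains_class('/path/to/some.jar', 'org.springframework.expression.EvaluationContext') with the path replaced by a real jar returns False when the class is not packaged in that jar. That would fit the missing-dependency explanation, though it does not prove it.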
On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver <[email protected]> wrote: > I'm not sure about the org.springframework.expression.EvaluationContext > issue, but "local class incompatible" usually happens when using Beam > components built from different sources. Make sure to rebuild everything > from the same commit. > > On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk <[email protected]> > wrote: > >> After syncing to: >> >> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master, >> origin/master, origin/HEAD) >> Author: Ning Kang <[email protected]> >> Date: Fri Apr 24 10:58:07 2020 -0700 >> >> The new error is: >> >> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize >> Custom DoFn With Execution Info >> at >> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74) >> at >> org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697) >> at >> org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360) >> at >> org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356) >> at >> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165) >> at >> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141) >> at >> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233) >> at >> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196) >> at >> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196) >> at >> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196) >> at >> 
org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474) >> at >> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271) >> at >> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534) >> at >> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266) >> at >> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173) >> at >> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: java.io.InvalidClassException: >> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class >> incompatible: stream classdesc serialVersionUID = 7311199418509482705, >> local class serialVersionUID = 5488866827627794770 >> at >> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699) >> at >> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942) >> at >> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099) >> at >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344) >> at >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) >> at >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) >> at >> java.io.ObjectInputStream.readObject(ObjectInputStream.java:465) >> at >> 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:423) >> at >> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71) >> ... 18 more >> >> I am not sure it is related to >> https://issues.apache.org/jira/browse/BEAM-9745. >> >> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk <[email protected]> >> wrote: >> >>> Here is an error I am getting when using DirectRunner: >>> >>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for >>> the bundle bundle_1 to finish. >>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951 >>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>> Requests sent by runner: [('bundle_1', 1)] >>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>> Requests multiplexing info: [] >>> Traceback (most recent call last): >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line >>> 193, in _run_module_as_main >>> "__main__", mod_spec) >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line >>> 85, in _run_code >>> exec(code, run_globals) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>> line 74, in <module> >>> run() >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>> line 69, in run >>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, >>> in __exit__ >>> self.run().wait_until_finish() >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, >>> in run >>> return self.runner.run_pipeline(self, self._options) >>> File "/Users/piotr.filipiuk/src/ >>> 
github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", >>> line 130, in run_pipeline >>> return runner.run_pipeline(pipeline, options) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 173, in run_pipeline >>> >>> pipeline.to_runner_api(default_environment=self._default_environment)) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 183, in run_via_runner_api >>> return self.run_stages(stage_context, stages) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 331, in run_stages >>> bundle_context_manager, >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 508, in _run_stage >>> bundle_manager) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 546, in _run_bundle >>> data_input, data_output, input_timers, expected_timer_output) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 927, in process_bundle >>> timer_inputs)): >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", >>> line 586, in result_iterator >>> yield fs.pop().result() >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", >>> line 432, in result >>> return self.__get_result() >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", >>> line 384, in __get_result >>> raise self._exception >>> File "/Users/piotr.filipiuk/src/ >>> 
github.com/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py", >>> line 44, in run >>> self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 923, in execute >>> dry_run) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 862, in process_bundle >>> raise RuntimeError(result.error) >>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: >>> java.lang.NoClassDefFoundError: >>> org/springframework/expression/EvaluationContext >>> at >>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) >>> at >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>> Source) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406) >>> at >>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75) >>> at >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505) >>> at >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>> Source) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>> at >>> 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406) >>> at >>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75) >>> at >>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139) >>> at >>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>> Source) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191) >>> at >>> 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>> at >>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177) >>> at >>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106) >>> at >>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291) >>> at >>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173) >>> at >>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: java.lang.NoClassDefFoundError: >>> org/springframework/expression/EvaluationContext >>> at >>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478) >>> at >>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121) >>> at >>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43) >>> at >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466) >>> Caused by: java.lang.ClassNotFoundException: >>> org.springframework.expression.EvaluationContext >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418) >>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351) >>> at >>> 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478) >>> at >>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121) >>> at >>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43) >>> at >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466) >>> at >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown >>> Source) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406) >>> at >>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75) >>> at >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505) >>> at >>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown >>> Source) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122) >>> at >>> 
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406) >>> at >>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75) >>> at >>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139) >>> at >>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown >>> Source) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122) >>> at >>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) >>> at >>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) >>> at >>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177) >>> at >>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106) >>> at >>> 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291) >>> at >>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173) >>> at >>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver <[email protected]> wrote: >>> >>>> It should just work without any other changes. If it doesn't let us >>>> know. >>>> >>>> On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk < >>>> [email protected]> wrote: >>>> >>>>> Thx. Once the docker image is build, how do I make sure it is used >>>>> when I run Beam pipeline, as opposed to pulling from docker hub? >>>>> >>>>> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> >>>>> wrote: >>>>> >>>>>> You can build the Java SDK image from source by running the following >>>>>> command: ./gradlew :sdks:java:container:docker >>>>>> >>>>>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> Thanks for quick response. >>>>>>> >>>>>>> Since Beam 2.21.0 is not yet available via pip >>>>>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it >>>>>>> from HEAD. 
After creating fresh virtual environment, my steps were: >>>>>>> pip install -r build-requirements.txt >>>>>>> python setup.py build >>>>>>> python setup.py install >>>>>>> >>>>>>> Then I encounter the following error when running the script from >>>>>>> the original message: >>>>>>> >>>>>>> Error response from daemon: manifest for apache/beam_java_sdk: >>>>>>> 2.21.0.dev not found: manifest unknown: manifest unknown >>>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable >>>>>>> to pull image apache/beam_java_sdk:2.21.0.dev >>>>>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally >>>>>>> docker: Error response from daemon: manifest for >>>>>>> apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: >>>>>>> manifest unknown. >>>>>>> See 'docker run --help'. >>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>>>>>> Requests sent by runner: [] >>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>>>>>> Requests multiplexing info: [] >>>>>>> Traceback (most recent call last): >>>>>>> File >>>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", >>>>>>> line >>>>>>> 193, in _run_module_as_main >>>>>>> "__main__", mod_spec) >>>>>>> File >>>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", >>>>>>> line >>>>>>> 85, in _run_code >>>>>>> exec(code, run_globals) >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>>>>>> line 74, in <module> >>>>>>> run() >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>>>>>> line 69, in run >>>>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line >>>>>>> 528, in __exit__ >>>>>>> 
self.run().wait_until_finish() >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line >>>>>>> 514, in run >>>>>>> return self.runner.run_pipeline(self, self._options) >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", >>>>>>> line 130, in run_pipeline >>>>>>> return runner.run_pipeline(pipeline, options) >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>>>>> line 173, in run_pipeline >>>>>>> >>>>>>> pipeline.to_runner_api(default_environment=self._default_environment)) >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>>>>> line 183, in run_via_runner_api >>>>>>> return self.run_stages(stage_context, stages) >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>>>>> line 331, in run_stages >>>>>>> bundle_context_manager, >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>>>>>> line 470, in _run_stage >>>>>>> bundle_context_manager.extract_bundle_inputs_and_outputs()) >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>>>>>> line 530, in extract_bundle_inputs_and_outputs >>>>>>> data_api_service_descriptor = self.data_api_service_descriptor() >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>>>>>> line 451, in data_api_service_descriptor >>>>>>> return self.worker_handlers[0].data_api_service_descriptor() >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> 
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>>>>>> line 445, in worker_handlers >>>>>>> self.stage.environment, self.num_workers)) >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", >>>>>>> line 854, in get_worker_handlers >>>>>>> worker_handler.start_worker() >>>>>>> File "/Users/piotr.filipiuk/src/ >>>>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", >>>>>>> line 734, in start_worker >>>>>>> '--provision_endpoint=%s' % self.control_address, >>>>>>> File >>>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", >>>>>>> line 336, in check_output >>>>>>> **kwargs).stdout >>>>>>> File >>>>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", >>>>>>> line 418, in run >>>>>>> output=stdout, stderr=stderr) >>>>>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d', >>>>>>> '--network=host', 'apache/beam_java_sdk:2.21.0.dev', >>>>>>> '--id=worker_1', '--logging_endpoint=host.docker.internal:50453', >>>>>>> '--control_endpoint=host.docker.internal:50450', >>>>>>> '--artifact_endpoint=host.docker.internal:50450', >>>>>>> '--provision_endpoint=host.docker.internal:50450']' returned non-zero >>>>>>> exit >>>>>>> status 125. >>>>>>> >>>>>>> Since the apache/beam_java_sdk:2.21.0.dev is not publicly >>>>>>> available, I cannot pull the image. Is it possible to get access to dev >>>>>>> images? Alternatively, are there any instructions on how to build >>>>>>> the beam_java_sdk locally and then use the local image when running a >>>>>>> job? 
>>>>>>> >>>>>>> Thanks >>>>>>> >>>>>>> >>>>>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> I would like to know whether it is possible to run a streaming >>>>>>>>> pipeline that reads from (or writes to) Kafka using DirectRunner? If >>>>>>>>> so, >>>>>>>>> what should the expansion_service point to: >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90 >>>>>>>>> ? >>>>>>>> >>>>>>>> >>>>>>>>> Also, when using FlinkRunner, what should be the value of >>>>>>>>> expansion_service? Based on >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24 >>>>>>>>> Flink should start expansion service on port 8097. I am running Flink >>>>>>>>> using >>>>>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution >>>>>>>>> and the expansion service is not being started. Same holds when using >>>>>>>>> docker image: https://hub.docker.com/_/flink. I cannot find >>>>>>>>> expansion service being mentioned in Flink Documentation - unless it >>>>>>>>> is >>>>>>>>> called by different name. >>>>>>>>> >>>>>>>> >>>>>>>> After Beam 2.21.0 you should not have to specify anything. You only >>>>>>>> need to make sure that Java is installed in the system (so that command >>>>>>>> 'java' is available) and kafka.py will download the correct jar. >>>>>>>> >>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62 >>>>>>>> >>>>>>>> For HEAD (today), as long as you are in a Beam repo clone, you >>>>>>>> should not have to specify anything as well since kafka.py will try to >>>>>>>> build the dependency locally. >>>>>>>> >>>>>>>> For previous versions, you have to start up an expansion service >>>>>>>> manually. 
I'm not familiar with the expansion service embedded in the
>>>>>>>> Flink Job Server.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Flink Version: flink-1.9.2
>>>>>>>>> Apache-beam Version: 2.19.0
>>>>>>>>>
>>>>>>>>> Full Details:
>>>>>>>>>
>>>>>>>>> The pipeline looks as follows:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> import argparse
>>>>>>>>> import logging
>>>>>>>>> import sys
>>>>>>>>>
>>>>>>>>> import apache_beam as beam
>>>>>>>>> from apache_beam.io.external import kafka
>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> def format_results(elem):
>>>>>>>>>   (msg, sum_value) = elem
>>>>>>>>>   print(type(msg))
>>>>>>>>>   print(type(sum_value))
>>>>>>>>>   return f"message: {msg}, sum: {sum_value}"
>>>>>>>>>
>>>>>>>>> def run(argv=None, save_main_session=True):
>>>>>>>>>   """Main entry point; defines and runs the pipeline."""
>>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>>
>>>>>>>>>   parser.add_argument('--bootstrap_servers', type=str,
>>>>>>>>>                       help='Kafka Broker address')
>>>>>>>>>   parser.add_argument('--topic', type=str,
>>>>>>>>>                       help='Kafka topic to read from')
>>>>>>>>>   parser.add_argument('--output', type=str,
>>>>>>>>>                       default="/tmp/kafka-output",
>>>>>>>>>                       help='Output filepath')
>>>>>>>>>
>>>>>>>>>   args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>>>
>>>>>>>>>   if args.topic is None or args.bootstrap_servers is None:
>>>>>>>>>     parser.print_usage()
>>>>>>>>>     print(sys.argv[0] +
>>>>>>>>>           ': error: both --topic and --bootstrap_servers are required')
>>>>>>>>>     sys.exit(1)
>>>>>>>>>
>>>>>>>>>   options = PipelineOptions(pipeline_args)
>>>>>>>>>   # We use the save_main_session option because one or more DoFn's in this
>>>>>>>>>   # workflow rely on global context (e.g., a module imported at module level).
>>>>>>>>>   options.view_as(SetupOptions).save_main_session = save_main_session
>>>>>>>>>
>>>>>>>>>   # Enforce that this pipeline is always run in streaming mode
>>>>>>>>>   options.view_as(StandardOptions).streaming = True
>>>>>>>>>
>>>>>>>>>   consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>>>>>
>>>>>>>>>   with beam.Pipeline(options=options) as p:
>>>>>>>>>     events = (
>>>>>>>>>         p
>>>>>>>>>         | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>>>>>             consumer_config=consumer_conf,
>>>>>>>>>             topics=[args.topic],
>>>>>>>>>         )
>>>>>>>>>         | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>>>>>             beam.window.FixedWindows(60))
>>>>>>>>>         | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>>>>>         | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>>>>>         | 'FormatResults' >> beam.Map(format_results)
>>>>>>>>>         | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>>     )
>>>>>>>>>
>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>   logging.getLogger().setLevel(logging.DEBUG)
>>>>>>>>>   run()
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> I run it using:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Kafka brokers are available under the --bootstrap_servers and the
>>>>>>>>> --topic exists.
>>>>>>>>> >>>>>>>>> The error I am getting: >>>>>>>>> >>>>>>>>> ``` >>>>>>>>> Traceback (most recent call last): >>>>>>>>> File "streaming.py", line 74, in <module> >>>>>>>>> run() >>>>>>>>> File "streaming.py", line 69, in run >>>>>>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", >>>>>>>>> line 989, in __ror__ >>>>>>>>> return self.transform.__ror__(pvalueish, self.label) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", >>>>>>>>> line 549, in __ror__ >>>>>>>>> result = p.apply(self, pvalueish, label) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>>>>>> line 536, in apply >>>>>>>>> return self.apply(transform, pvalueish) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>>>>>> line 577, in apply >>>>>>>>> pvalueish_result = self.runner.apply(transform, pvalueish, >>>>>>>>> self._options) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", >>>>>>>>> line 195, in apply >>>>>>>>> return m(transform, input, options) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", >>>>>>>>> line 225, in apply_PTransform >>>>>>>>> return transform.expand(input) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", >>>>>>>>> line 327, in expand >>>>>>>>> channel).Expand(request) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>>>>>>>> line 826, in __call__ >>>>>>>>> 
return _end_unary_response_blocking(state, call, False, None) >>>>>>>>> File >>>>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>>>>>>>> line 729, in _end_unary_response_blocking >>>>>>>>> raise _InactiveRpcError(state) >>>>>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that >>>>>>>>> terminated with: >>>>>>>>> status = StatusCode.UNAVAILABLE >>>>>>>>> details = "Trying to connect an http1.x server" >>>>>>>>> debug_error_string = >>>>>>>>> "{"created":"@1587496938.983698000","description":"Error received >>>>>>>>> from peer >>>>>>>>> ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying >>>>>>>>> to connect an http1.x server","grpc_status":14}" >>>>>>>>> ``` >>>>>>>>> >>>>>>>>> Thanks >>>>>>>>> >>>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Best regards, >>>>>>> Piotr >>>>>>> >>>>>> >>>>> >>>>> -- >>>>> Best regards, >>>>> Piotr >>>>> >>>> >>> >>> -- >>> Best regards, >>> Piotr >>> >> >> >> -- >> Best regards, >> Piotr >> >
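
Two preconditions mentioned in the thread above can be checked before launching the pipeline: that a `java` command is available, and that something is listening at the address passed as expansion_service. The sketch below is a rough preflight check under stated assumptions: the function names are invented for illustration, and a successful TCP connection only shows that some server is listening, not that it speaks gRPC. The gRPC error in the original message ("Trying to connect an http1.x server" from port 8081) suggests the client reached a plain HTTP server rather than an expansion service, which this check alone would not catch.

```python
import shutil
import socket


def java_on_path():
    """Return True if a 'java' executable can be found on PATH."""
    return shutil.which('java') is not None


def port_accepts_connections(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout.

    This does not verify the protocol spoken by the server.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

For example, port_accepts_connections('localhost', 8097) checks the port the thread mentions for the Flink-embedded expansion service. The host and port are whatever your own setup uses.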
