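I'm not sure about the org.springframework.expression.EvaluationContext issue, but "local class incompatible" usually means the Beam components involved were built from different sources: the serialVersionUID recorded in the serialized DoFn no longer matches the class loaded by the Java SDK harness. Make sure to rebuild everything from the same commit.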
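
If it helps to confirm that this is what you are hitting, here is a minimal Python sketch (not part of Beam; `parse_uid_mismatch` and `SAMPLE` are names made up for illustration) that pulls the class name and the two serialVersionUID values out of an InvalidClassException message like the one quoted below. It assumes the message follows the standard "local class incompatible" wording, and it tolerates the '>' quote markers that email wrapping adds:

```python
import re

# Hypothetical helper, not part of Beam: matches the standard wording of a
# java.io.InvalidClassException "local class incompatible" message.
_PATTERN = re.compile(
    r"InvalidClassException:\s*(?P<cls>[\w.$]+);\s*local class incompatible:\s*"
    r"stream classdesc serialVersionUID = (?P<stream>-?\d+),\s*"
    r"local class serialVersionUID = (?P<local>-?\d+)"
)

# Message text copied from the quoted stack trace, including quote markers.
SAMPLE = (
    "Caused by: java.io.InvalidClassException: > "
    "org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class > "
    "incompatible: stream classdesc serialVersionUID = 7311199418509482705, > "
    "local class serialVersionUID = 5488866827627794770"
)


def parse_uid_mismatch(message):
    """Return (class_name, stream_uid, local_uid), or None if nothing matches."""
    # Collapse runs of '>' quote markers and whitespace into single spaces so
    # that wrapped or quoted text still matches the pattern.
    flat = re.sub(r"[>\s]+", " ", message)
    match = _PATTERN.search(flat)
    if match is None:
        return None
    return match.group("cls"), int(match.group("stream")), int(match.group("local"))


if __name__ == "__main__":
    print(parse_uid_mismatch(SAMPLE))
```

Two different values for the same class would be consistent with the pipeline having been serialized by one build and deserialized by another.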
On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk <[email protected]> wrote:

> After syncing to:
>
> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master, origin/master, origin/HEAD)
> Author: Ning Kang <[email protected]>
> Date: Fri Apr 24 10:58:07 2020 -0700
>
> The new error is:
>
> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>     at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
>     at org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>     at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.InvalidClassException: org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class incompatible: stream classdesc serialVersionUID = 7311199418509482705, local class serialVersionUID = 5488866827627794770
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
>     ... 18 more
>
> I am not sure it is related to https://issues.apache.org/jira/browse/BEAM-9745.
>
> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk <[email protected]> wrote:
>
>> Here is an error I am getting when using DirectRunner:
>>
>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for the bundle bundle_1 to finish.
>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: [('bundle_1', 1)]
>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
>> Traceback (most recent call last):
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>>     "__main__", mod_spec)
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>>     exec(code, run_globals)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>>     run()
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>>     self.run().wait_until_finish()
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>>     return self.runner.run_pipeline(self, self._options)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>>     return runner.run_pipeline(pipeline, options)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>>     pipeline.to_runner_api(default_environment=self._default_environment))
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>>     return self.run_stages(stage_context, stages)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>>     bundle_context_manager,
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 508, in _run_stage
>>     bundle_manager)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 546, in _run_bundle
>>     data_input, data_output, input_timers, expected_timer_output)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 927, in process_bundle
>>     timer_inputs)):
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
>>     yield fs.pop().result()
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 432, in result
>>     return self.__get_result()
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
>>     raise self._exception
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py", line 44, in run
>>     self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 923, in execute
>>     dry_run)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 862, in process_bundle
>>     raise RuntimeError(result.error)
>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>     at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>     at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>     at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
>> Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
>>     at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>     at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>     at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>     at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver <[email protected]> wrote:
>>
>>> It should just work without any other changes. If it doesn't let us know.
>>>
>>> On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk <[email protected]> wrote:
>>>
>>>> Thx. Once the docker image is build, how do I make sure it is used when
>>>> I run Beam pipeline, as opposed to pulling from docker hub?
>>>>
>>>> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> wrote:
>>>>
>>>>> You can build the Java SDK image from source by running the following
>>>>> command: ./gradlew :sdks:java:container:docker
>>>>>
>>>>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]> wrote:
>>>>>
>>>>>> Thanks for quick response.
>>>>>>
>>>>>> Since Beam 2.21.0 is not yet available via pip
>>>>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it
>>>>>> from HEAD. After creating fresh virtual environment, my steps were:
>>>>>> pip install -r build-requirements.txt
>>>>>> python setup.py build
>>>>>> python setup.py install
>>>>>>
>>>>>> Then I encounter the following error when running the script from the
>>>>>> original message:
>>>>>>
>>>>>> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown
>>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable to pull image apache/beam_java_sdk:2.21.0.dev
>>>>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>>>>>> docker: Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown.
>>>>>> See 'docker run --help'.
>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: []
>>>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
>>>>>> Traceback (most recent call last):
>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>>>>>>     "__main__", mod_spec)
>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>>>>>>     exec(code, run_globals)
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>>>>>>     run()
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>>>>>>     self.run().wait_until_finish()
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>>>>>>     return runner.run_pipeline(pipeline, options)
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>>>>>>     pipeline.to_runner_api(default_environment=self._default_environment))
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>>>>>>     return self.run_stages(stage_context, stages)
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>>>>>>     bundle_context_manager,
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 470, in _run_stage
>>>>>>     bundle_context_manager.extract_bundle_inputs_and_outputs())
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 530, in extract_bundle_inputs_and_outputs
>>>>>>     data_api_service_descriptor = self.data_api_service_descriptor()
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 451, in data_api_service_descriptor
>>>>>>     return self.worker_handlers[0].data_api_service_descriptor()
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 445, in worker_handlers
>>>>>>     self.stage.environment, self.num_workers))
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 854, in get_worker_handlers
>>>>>>     worker_handler.start_worker()
>>>>>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 734, in start_worker
>>>>>>     '--provision_endpoint=%s' % self.control_address,
>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 336, in check_output
>>>>>>     **kwargs).stdout
>>>>>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 418, in run
>>>>>>     output=stdout, stderr=stderr)
>>>>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d', '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', '--logging_endpoint=host.docker.internal:50453', '--control_endpoint=host.docker.internal:50450', '--artifact_endpoint=host.docker.internal:50450', '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit status 125.
>>>>>>
>>>>>> Since the apache/beam_java_sdk:2.21.0.dev is not publicly available,
>>>>>> I cannot pull the image. Is it possible to get access to dev images?
>>>>>> Alternatively, are there any instructions on how to build the
>>>>>> beam_java_sdk locally and then use the local image when running a job?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I would like to know whether it is possible to run a streaming
>>>>>>>> pipeline that reads from (or writes to) Kafka using DirectRunner? If so,
>>>>>>>> what should the expansion_service point to:
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>>>>>>> ?
>>>>>>>
>>>>>>>
>>>>>>>> Also, when using FlinkRunner, what should be the value of
>>>>>>>> expansion_service? Based on
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>>>>>>> Flink should start expansion service on port 8097. I am running Flink using
>>>>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>>>>>> and the expansion service is not being started. Same holds when using
>>>>>>>> docker image: https://hub.docker.com/_/flink. I cannot find
>>>>>>>> expansion service being mentioned in Flink Documentation - unless it is
>>>>>>>> called by different name.
>>>>>>>>
>>>>>>>
>>>>>>> After Beam 2.21.0 you should not have to specify anything. You only
>>>>>>> need to make sure that Java is installed in the system (so that command
>>>>>>> 'java' is available) and kafka.py will download the correct jar.
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
>>>>>>>
>>>>>>> For HEAD (today), as long as you are in a Beam repo clone, you
>>>>>>> should not have to specify anything as well since kafka.py will try to
>>>>>>> build the dependency locally.
>>>>>>>
>>>>>>> For previous versions, you have to start up an expansion service
>>>>>>> manually. I'm not familiar about the expansion service embedded in Flink
>>>>>>> Job Server.
>>>>>>>
>>>>>>>
>>>>>>>> Flink Version: flink-1.9.2
>>>>>>>> Apache-beam Version: 2.19.0
>>>>>>>>
>>>>>>>> Full Details:
>>>>>>>>
>>>>>>>> The pipeline looks as follows:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> import argparse
>>>>>>>> import logging
>>>>>>>> import sys  # used by the argument check in run()
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.external import kafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>>>>
>>>>>>>>
>>>>>>>> def format_results(elem):
>>>>>>>>     (msg, sum_value) = elem
>>>>>>>>     print(type(msg))
>>>>>>>>     print(type(sum_value))
>>>>>>>>     return f"message: {msg}, sum: {sum_value}"
>>>>>>>>
>>>>>>>> def run(argv=None, save_main_session=True):
>>>>>>>>     """Main entry point; defines and runs the pipeline."""
>>>>>>>>     parser = argparse.ArgumentParser()
>>>>>>>>
>>>>>>>>     parser.add_argument('--bootstrap_servers', type=str,
>>>>>>>>                         help='Kafka Broker address')
>>>>>>>>     parser.add_argument('--topic', type=str, help='Kafka topic to read from')
>>>>>>>>     parser.add_argument('--output', type=str,
>>>>>>>>                         default="/tmp/kafka-output",
>>>>>>>>                         help='Output filepath')
>>>>>>>>
>>>>>>>>     args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>>
>>>>>>>>     if args.topic is None or args.bootstrap_servers is None:
>>>>>>>>         parser.print_usage()
>>>>>>>>         print(sys.argv[0] + ': error: both --topic and --bootstrap_servers are required')
>>>>>>>>         sys.exit(1)
>>>>>>>>
>>>>>>>>     options = PipelineOptions(pipeline_args)
>>>>>>>>     # We use the save_main_session option because one or more DoFn's in this
>>>>>>>>     # workflow rely on global context (e.g., a module imported at module level).
>>>>>>>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>>>>>>>
>>>>>>>>     # Enforce that this pipeline is always run in streaming mode
>>>>>>>>     options.view_as(StandardOptions).streaming = True
>>>>>>>>
>>>>>>>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>>>>
>>>>>>>>     with beam.Pipeline(options=options) as p:
>>>>>>>>         events = (
>>>>>>>>             p
>>>>>>>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>>>>                 consumer_config=consumer_conf,
>>>>>>>>                 topics=[args.topic],
>>>>>>>>             )
>>>>>>>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>>>>                 beam.window.FixedWindows(60))
>>>>>>>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>>>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>>>>             | 'FormatResults' >> beam.Map(format_results)
>>>>>>>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>         )
>>>>>>>>
>>>>>>>> if __name__ == '__main__':
>>>>>>>>     logging.getLogger().setLevel(logging.DEBUG)
>>>>>>>>     run()
>>>>>>>> ```
>>>>>>>>
>>>>>>>> I run it using:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Kafka brokers are available under the --bootstrap_servers and the
>>>>>>>> --topic exists.
>>>>>>>>
>>>>>>>> The error I am getting:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "streaming.py", line 74, in <module>
>>>>>>>>     run()
>>>>>>>>   File "streaming.py", line 69, in run
>>>>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
>>>>>>>>     return self.transform.__ror__(pvalueish, self.label)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
>>>>>>>>     result = p.apply(self, pvalueish, label)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
>>>>>>>>     return self.apply(transform, pvalueish)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
>>>>>>>>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
>>>>>>>>     return m(transform, input, options)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
>>>>>>>>     return transform.expand(input)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
>>>>>>>>     channel).Expand(request)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>>>>>>>>     return _end_unary_response_blocking(state, call, False, None)
>>>>>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>>>>>>>>     raise _InactiveRpcError(state)
>>>>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>>>>>>>>     status = StatusCode.UNAVAILABLE
>>>>>>>>     details = "Trying to connect an http1.x server"
>>>>>>>>     debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>>
>>>
>>
>> --
>> Best regards,
>> Piotr
>>
>
>
> --
> Best regards,
> Piotr
>
