Here is an error I am getting when using DirectRunner:
DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for the
bundle bundle_1 to finish.
150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
Requests sent by runner: [('bundle_1', 1)]
DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
Requests multiplexing info: []
Traceback (most recent call last):
File
"/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
193, in _run_module_as_main
"__main__", mod_spec)
File
"/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
85, in _run_code
exec(code, run_globals)
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
line 74, in <module>
run()
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
line 69, in run
| 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in
__exit__
self.run().wait_until_finish()
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in
run
return self.runner.run_pipeline(self, self._options)
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
line 130, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 173, in run_pipeline
pipeline.to_runner_api(default_environment=self._default_environment))
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 183, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 331, in run_stages
bundle_context_manager,
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 508, in _run_stage
bundle_manager)
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 546, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 927, in process_bundle
timer_inputs)):
File
"/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py",
line 586, in result_iterator
yield fs.pop().result()
File
"/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py",
line 432, in result
return self.__get_result()
File
"/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/concurrent/futures/_base.py",
line 384, in __get_result
raise self._exception
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py",
line 44, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 923, in execute
dry_run)
File "/Users/piotr.filipiuk/src/
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 862, in process_bundle
raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.util.UserCodeException:
java.lang.NoClassDefFoundError:
org/springframework/expression/EvaluationContext
at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
at
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
at
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
at
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
at
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
at
org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
at
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
at
org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError:
org/springframework/expression/EvaluationContext
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
at
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
Caused by: java.lang.ClassNotFoundException:
org.springframework.expression.EvaluationContext
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
at
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:466)
at
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:517)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
at
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:505)
at
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:735)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$500(FnApiDoFnRunner.java:122)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$3(FnApiDoFnRunner.java:198)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:708)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:122)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$2(FnApiDoFnRunner.java:194)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:945)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:122)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1411)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1406)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
at
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
at
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:692)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:122)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:191)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
at
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
at
org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:291)
at
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
at
org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
On Wed, Apr 22, 2020 at 2:22 PM Kyle Weaver <[email protected]> wrote:
> It should just work without any other changes. If it doesn't, let us know.
>
> On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk <[email protected]>
> wrote:
>
>> Thx. Once the Docker image is built, how do I make sure it is used when I
>> run a Beam pipeline, as opposed to being pulled from Docker Hub?
>>
>> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> wrote:
>>
>>> You can build the Java SDK image from source by running the following
>>> command: ./gradlew :sdks:java:container:docker
>>>
>>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]>
>>> wrote:
>>>
>>>> Thanks for the quick response.
>>>>
>>>> Since Beam 2.21.0 is not yet available via pip
>>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it
>>>> from HEAD. After creating a fresh virtual environment, my steps were:
>>>> pip install -r build-requirements.txt
>>>> python setup.py build
>>>> python setup.py install
>>>>
>>>> Then I encounter the following error when running the script from the
>>>> original message:
>>>>
>>>> Error response from daemon: manifest for apache/beam_java_sdk:
>>>> 2.21.0.dev not found: manifest unknown: manifest unknown
>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable
>>>> to pull image apache/beam_java_sdk:2.21.0.dev
>>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>>>> docker: Error response from daemon: manifest for apache/beam_java_sdk:
>>>> 2.21.0.dev not found: manifest unknown: manifest unknown.
>>>> See 'docker run --help'.
>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
>>>> Requests sent by runner: []
>>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
>>>> Requests multiplexing info: []
>>>> Traceback (most recent call last):
>>>> File
>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
>>>> 193, in _run_module_as_main
>>>> "__main__", mod_spec)
>>>> File
>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
>>>> 85, in _run_code
>>>> exec(code, run_globals)
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
>>>> line 74, in <module>
>>>> run()
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
>>>> line 69, in run
>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528,
>>>> in __exit__
>>>> self.run().wait_until_finish()
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514,
>>>> in run
>>>> return self.runner.run_pipeline(self, self._options)
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
>>>> line 130, in run_pipeline
>>>> return runner.run_pipeline(pipeline, options)
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>>> line 173, in run_pipeline
>>>>
>>>> pipeline.to_runner_api(default_environment=self._default_environment))
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>>> line 183, in run_via_runner_api
>>>> return self.run_stages(stage_context, stages)
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>>> line 331, in run_stages
>>>> bundle_context_manager,
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>>> line 470, in _run_stage
>>>> bundle_context_manager.extract_bundle_inputs_and_outputs())
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>>> line 530, in extract_bundle_inputs_and_outputs
>>>> data_api_service_descriptor = self.data_api_service_descriptor()
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>>> line 451, in data_api_service_descriptor
>>>> return self.worker_handlers[0].data_api_service_descriptor()
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>>> line 445, in worker_handlers
>>>> self.stage.environment, self.num_workers))
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>>>> line 854, in get_worker_handlers
>>>> worker_handler.start_worker()
>>>> File "/Users/piotr.filipiuk/src/
>>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>>>> line 734, in start_worker
>>>> '--provision_endpoint=%s' % self.control_address,
>>>> File
>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py",
>>>> line 336, in check_output
>>>> **kwargs).stdout
>>>> File
>>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py",
>>>> line 418, in run
>>>> output=stdout, stderr=stderr)
>>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d',
>>>> '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1',
>>>> '--logging_endpoint=host.docker.internal:50453',
>>>> '--control_endpoint=host.docker.internal:50450',
>>>> '--artifact_endpoint=host.docker.internal:50450',
>>>> '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit
>>>> status 125.
>>>>
>>>> Since the apache/beam_java_sdk:2.21.0.dev image is not publicly available, I
>>>> cannot pull it. Is it possible to get access to dev images?
>>>> Alternatively, are there any instructions on how to build the beam_java_sdk
>>>> image locally and then use the local image when running a job?
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I would like to know whether it is possible to run a streaming
>>>>>> pipeline that reads from (or writes to) Kafka using DirectRunner. If so,
>>>>>> what should the expansion_service point to:
>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>>>>> ?
>>>>>
>>>>>
>>>>>> Also, when using FlinkRunner, what should be the value of
>>>>>> expansion_service? Based on
>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>>>>> Flink should start an expansion service on port 8097. I am running Flink
>>>>>> using
>>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>>>> and the expansion service is not being started. The same holds when using
>>>>>> the Docker image: https://hub.docker.com/_/flink. I cannot find the
>>>>>> expansion service mentioned in the Flink documentation, unless it is
>>>>>> called by a different name.
>>>>>>
>>>>>
>>>>> Starting with Beam 2.21.0, you should not have to specify anything. You
>>>>> only need to make sure that Java is installed on the system (so that the
>>>>> 'java' command is available); kafka.py will then download the correct jar.
>>>>>
>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
>>>>>
>>>>> For HEAD (today), as long as you are in a Beam repo clone, you should
>>>>> not have to specify anything either, since kafka.py will try to build the
>>>>> dependency locally.
>>>>>
>>>>> For previous versions, you have to start up an expansion service
>>>>> manually. I'm not familiar with the expansion service embedded in the
>>>>> Flink Job Server.
>>>>>
>>>>>
>>>>>> Flink Version: flink-1.9.2
>>>>>> Apache-beam Version: 2.19.0
>>>>>>
>>>>>> Full Details:
>>>>>>
>>>>>> The pipeline looks as follows:
>>>>>>
>>>>>> ```
>>>>>> import argparse
>>>>>> import logging
>>>>>> import sys
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.io.external import kafka
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>>
>>>>>>
>>>>>> def format_results(elem):
>>>>>>     (msg, sum_value) = elem
>>>>>>     print(type(msg))
>>>>>>     print(type(sum_value))
>>>>>>     return f"message: {msg}, sum: {sum_value}"
>>>>>>
>>>>>> def run(argv=None, save_main_session=True):
>>>>>>     """Main entry point; defines and runs the pipeline."""
>>>>>>     parser = argparse.ArgumentParser()
>>>>>>
>>>>>>     parser.add_argument('--bootstrap_servers', type=str,
>>>>>>                         help='Kafka Broker address')
>>>>>>     parser.add_argument('--topic', type=str,
>>>>>>                         help='Kafka topic to read from')
>>>>>>     parser.add_argument('--output', type=str,
>>>>>>                         default="/tmp/kafka-output",
>>>>>>                         help='Output filepath')
>>>>>>
>>>>>>     args, pipeline_args = parser.parse_known_args(argv)
>>>>>>
>>>>>>     if args.topic is None or args.bootstrap_servers is None:
>>>>>>         parser.print_usage()
>>>>>>         print(sys.argv[0] + ': error: both --topic and '
>>>>>>               '--bootstrap_servers are required')
>>>>>>         sys.exit(1)
>>>>>>
>>>>>>     options = PipelineOptions(pipeline_args)
>>>>>>     # We use the save_main_session option because one or more DoFn's in this
>>>>>>     # workflow rely on global context (e.g., a module imported at module level).
>>>>>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>>>>>
>>>>>>     # Enforce that this pipeline is always run in streaming mode
>>>>>>     options.view_as(StandardOptions).streaming = True
>>>>>>
>>>>>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>>
>>>>>>     with beam.Pipeline(options=options) as p:
>>>>>>         events = (
>>>>>>             p
>>>>>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>>                 consumer_config=consumer_conf,
>>>>>>                 topics=[args.topic],
>>>>>>             )
>>>>>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>>                 beam.window.FixedWindows(60))
>>>>>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>>             | 'FormatResults' >> beam.Map(format_results)
>>>>>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>>         )
>>>>>>
>>>>>>
>>>>>> if __name__ == '__main__':
>>>>>>     logging.getLogger().setLevel(logging.DEBUG)
>>>>>>     run()
>>>>>> ```
>>>>>>
>>>>>> I run it using:
>>>>>>
>>>>>> ```
>>>>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>>>>> ```
>>>>>>
>>>>>> Kafka brokers are available under the --bootstrap_servers and the
>>>>>> --topic exists.
>>>>>>
>>>>>> The error I am getting:
>>>>>>
>>>>>> ```
>>>>>> Traceback (most recent call last):
>>>>>> File "streaming.py", line 74, in <module>
>>>>>> run()
>>>>>> File "streaming.py", line 69, in run
>>>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>>>>> line 989, in __ror__
>>>>>> return self.transform.__ror__(pvalueish, self.label)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>>>>> line 549, in __ror__
>>>>>> result = p.apply(self, pvalueish, label)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>>>>> line 536, in apply
>>>>>> return self.apply(transform, pvalueish)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>>>>> line 577, in apply
>>>>>> pvalueish_result = self.runner.apply(transform, pvalueish,
>>>>>> self._options)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
>>>>>> line 195, in apply
>>>>>> return m(transform, input, options)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
>>>>>> line 225, in apply_PTransform
>>>>>> return transform.expand(input)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py",
>>>>>> line 327, in expand
>>>>>> channel).Expand(request)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>>>>> line 826, in __call__
>>>>>> return _end_unary_response_blocking(state, call, False, None)
>>>>>> File
>>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>>>>> line 729, in _end_unary_response_blocking
>>>>>> raise _InactiveRpcError(state)
>>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that
>>>>>> terminated with:
>>>>>> status = StatusCode.UNAVAILABLE
>>>>>> details = "Trying to connect an http1.x server"
>>>>>> debug_error_string =
>>>>>> "{"created":"@1587496938.983698000","description":"Error received from
>>>>>> peer
>>>>>> ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying
>>>>>> to connect an http1.x server","grpc_status":14}"
>>>>>> ```
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>>
>>>
>>
>> --
>> Best regards,
>> Piotr
>>
>
--
Best regards,
Piotr