Thanks for the quick response.
Since Beam 2.21.0 is not yet available via pip
<https://pypi.org/project/apache-beam/#history>, I tried to run it from
HEAD. After creating a fresh virtual environment, my steps were:
pip install -r build-requirements.txt
python setup.py build
python setup.py install
Then I encountered the following error when running the script from the
original message:
Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable to pull image apache/beam_java_sdk:2.21.0.dev
Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
docker: Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown.
See 'docker run --help'.
DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: []
DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
Traceback (most recent call last):
  File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
    run()
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
    | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
    self.run().wait_until_finish()
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
    bundle_context_manager,
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 470, in _run_stage
    bundle_context_manager.extract_bundle_inputs_and_outputs())
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 530, in extract_bundle_inputs_and_outputs
    data_api_service_descriptor = self.data_api_service_descriptor()
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 451, in data_api_service_descriptor
    return self.worker_handlers[0].data_api_service_descriptor()
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 445, in worker_handlers
    self.stage.environment, self.num_workers))
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 854, in get_worker_handlers
    worker_handler.start_worker()
  File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 734, in start_worker
    '--provision_endpoint=%s' % self.control_address,
  File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 336, in check_output
    **kwargs).stdout
  File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 418, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['docker', 'run', '-d', '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', '--logging_endpoint=host.docker.internal:50453', '--control_endpoint=host.docker.internal:50450', '--artifact_endpoint=host.docker.internal:50450', '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit status 125.
Since the apache/beam_java_sdk:2.21.0.dev image is not publicly available, I
cannot pull it. Is it possible to get access to dev images?
Alternatively, are there any instructions on how to build the beam_java_sdk
image locally and then use the local image when running a job?
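For anyone hitting the same error: before starting the job, you can confirm whether the image the runner asks for is already in the local Docker cache by running `docker image inspect` on it. Below is a minimal Python sketch; `image_available_locally` is a hypothetical helper, not a Beam API, and it neither builds nor pulls anything.

```python
import subprocess


def image_available_locally(image: str, timeout: float = 30.0) -> bool:
    """Return True if the local Docker daemon already has `image`.

    Hypothetical helper, not part of Beam: it runs `docker image inspect`,
    which only looks at local images and never pulls from a registry.
    """
    try:
        result = subprocess.run(
            ["docker", "image", "inspect", image],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired):
        # No usable docker CLI on PATH, or the daemon did not answer in time.
        return False
    # Exit status 0 means the image exists locally; any other status covers
    # both "no such image" and "cannot reach the Docker daemon".
    return result.returncode == 0
```

On this machine, `image_available_locally('apache/beam_java_sdk:2.21.0.dev')` would be expected to return False, since Docker reported that it could not find that image locally.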
Thanks
On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath wrote:
>
>
> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk wrote:
>
>> Hi,
>>
>> Is it possible to run a streaming pipeline that reads from (or writes to)
>> Kafka using DirectRunner? If so, what should the expansion_service argument
>> at
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>> point to?
>
>
>> Also, when using FlinkRunner, what should the value of expansion_service
>> be? Based on
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>> Flink should start an expansion service on port 8097. I am running Flink
>> using
>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>> and the expansion service is not being started. The same holds when using
>> the Docker image: https://hub.docker.com/_/flink. I cannot find the
>> expansion service mentioned in the Flink documentation, unless it is
>> called by a different name.
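One quick way to check whether anything is listening on port 8097 (the port mentioned in kafka.py) is a plain TCP connect test. A minimal sketch; `port_is_open` is a hypothetical helper, not part of Beam or Flink.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds.

    Hypothetical helper: it only proves that *something* accepts
    connections on that port, not that it is a gRPC expansion service.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False
```

For example, `port_is_open('localhost', 8097)`. A True result only shows that the port accepts connections: as the "Trying to connect an http1.x server" error in the traceback shows, an open port can still belong to the wrong kind of server.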
>>
>
> From Beam 2.21.0 onward you should not have to specify anything. You only
> need to make sure that Java is installed on the system (so that the 'java'
> command is available); kafka.py will then download the correct jar.
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
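To verify that prerequisite (a working `java` command) from Python, a minimal sketch could look like this; `java_version` is a hypothetical helper and does not download or start the Kafka expansion service itself.

```python
import shutil
import subprocess
from typing import Optional


def java_version() -> Optional[str]:
    """Return the first line printed by `java -version`, or None.

    Hypothetical helper: it checks only that a `java` command is on PATH
    and can be executed; it does not download or run any Beam jar.
    """
    java = shutil.which("java")
    if java is None:
        return None
    try:
        # `java -version` prints its banner to stderr rather than stdout.
        result = subprocess.run(
            [java, "-version"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    lines = result.stderr.strip().splitlines()
    return lines[0] if lines else None
```

If it returns None, there is no usable `java` on PATH.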
>
> For HEAD (today), as long as you are in a Beam repo clone, you should not
> have to specify anything either, since kafka.py will try to build the
> dependency locally.
>
> For previous versions, you have to start up an expansion service manually.
> I'm not familiar with the expansion service embedded in the Flink Job Server.
>
>
>> Flink Version: flink-1.9.2
>> Apache-beam Version: 2.19.0
>>
>> Full Details:
>>
>> The pipeline looks as follows:
>>
>> ```
>> import argparse
>> import logging
>> import sys
>>
>> import apache_beam as beam
>> from apache_beam.io.external import kafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import SetupOptions
>> from apache_beam.options.pipeline_options import StandardOptions
>>
>>
>> def format_results(elem):
>>     (msg, sum_value) = elem
>>     print(type(msg))
>>     print(type(sum_value))
>>     return f"message: {msg}, sum: {sum_value}"
>>
>>
>> def run(argv=None, save_main_session=True):
>>     """Main entry point; defines and runs the pipeline."""
>>     parser = argparse.ArgumentParser()
>>
>>     parser.add_argument('--bootstrap_servers', type=str,
>>                         help='Kafka Broker address')
>>     parser.add_argument('--topic', type=str,
>>                         help='Kafka topic to read from')
>>     parser.add_argument('--output', type=str, default="/tmp/kafka-output",
>>                         help='Output filepath')
>>
>>     args, pipeline_args = parser.parse_known_args(argv)
>>
>>     if args.topic is None or args.bootstrap_servers is None:
>>         parser.print_usage()
>>         print(sys.argv[0] + ': error: both --topic and '
>>               '--bootstrap_servers are required')
>>         sys.exit(1)
>>
>>     options = PipelineOptions(pipeline_args)
>>     # We use the save_main_session option because one or more DoFn's in
>>     # this workflow rely on global context (e.g., a module imported at
>>     # module level).
>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>
>>     # Enforce that this pipeline is always run in streaming mode
>>     options.view_as(StandardOptions).streaming = True
>>
>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>
>>     with beam.Pipeline(options=options) as p:
>>         events = (
>>             p
>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>                 consumer_config=consumer_conf,
>>                 topics=[args.topic],
>>             )
>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>                 beam.window.FixedWindows(60))
>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>             | 'FormatResults' >> beam.Map(format_results)
>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>         )
>>
>>
>> if __name__ == '__main__':
>>     logging.getLogger().setLevel(logging.DEBUG)
>>     run()
>> ```
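To see what the pipeline is meant to compute independently of Kafka and the runner, here is a pure-Python sketch of the same logic (no Beam involved; `count_per_fixed_window` is a hypothetical name): each message is assigned to a 60-second fixed window, then a 1 per message is summed within each window. It assumes each message comes paired with a timestamp in seconds.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def count_per_fixed_window(
        events: Iterable[Tuple[float, str]], size: int = 60) -> List[str]:
    """Count identical messages per fixed window, outside of Beam.

    `events` are (timestamp_seconds, message) pairs. Each timestamp falls in
    the window [start, start + size) with start = ts - ts % size, mirroring
    FixedWindows(60) followed by Map((msg, 1)) and CombinePerKey(sum).
    """
    counts = Counter()
    for ts, msg in events:
        window_start = int(ts - ts % size)
        counts[(window_start, msg)] += 1
    # Same text as format_results, prefixed with the window start.
    return [f"window {start}: message: {msg}, sum: {total}"
            for (start, msg), total in sorted(counts.items())]
```

For example, the events `[(0, "a"), (10, "a"), (59.9, "b"), (60, "a")]` produce `window 0: message: a, sum: 2`, `window 0: message: b, sum: 1` and `window 60: message: a, sum: 1`.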
>>
>> I run it using:
>>
>> ```
>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>> ```
>>
>> The Kafka broker is reachable at the --bootstrap_servers address, and the
>> --topic exists.
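The script hands every flag it does not recognize to PipelineOptions via `parse_known_args`. A standalone sketch of that split (`split_args` is a hypothetical name, and `--runner=FlinkRunner` is just an example of a pipeline flag) also shows that argparse keeps everything after the first '=' as the value, so a doubled '=' ends up inside it.

```python
import argparse


def split_args(argv):
    """Split argv the same way the script does with parse_known_args."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', type=str)
    parser.add_argument('--topic', type=str)
    # Known flags land in the namespace; anything unrecognized (for
    # example --runner) is returned untouched for PipelineOptions.
    return parser.parse_known_args(argv)


args, pipeline_args = split_args(
    ['--topic=inputs', '--bootstrap_servers=192.168.1.219:32779',
     '--runner=FlinkRunner'])
print(args.topic, args.bootstrap_servers, pipeline_args)
# inputs 192.168.1.219:32779 ['--runner=FlinkRunner']

# argparse splits on the first '=' only, so '==' leaves a stray '=' in the value.
bad_args, _ = split_args(['--bootstrap_servers==192.168.1.219:32779'])
print(bad_args.bootstrap_servers)
# =192.168.1.219:32779
```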
>>
>> The error I am getting:
>>
>> ```
>> Traceback (most recent call last):
>>   File "streaming.py", line 74, in <module>
>>     run()
>>   File "streaming.py", line 69, in run
>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
>>     return self.transform.__ror__(pvalueish, self.label)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
>>     result = p.apply(self, pvalueish, label)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
>>     return self.apply(transform, pvalueish)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
>>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
>>     return m(transform, input, options)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
>>     return transform.expand(input)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
>>     channel).Expand(request)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>>     return _end_unary_response_blocking(state, call, False, None)
>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>>     raise _InactiveRpcError(state)
>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>>         status = StatusCode.UNAVAILABLE
>>         details = "Trying to connect an http1.x server"
>>         debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
>> ```
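The debug_error_string in the traceback above is a JSON object, so the peer the gRPC client actually reached can be read off mechanically. A minimal sketch (the helper name `summarize_grpc_debug` is hypothetical); for the string above it yields the peer `ipv6:[::1]:8081`, i.e. the IPv6 loopback address on port 8081, with gRPC status 14 (UNAVAILABLE).

```python
import json


def summarize_grpc_debug(debug_error_string: str) -> dict:
    """Pull the peer, status code and message out of gRPC's debug string.

    Hypothetical helper: the debug_error_string is JSON, so this just
    decodes it and picks out three fields.
    """
    info = json.loads(debug_error_string)
    description = info.get("description", "")
    # The description has the form "Error received from peer <address>".
    peer = description.rsplit("peer ", 1)[-1] if "peer " in description else None
    return {
        "peer": peer,
        "grpc_status": info.get("grpc_status"),
        "grpc_message": info.get("grpc_message"),
    }
```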
>>
>> Thanks
>>
>>
--
Best regards,
Piotr