You can build the Java SDK image from source by running the following
command from the root of your Beam repository clone:

    ./gradlew :sdks:java:container:docker
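
Once the build finishes, it may help to confirm that the image named in the
error is actually present locally before rerunning the pipeline. Below is a
minimal sketch, assuming Docker is installed; the helper name
image_exists_locally is hypothetical, and whether the Gradle build produces
exactly the tag the runner asks for is an assumption you should verify.

```python
import subprocess


def image_exists_locally(image, run=subprocess.run):
    """Return True if `docker image inspect` succeeds for `image`.

    `run` is injectable so the check can be exercised without Docker.
    """
    try:
        result = run(
            ['docker', 'image', 'inspect', image],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # The docker executable itself is not on the PATH.
        return False
    # `docker image inspect` exits non-zero when the image is unknown locally.
    return result.returncode == 0
```

For example, image_exists_locally('apache/beam_java_sdk:2.21.0.dev') uses the
tag from the error message quoted below; if it returns False after the build,
the locally built image probably carries a different name or tag.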

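Regarding the advice quoted further down that the 'java' command must be
available for kafka.py to work without an explicit expansion_service: a quick
way to check this from the same Python environment is sketched below. The
helper name java_available is hypothetical; it only looks for an executable
on the PATH and does not check the Java version.

```python
import shutil


def java_available(which=shutil.which):
    """Return True if a `java` executable can be found on the PATH."""
    return which('java') is not None
```
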
On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]>
wrote:

> Thanks for the quick response.
>
> Since Beam 2.21.0 is not yet available via pip
> <https://pypi.org/project/apache-beam/#history>, I tried to run it from
> HEAD. After creating a fresh virtual environment, my steps were:
> pip install -r build-requirements.txt
> python setup.py build
> python setup.py install
>
> Then I encountered the following error when running the script from the
> original message:
>
> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev
> not found: manifest unknown: manifest unknown
> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable
> to pull image apache/beam_java_sdk:2.21.0.dev
> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
> docker: Error response from daemon: manifest for apache/beam_java_sdk:
> 2.21.0.dev not found: manifest unknown: manifest unknown.
> See 'docker run --help'.
> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
> Requests sent by runner: []
> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
> Requests multiplexing info: []
> Traceback (most recent call last):
>   File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
> 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
> 85, in _run_code
>     exec(code, run_globals)
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
> line 74, in <module>
>     run()
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
> line 69, in run
>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in
> __exit__
>     self.run().wait_until_finish()
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in
> run
>     return self.runner.run_pipeline(self, self._options)
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
> line 130, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 173, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 183, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 331, in run_stages
>     bundle_context_manager,
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 470, in _run_stage
>     bundle_context_manager.extract_bundle_inputs_and_outputs())
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
> line 530, in extract_bundle_inputs_and_outputs
>     data_api_service_descriptor = self.data_api_service_descriptor()
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
> line 451, in data_api_service_descriptor
>     return self.worker_handlers[0].data_api_service_descriptor()
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
> line 445, in worker_handlers
>     self.stage.environment, self.num_workers))
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
> line 854, in get_worker_handlers
>     worker_handler.start_worker()
>   File "/Users/piotr.filipiuk/src/
> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
> line 734, in start_worker
>     '--provision_endpoint=%s' % self.control_address,
>   File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py",
> line 336, in check_output
>     **kwargs).stdout
>   File
> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py",
> line 418, in run
>     output=stdout, stderr=stderr)
> subprocess.CalledProcessError: Command '['docker', 'run', '-d',
> '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1',
> '--logging_endpoint=host.docker.internal:50453',
> '--control_endpoint=host.docker.internal:50450',
> '--artifact_endpoint=host.docker.internal:50450',
> '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit
> status 125.
>
> Since the apache/beam_java_sdk:2.21.0.dev image is not publicly available, I
> cannot pull it. Is it possible to get access to dev images?
> Alternatively, are there any instructions on how to build the beam_java_sdk
> locally and then use the local image when running a job?
>
> Thanks
>
>
> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I would like to know whether it is possible to run a streaming pipeline
>>> that reads from (or writes to) Kafka using DirectRunner? If so, what should
>>> the expansion_service point to:
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>> ?
>>
>>
>>> Also, when using FlinkRunner, what should be the value of
>>> expansion_service? Based on
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>> Flink should start expansion service on port 8097. I am running Flink using
>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>> and the expansion service is not being started. The same holds when using
>>> the Docker image: https://hub.docker.com/_/flink. I cannot find the
>>> expansion service mentioned in the Flink documentation, unless it goes by a
>>> different name.
>>>
>>
>> Starting with Beam 2.21.0 you should not have to specify anything. You only
>> need to make sure that Java is installed on the system (so that the 'java'
>> command is available) and kafka.py will download the correct jar.
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
>>
>> For HEAD (today), as long as you are in a Beam repo clone, you should not
>> have to specify anything either, since kafka.py will try to build the
>> dependency locally.
>>
>> For previous versions, you have to start up an expansion service
>> manually. I'm not familiar with the expansion service embedded in the Flink
>> Job Server.
>>
>>
>>> Flink Version: flink-1.9.2
>>> Apache-beam Version: 2.19.0
>>>
>>> Full Details:
>>>
>>> The pipeline looks as follows:
>>>
>>> ```
>>> import argparse
>>> import logging
>>> import sys
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.external import kafka
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.options.pipeline_options import SetupOptions
>>> from apache_beam.options.pipeline_options import StandardOptions
>>>
>>>
>>> def format_results(elem):
>>>     (msg, sum_value) = elem
>>>     print(type(msg))
>>>     print(type(sum_value))
>>>     return f"message: {msg}, sum: {sum_value}"
>>>
>>> def run(argv=None, save_main_session=True):
>>>     """Main entry point; defines and runs the pipeline."""
>>>     parser = argparse.ArgumentParser()
>>>
>>>     parser.add_argument('--bootstrap_servers', type=str,
>>>                         help='Kafka Broker address')
>>>     parser.add_argument('--topic', type=str,
>>>                         help='Kafka topic to read from')
>>>     parser.add_argument('--output', type=str, default="/tmp/kafka-output",
>>>                         help='Output filepath')
>>>
>>>     args, pipeline_args = parser.parse_known_args(argv)
>>>
>>>     if args.topic is None or args.bootstrap_servers is None:
>>>         parser.print_usage()
>>>         print(sys.argv[0] +
>>>               ': error: both --topic and --bootstrap_servers are required')
>>>         sys.exit(1)
>>>
>>>     options = PipelineOptions(pipeline_args)
>>>     # We use the save_main_session option because one or more DoFn's in
>>>     # this workflow rely on global context (e.g., a module imported at
>>>     # module level).
>>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>>
>>>     # Enforce that this pipeline is always run in streaming mode
>>>     options.view_as(StandardOptions).streaming = True
>>>
>>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>
>>>     with beam.Pipeline(options=options) as p:
>>>         events = (
>>>             p
>>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>                 consumer_config=consumer_conf,
>>>                 topics=[args.topic],
>>>             )
>>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>                 beam.window.FixedWindows(60))
>>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>>             | 'FormatResults' >> beam.Map(format_results)
>>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>         )
>>>
>>> if __name__ == '__main__':
>>>     logging.getLogger().setLevel(logging.DEBUG)
>>>     run()
>>> ```
>>>
>>> I run it using:
>>>
>>> ```
>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>> ```
>>>
>>> Kafka brokers are available under the --bootstrap_servers and the
>>> --topic exists.
>>>
>>> The error I am getting:
>>>
>>> ```
>>> Traceback (most recent call last):
>>>   File "streaming.py", line 74, in <module>
>>>     run()
>>>   File "streaming.py", line 69, in run
>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>> line 989, in __ror__
>>>     return self.transform.__ror__(pvalueish, self.label)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>> line 549, in __ror__
>>>     result = p.apply(self, pvalueish, label)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 536, in apply
>>>     return self.apply(transform, pvalueish)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>> line 577, in apply
>>>     pvalueish_result = self.runner.apply(transform, pvalueish,
>>> self._options)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
>>> line 195, in apply
>>>     return m(transform, input, options)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
>>> line 225, in apply_PTransform
>>>     return transform.expand(input)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py",
>>> line 327, in expand
>>>     channel).Expand(request)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>> line 826, in __call__
>>>     return _end_unary_response_blocking(state, call, False, None)
>>>   File
>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>> line 729, in _end_unary_response_blocking
>>>     raise _InactiveRpcError(state)
>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that
>>> terminated with:
>>>         status = StatusCode.UNAVAILABLE
>>>         details = "Trying to connect an http1.x server"
>>>         debug_error_string =
>>> "{"created":"@1587496938.983698000","description":"Error received from peer
>>> ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying
>>> to connect an http1.x server","grpc_status":14}"
>>> ```
>>>
>>> Thanks
>>>
>>>
>
> --
> Best regards,
> Piotr
>
