Thx. Once the Docker image is built, how do I make sure it is used when I
run a Beam pipeline, instead of being pulled from Docker Hub?
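By default, `docker run` only contacts a registry when no image with that exact name and tag is present locally. The log further down also shows that the runner only logs `Unable to pull image ...` when its pull fails, and then calls `docker run` anyway. So a locally built image should be picked up as long as its tag matches the one the runner requests (`apache/beam_java_sdk:2.21.0.dev` in that log). The sketch below checks for that tag with `docker image inspect`. It assumes the Docker CLI is on `PATH`, and the helper name `local_image_exists` is hypothetical.

```python
import subprocess
from typing import Callable


def local_image_exists(image: str, run: Callable = subprocess.run) -> bool:
    """Return True if `docker image inspect <image>` succeeds.

    `run` defaults to subprocess.run; it is a parameter only so the check
    can be exercised without a Docker daemon.
    """
    try:
        result = run(
            ["docker", "image", "inspect", image],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # The docker CLI itself is not installed or not on PATH.
        return False
    # Non-zero exit: no such image locally (or the daemon is unreachable).
    return result.returncode == 0


if __name__ == "__main__":
    tag = "apache/beam_java_sdk:2.21.0.dev"  # tag taken from the error log
    print(tag, "is present locally" if local_image_exists(tag) else "is missing")
```

If the tag is missing even though the build succeeded, the build most likely produced a different tag. In that case, either retag the image or point the runner at the tag that exists.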

On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> wrote:

> You can build the Java SDK image from source by running the following
> command: ./gradlew :sdks:java:container:docker
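Once the Gradle build finishes, it helps to see which `beam_java_sdk` tags actually exist locally, because the runner can only use the local image if the tag matches the one it asks for. The following is a minimal sketch. It assumes the image lives under the `apache/beam_java_sdk` repository name, as in the error log later in this thread. The helper name `local_tags` is hypothetical.

```python
import subprocess
from typing import Callable, List


def local_tags(repository: str, run: Callable = subprocess.run) -> List[str]:
    """List the 'repo:tag' strings Docker has locally for `repository`."""
    result = run(
        ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}", repository],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
        check=True,
    )
    # One image per line; skip blank lines such as a trailing newline.
    return [line.strip() for line in result.stdout.splitlines() if line.strip()]


if __name__ == "__main__":
    try:
        for tag in local_tags("apache/beam_java_sdk"):
            print(tag)
    except (FileNotFoundError, subprocess.CalledProcessError) as err:
        print("could not query Docker:", err)
```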
>
> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]>
> wrote:
>
>> Thanks for the quick response.
>>
>> Since Beam 2.21.0 is not yet available via pip
>> <https://pypi.org/project/apache-beam/#history>, I tried to run it from
>> HEAD. After creating a fresh virtual environment, my steps were:
>> pip install -r build-requirements.txt
>> python setup.py build
>> python setup.py install
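After `python setup.py install`, you can read the package metadata to confirm which SDK version the virtual environment actually picked up. The sketch below uses `importlib.metadata`, which is part of the standard library from Python 3.8 onward. That is newer than the 3.6 interpreter in the traceback below. In the log that follows, a `2.21.0.dev` install lines up with the `apache/beam_java_sdk:2.21.0.dev` image tag. Treat that correspondence as an observation, not a guarantee. The helper name is hypothetical.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist: str = "apache-beam") -> Optional[str]:
    """Return the installed version of distribution `dist`, or None if absent."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("apache-beam:", installed_version() or "not installed")
```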
>>
>> Then I encountered the following error when running the script from the
>> original message:
>>
>> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown
>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable to pull image apache/beam_java_sdk:2.21.0.dev
>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>> docker: Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev not found: manifest unknown: manifest unknown.
>> See 'docker run --help'.
>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests sent by runner: []
>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: Requests multiplexing info: []
>> Traceback (most recent call last):
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>>     "__main__", mod_spec)
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line 85, in _run_code
>>     exec(code, run_globals)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 74, in <module>
>>     run()
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", line 69, in run
>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>>     self.run().wait_until_finish()
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in run
>>     return self.runner.run_pipeline(self, self._options)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 130, in run_pipeline
>>     return runner.run_pipeline(pipeline, options)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>>     pipeline.to_runner_api(default_environment=self._default_environment))
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>>     return self.run_stages(stage_context, stages)
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
>>     bundle_context_manager,
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 470, in _run_stage
>>     bundle_context_manager.extract_bundle_inputs_and_outputs())
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 530, in extract_bundle_inputs_and_outputs
>>     data_api_service_descriptor = self.data_api_service_descriptor()
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 451, in data_api_service_descriptor
>>     return self.worker_handlers[0].data_api_service_descriptor()
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 445, in worker_handlers
>>     self.stage.environment, self.num_workers))
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 854, in get_worker_handlers
>>     worker_handler.start_worker()
>>   File "/Users/piotr.filipiuk/src/github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 734, in start_worker
>>     '--provision_endpoint=%s' % self.control_address,
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 336, in check_output
>>     **kwargs).stdout
>>   File "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", line 418, in run
>>     output=stdout, stderr=stderr)
>> subprocess.CalledProcessError: Command '['docker', 'run', '-d', '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', '--logging_endpoint=host.docker.internal:50453', '--control_endpoint=host.docker.internal:50450', '--artifact_endpoint=host.docker.internal:50450', '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit status 125.
>>
>> Since apache/beam_java_sdk:2.21.0.dev is not publicly available, I
>> cannot pull the image. Is it possible to get access to dev images?
>> Alternatively, are there any instructions on how to build beam_java_sdk
>> locally and then use the local image when running a job?
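The traceback above ends in `subprocess.CalledProcessError` with exit status 125. According to Docker's documentation, `docker run` returns 125 when the error comes from the Docker daemon itself (here, the image could not be found or pulled), rather than from the command inside the container. Below is a minimal sketch of surfacing that distinction when invoking such a command. The helper name and messages are hypothetical. The demo uses a Python process that exits with 125 in place of Docker, so it runs without a daemon. The 125/126/127 hints only make sense when the command really is `docker run`.

```python
import subprocess
import sys
from typing import Sequence

# Exit codes documented for `docker run` itself; any other code is the
# exit status of the command inside the container.
DOCKER_RUN_EXIT_CODES = {
    125: "error in the Docker daemon itself (e.g. image not found)",
    126: "contained command cannot be invoked",
    127: "contained command cannot be found",
}


def run_and_explain(cmd: Sequence[str]) -> str:
    """Run `cmd` and return its output; on failure raise RuntimeError with a hint."""
    try:
        return subprocess.check_output(
            cmd, stderr=subprocess.STDOUT, universal_newlines=True)
    except subprocess.CalledProcessError as err:
        hint = DOCKER_RUN_EXIT_CODES.get(
            err.returncode, "exit status of the container's command")
        raise RuntimeError(
            f"exit status {err.returncode}: {hint}\n{err.output}") from err


if __name__ == "__main__":
    # Stand-in for the failing `docker run ...` call: a process exiting with 125.
    try:
        run_and_explain([sys.executable, "-c", "import sys; sys.exit(125)"])
    except RuntimeError as exc:
        print(exc)
```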
>>
>> Thanks
>>
>>
>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>>
>>>
>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to know whether it is possible to run a streaming pipeline
>>>> that reads from (or writes to) Kafka using the DirectRunner. If so, what
>>>> should the expansion_service point to? See
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>>
>>>
>>>> Also, when using the FlinkRunner, what should the value of
>>>> expansion_service be? Based on
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>>> Flink should start an expansion service on port 8097. I am running Flink as
>>>> described in
>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>> and the expansion service is not being started. The same holds when using
>>>> the Docker image https://hub.docker.com/_/flink. I cannot find the
>>>> expansion service mentioned in the Flink documentation, unless it goes by
>>>> a different name.
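A plain TCP connect is a quick way to check whether anything is listening where an expansion service is expected (port 8097, per the kafka.py comment linked above). The following is a minimal sketch using only the standard library, and the helper name is hypothetical. An open port only shows that *something* is listening there, not that it is a Beam expansion service.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, unreachable host, or timeout.
        return False


if __name__ == "__main__":
    print("localhost:8097 open:", port_open("localhost", 8097))
```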
>>>>
>>>
>>> From Beam 2.21.0 onward you should not have to specify anything. You only
>>> need to make sure that Java is installed on the system (so that the 'java'
>>> command is available) and kafka.py will download the correct jar.
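Because kafka.py relies on the 'java' command being available, a pre-flight check before launching the pipeline can prevent a confusing failure later. The following is a hedged sketch using the standard library, and the helper name is hypothetical. Note that `java -version` writes its version banner to stderr, not stdout.

```python
import shutil
import subprocess
from typing import Optional


def java_version() -> Optional[str]:
    """Return the first line of `java -version`, or None if java is unusable."""
    java = shutil.which("java")
    if java is None:
        return None
    proc = subprocess.run([java, "-version"], stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=True)
    if proc.returncode != 0:
        # A `java` executable exists on PATH but could not report a version.
        return None
    # `java -version` writes its banner to stderr.
    lines = proc.stderr.strip().splitlines()
    return lines[0] if lines else "java found, version unknown"


if __name__ == "__main__":
    print(java_version() or "java not found on PATH")
```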
>>>
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
>>>
>>> For HEAD (as of today), as long as you are in a Beam repo clone, you should
>>> not have to specify anything either, since kafka.py will try to build the
>>> dependency locally.
>>>
>>> For previous versions, you have to start up an expansion service
>>> manually. I'm not familiar with the expansion service embedded in the Flink
>>> Job Server.
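For versions before 2.21.0, where the expansion service has to be started by hand, you can script the launch. The sketch below assumes a self-contained expansion-service jar whose main class takes the port as its first argument. Check that against the jar you actually use. The jar path is a hypothetical placeholder, not a real artifact name.

```python
import subprocess
from typing import List


def expansion_service_cmd(jar_path: str, port: int = 8097) -> List[str]:
    """Build the command line that starts an expansion-service jar on `port`.

    Assumption: the jar's main class reads the port from its first argument.
    """
    return ["java", "-jar", jar_path, str(port)]


def start_expansion_service(jar_path: str, port: int = 8097) -> subprocess.Popen:
    """Start the service in the background; the caller must terminate() it."""
    return subprocess.Popen(expansion_service_cmd(jar_path, port))


if __name__ == "__main__":
    # Hypothetical path: substitute the expansion-service jar for your Beam version.
    print(" ".join(expansion_service_cmd("/path/to/expansion-service.jar")))
```

Once the service is up, the transform would be pointed at it through its `expansion_service` argument, for example `expansion_service='localhost:8097'`.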
>>>
>>>
>>>> Flink Version: flink-1.9.2
>>>> Apache-beam Version: 2.19.0
>>>>
>>>> Full Details:
>>>>
>>>> The pipeline looks as follows:
>>>>
>>>> ```
>>>> import argparse
>>>> import logging
>>>> import sys
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.external import kafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>
>>>>
>>>> def format_results(elem):
>>>>     (msg, sum_value) = elem
>>>>     print(type(msg))
>>>>     print(type(sum_value))
>>>>     return f"message: {msg}, sum: {sum_value}"
>>>>
>>>>
>>>> def run(argv=None, save_main_session=True):
>>>>     """Main entry point; defines and runs the pipeline."""
>>>>     parser = argparse.ArgumentParser()
>>>>
>>>>     parser.add_argument('--bootstrap_servers', type=str,
>>>>                         help='Kafka Broker address')
>>>>     parser.add_argument('--topic', type=str,
>>>>                         help='Kafka topic to read from')
>>>>     parser.add_argument('--output', type=str, default="/tmp/kafka-output",
>>>>                         help='Output filepath')
>>>>
>>>>     args, pipeline_args = parser.parse_known_args(argv)
>>>>
>>>>     if args.topic is None or args.bootstrap_servers is None:
>>>>         parser.print_usage()
>>>>         print(sys.argv[0] + ': error: both --topic and '
>>>>               '--bootstrap_servers are required')
>>>>         sys.exit(1)
>>>>
>>>>     options = PipelineOptions(pipeline_args)
>>>>     # We use the save_main_session option because one or more DoFn's in
>>>>     # this workflow rely on global context (e.g., a module imported at
>>>>     # module level).
>>>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>>>
>>>>     # Enforce that this pipeline is always run in streaming mode
>>>>     options.view_as(StandardOptions).streaming = True
>>>>
>>>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>
>>>>     with beam.Pipeline(options=options) as p:
>>>>         events = (
>>>>             p
>>>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>                 consumer_config=consumer_conf,
>>>>                 topics=[args.topic],
>>>>             )
>>>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>                 beam.window.FixedWindows(60))
>>>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>>>             | 'FormatResults' >> beam.Map(format_results)
>>>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>         )
>>>>
>>>>
>>>> if __name__ == '__main__':
>>>>     logging.getLogger().setLevel(logging.DEBUG)
>>>>     run()
>>>> ```
>>>>
>>>> I run it using:
>>>>
>>>> ```
>>>> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
>>>> ```
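A side note on passing `--bootstrap_servers`: argparse splits `--option=value` on the first `=` only. If a doubled `=` ever ends up in the actual command, the second `=` becomes part of the value and is not treated as a separator. The following is a minimal demonstration with the same argument definition as the script. The address is the one used in this thread.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--bootstrap_servers', type=str, help='Kafka Broker address')

# A doubled '=' leaves a leading '=' in the parsed value.
bad, _ = parser.parse_known_args(['--bootstrap_servers==192.168.1.219:32779'])
print(repr(bad.bootstrap_servers))   # '=192.168.1.219:32779'

# A single '=' (or a space) gives the intended address.
good, _ = parser.parse_known_args(['--bootstrap_servers=192.168.1.219:32779'])
print(repr(good.bootstrap_servers))  # '192.168.1.219:32779'
```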
>>>>
>>>> Kafka brokers are available under the --bootstrap_servers and the
>>>> --topic exists.
>>>>
>>>> The error I am getting:
>>>>
>>>> ```
>>>> Traceback (most recent call last):
>>>>   File "streaming.py", line 74, in <module>
>>>>     run()
>>>>   File "streaming.py", line 69, in run
>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
>>>>     return self.transform.__ror__(pvalueish, self.label)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
>>>>     result = p.apply(self, pvalueish, label)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
>>>>     return self.apply(transform, pvalueish)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
>>>>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
>>>>     return m(transform, input, options)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
>>>>     return transform.expand(input)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
>>>>     channel).Expand(request)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>>>>     return _end_unary_response_blocking(state, call, False, None)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>>>>     raise _InactiveRpcError(state)
>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>>>>         status = StatusCode.UNAVAILABLE
>>>>         details = "Trying to connect an http1.x server"
>>>>         debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
>>>> ```
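The error above means gRPC reached `[::1]:8081` and found an HTTP/1.x server there. Port 8081 is the default port of Flink's REST API and web UI, which speaks HTTP/1.x. That is consistent with the transform talking to a Flink web endpoint rather than a gRPC expansion service. The following is a rough probe for telling the two apart: it sends a minimal HTTP/1.0 request and checks whether the reply starts with `HTTP/1.`. The helper name is hypothetical. A non-HTTP/1 answer does not by itself prove that the port hosts an expansion service.

```python
import socket


def speaks_http1(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if host:port answers a bare HTTP/1.0 request with HTTP/1.x."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"GET / HTTP/1.0\r\nHost: " + host.encode() + b"\r\n\r\n")
            reply = sock.recv(16)
    except OSError:
        # Refused, unreachable, or no reply before the timeout.
        return False
    return reply.startswith(b"HTTP/1.")


if __name__ == "__main__":
    print("localhost:8081 speaks HTTP/1.x:", speaks_http1("localhost", 8081))
```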
>>>>
>>>> Thanks
>>>>
>>>>
>>
>> --
>> Best regards,
>> Piotr
>>
>

-- 
Best regards,
Piotr
