It should just work without any other changes. If it doesn't, let us know.

On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk <[email protected]>
wrote:

> Thx. Once the docker image is built, how do I make sure it is used when I
> run a Beam pipeline, as opposed to pulling it from Docker Hub?
>
> On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> wrote:
>
>> You can build the Java SDK image from source by running the following
>> command: ./gradlew :sdks:java:container:docker
>>
>> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> Thanks for the quick response.
>>>
>>> Since Beam 2.21.0 is not yet available via pip
>>> <https://pypi.org/project/apache-beam/#history>, I tried to run it from
>>> HEAD. After creating a fresh virtual environment, my steps were:
>>> pip install -r build-requirements.txt
>>> python setup.py build
>>> python setup.py install
>>>
>>> Then I encounter the following error when running the script from the
>>> original message:
>>>
>>> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev
>>> not found: manifest unknown: manifest unknown
>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable
>>> to pull image apache/beam_java_sdk:2.21.0.dev
>>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally
>>> docker: Error response from daemon: manifest for apache/beam_java_sdk:
>>> 2.21.0.dev not found: manifest unknown: manifest unknown.
>>> See 'docker run --help'.
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
>>> Requests sent by runner: []
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
>>> Requests multiplexing info: []
>>> Traceback (most recent call last):
>>>   File
>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
>>> 193, in _run_module_as_main
>>>     "__main__", mod_spec)
>>>   File
>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
>>> 85, in _run_code
>>>     exec(code, run_globals)
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
>>> line 74, in <module>
>>>     run()
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py",
>>> line 69, in run
>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528,
>>> in __exit__
>>>     self.run().wait_until_finish()
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514,
>>> in run
>>>     return self.runner.run_pipeline(self, self._options)
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
>>> line 130, in run_pipeline
>>>     return runner.run_pipeline(pipeline, options)
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>> line 173, in run_pipeline
>>>
>>> pipeline.to_runner_api(default_environment=self._default_environment))
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>> line 183, in run_via_runner_api
>>>     return self.run_stages(stage_context, stages)
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>> line 331, in run_stages
>>>     bundle_context_manager,
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>>> line 470, in _run_stage
>>>     bundle_context_manager.extract_bundle_inputs_and_outputs())
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>> line 530, in extract_bundle_inputs_and_outputs
>>>     data_api_service_descriptor = self.data_api_service_descriptor()
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>> line 451, in data_api_service_descriptor
>>>     return self.worker_handlers[0].data_api_service_descriptor()
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py",
>>> line 445, in worker_handlers
>>>     self.stage.environment, self.num_workers))
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>>> line 854, in get_worker_handlers
>>>     worker_handler.start_worker()
>>>   File "/Users/piotr.filipiuk/src/
>>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>>> line 734, in start_worker
>>>     '--provision_endpoint=%s' % self.control_address,
>>>   File
>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py",
>>> line 336, in check_output
>>>     **kwargs).stdout
>>>   File
>>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py",
>>> line 418, in run
>>>     output=stdout, stderr=stderr)
>>> subprocess.CalledProcessError: Command '['docker', 'run', '-d',
>>> '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1',
>>> '--logging_endpoint=host.docker.internal:50453',
>>> '--control_endpoint=host.docker.internal:50450',
>>> '--artifact_endpoint=host.docker.internal:50450',
>>> '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit
>>> status 125.
>>>
>>> Since the apache/beam_java_sdk:2.21.0.dev is not publicly available, I
>>> cannot pull the image. Is it possible to get access to dev images?
>>> Alternatively, are there any instructions on how to build the beam_java_sdk
>>> locally and then use the local image when running a job?
>>>
>>> Thanks
>>>
>>>
>>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <[email protected]>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I would like to know whether it is possible to run a streaming
>>>>> pipeline that reads from (or writes to) Kafka using the DirectRunner. If so,
>>>>> what should the expansion_service point to:
>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90
>>>>> ?
>>>>
>>>>
>>>>> Also, when using FlinkRunner, what should be the value of
>>>>> expansion_service? Based on
>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
>>>>> Flink should start the expansion service on port 8097. I am running Flink
>>>>> using
>>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution
>>>>> and the expansion service is not being started. Same holds when using
>>>>> docker image: https://hub.docker.com/_/flink. I cannot find the expansion
>>>>> service mentioned in the Flink documentation, unless it is called by a
>>>>> different name.
>>>>>
>>>>
>>>> Starting with Beam 2.21.0, you should not have to specify anything. You only
>>>> need to make sure that Java is installed in the system (so that command
>>>> 'java' is available) and kafka.py will download the correct jar.
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
>>>>
>>>> For HEAD (today), as long as you are in a Beam repo clone, you should
>>>> not have to specify anything as well since kafka.py will try to build the
>>>> dependency locally.
>>>>
>>>> For previous versions, you have to start up an expansion service
>>>> manually. I'm not familiar with the expansion service embedded in the Flink
>>>> Job Server.
>>>>
>>>>
>>>>> Flink Version: flink-1.9.2
>>>>> Apache-beam Version: 2.19.0
>>>>>
>>>>> Full Details:
>>>>>
>>>>> The pipeline looks as follows:
>>>>>
>>>>> ```
>>>>> import argparse
>>>>> import logging
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.io.external import kafka
>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>> from apache_beam.options.pipeline_options import SetupOptions
>>>>> from apache_beam.options.pipeline_options import StandardOptions
>>>>>
>>>>>
>>>>> def format_results(elem):
>>>>>     (msg, sum_value) = elem
>>>>>     print(type(msg))
>>>>>     print(type(sum_value))
>>>>>     return f"message: {msg}, sum: {sum_value}"
>>>>>
>>>>> def run(argv=None, save_main_session=True):
>>>>>     """Main entry point; defines and runs the pipeline."""
>>>>>     parser = argparse.ArgumentParser()
>>>>>
>>>>>     parser.add_argument('--bootstrap_servers', type=str,
>>>>>                         help='Kafka Broker address')
>>>>>     parser.add_argument('--topic', type=str, help='Kafka topic to read
>>>>> from')
>>>>>     parser.add_argument('--output', type=str,
>>>>> default="/tmp/kafka-output",
>>>>>                         help='Output filepath')
>>>>>
>>>>>     args, pipeline_args = parser.parse_known_args(argv)
>>>>>
>>>>>     if args.topic is None or args.bootstrap_servers is None:
>>>>>         parser.print_usage()
>>>>>         print(sys.argv[0] + ': error: both --topic and
>>>>> --bootstrap_servers are required')
>>>>>         sys.exit(1)
>>>>>
>>>>>     options = PipelineOptions(pipeline_args)
>>>>>     # We use the save_main_session option because one or more DoFn's
>>>>> in this
>>>>>     # workflow rely on global context (e.g., a module imported at
>>>>> module level).
>>>>>     options.view_as(SetupOptions).save_main_session = save_main_session
>>>>>
>>>>>     # Enforce that this pipeline is always run in streaming mode
>>>>>     options.view_as(StandardOptions).streaming = True
>>>>>
>>>>>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>>>>>
>>>>>     with beam.Pipeline(options=options) as p:
>>>>>         events = (
>>>>>             p
>>>>>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>>>>>                 consumer_config=consumer_conf,
>>>>>                 topics=[args.topic],
>>>>>             )
>>>>>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>>>>>                 beam.window.FixedWindows(60))
>>>>>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>>>>>             | "SumByKey" >> beam.CombinePerKey(sum)
>>>>>             | 'FormatResults' >> beam.Map(format_results)
>>>>>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>         )
>>>>>
>>>>> if __name__ == '__main__':
>>>>>     logging.getLogger().setLevel(logging.DEBUG)
>>>>>     run()
>>>>> ```
>>>>>
>>>>> I run it using:
>>>>>
>>>>> ```
>>>>> python streaming.py --topic=inputs --bootstrap_servers==
>>>>> 192.168.1.219:32779
>>>>> ```
>>>>>
>>>>> Kafka brokers are available under the --bootstrap_servers and the
>>>>> --topic exists.
>>>>>
>>>>> The error I am getting:
>>>>>
>>>>> ```
>>>>> Traceback (most recent call last):
>>>>>   File "streaming.py", line 74, in <module>
>>>>>     run()
>>>>>   File "streaming.py", line 69, in run
>>>>>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>>>> line 989, in __ror__
>>>>>     return self.transform.__ror__(pvalueish, self.label)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>>>>> line 549, in __ror__
>>>>>     result = p.apply(self, pvalueish, label)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>>>> line 536, in apply
>>>>>     return self.apply(transform, pvalueish)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
>>>>> line 577, in apply
>>>>>     pvalueish_result = self.runner.apply(transform, pvalueish,
>>>>> self._options)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
>>>>> line 195, in apply
>>>>>     return m(transform, input, options)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
>>>>> line 225, in apply_PTransform
>>>>>     return transform.expand(input)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py",
>>>>> line 327, in expand
>>>>>     channel).Expand(request)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>>>> line 826, in __call__
>>>>>     return _end_unary_response_blocking(state, call, False, None)
>>>>>   File
>>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
>>>>> line 729, in _end_unary_response_blocking
>>>>>     raise _InactiveRpcError(state)
>>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that
>>>>> terminated with:
>>>>>         status = StatusCode.UNAVAILABLE
>>>>>         details = "Trying to connect an http1.x server"
>>>>>         debug_error_string =
>>>>> "{"created":"@1587496938.983698000","description":"Error received from 
>>>>> peer
>>>>> ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying
>>>>> to connect an http1.x server","grpc_status":14}"
>>>>> ```
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>

Reply via email to