It should just work without any other changes. If it doesn't, let us know. On Wed, Apr 22, 2020 at 5:18 PM Piotr Filipiuk <[email protected]> wrote:
> Thx. Once the docker image is built, how do I make sure it is used when I > run the Beam pipeline, as opposed to pulling from docker hub? > > On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> wrote: > >> You can build the Java SDK image from source by running the following >> command: ./gradlew :sdks:java:container:docker >> >> On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]> >> wrote: >> >>> Thanks for the quick response. >>> >>> Since Beam 2.21.0 is not yet available via pip >>> <https://pypi.org/project/apache-beam/#history>, I tried to run it from >>> HEAD. After creating a fresh virtual environment, my steps were: >>> pip install -r build-requirements.txt >>> python setup.py build >>> python setup.py install >>> >>> Then I encountered the following error when running the script from the >>> original message: >>> >>> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev >>> not found: manifest unknown: manifest unknown >>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable >>> to pull image apache/beam_java_sdk:2.21.0.dev >>> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally >>> docker: Error response from daemon: manifest for apache/beam_java_sdk: >>> 2.21.0.dev not found: manifest unknown: manifest unknown. >>> See 'docker run --help'. 
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>> Requests sent by runner: [] >>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >>> Requests multiplexing info: [] >>> Traceback (most recent call last): >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line >>> 193, in _run_module_as_main >>> "__main__", mod_spec) >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line >>> 85, in _run_code >>> exec(code, run_globals) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>> line 74, in <module> >>> run() >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >>> line 69, in run >>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, >>> in __exit__ >>> self.run().wait_until_finish() >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, >>> in run >>> return self.runner.run_pipeline(self, self._options) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", >>> line 130, in run_pipeline >>> return runner.run_pipeline(pipeline, options) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 173, in run_pipeline >>> >>> pipeline.to_runner_api(default_environment=self._default_environment)) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 183, in run_via_runner_api >>> return self.run_stages(stage_context, stages) >>> File "/Users/piotr.filipiuk/src/ >>> 
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 331, in run_stages >>> bundle_context_manager, >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >>> line 470, in _run_stage >>> bundle_context_manager.extract_bundle_inputs_and_outputs()) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>> line 530, in extract_bundle_inputs_and_outputs >>> data_api_service_descriptor = self.data_api_service_descriptor() >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>> line 451, in data_api_service_descriptor >>> return self.worker_handlers[0].data_api_service_descriptor() >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >>> line 445, in worker_handlers >>> self.stage.environment, self.num_workers)) >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", >>> line 854, in get_worker_handlers >>> worker_handler.start_worker() >>> File "/Users/piotr.filipiuk/src/ >>> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", >>> line 734, in start_worker >>> '--provision_endpoint=%s' % self.control_address, >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", >>> line 336, in check_output >>> **kwargs).stdout >>> File >>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", >>> line 418, in run >>> output=stdout, stderr=stderr) >>> subprocess.CalledProcessError: Command '['docker', 'run', '-d', >>> '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', >>> 
'--logging_endpoint=host.docker.internal:50453', >>> '--control_endpoint=host.docker.internal:50450', >>> '--artifact_endpoint=host.docker.internal:50450', >>> '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit >>> status 125. >>> >>> Since the apache/beam_java_sdk:2.21.0.dev is not publicly available, I >>> cannot pull the image. Is it possible to get access to dev images? >>> Alternatively, are there any instructions on how to build the beam_java_sdk >>> locally and then use the local image when running a job? >>> >>> Thanks >>> >>> >>> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <[email protected]> >>> wrote: >>> >>>> >>>> >>>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk < >>>> [email protected]> wrote: >>>> >>>>> Hi, >>>>> >>>>> I would like to know whether it is possible to run a streaming >>>>> pipeline that reads from (or writes to) Kafka using DirectRunner? If so, >>>>> what should the expansion_service point to: >>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90 >>>>> ? >>>> >>>> >>>>> Also, when using FlinkRunner, what should be the value of >>>>> expansion_service? Based on >>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24 >>>>> Flink should start expansion service on port 8097. I am running Flink >>>>> using >>>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution >>>>> and the expansion service is not being started. Same holds when using >>>>> docker image: https://hub.docker.com/_/flink. I cannot find expansion >>>>> service being mentioned in Flink Documentation - unless it is called by >>>>> different name. >>>>> >>>> >>>> After Beam 2.21.0 you should not have to specify anything. You only >>>> need to make sure that Java is installed in the system (so that command >>>> 'java' is available) and kafka.py will download the correct jar. 
>>>> >>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62 >>>> >>>> For HEAD (today), as long as you are in a Beam repo clone, you should >>>> not have to specify anything as well since kafka.py will try to build the >>>> dependency locally. >>>> >>>> For previous versions, you have to start up an expansion service >>>> manually. I'm not familiar about the expansion service embedded in Flink >>>> Job Server. >>>> >>>> >>>>> Flink Version: flink-1.9.2 >>>>> Apache-beam Version: 2.19.0 >>>>> >>>>> Full Details: >>>>> >>>>> The pipeline looks as follows: >>>>> >>>>> ``` >>>>> import argparse >>>>> import logging >>>>> >>>>> import apache_beam as beam >>>>> from apache_beam.io.external import kafka >>>>> from apache_beam.options.pipeline_options import PipelineOptions >>>>> from apache_beam.options.pipeline_options import SetupOptions >>>>> from apache_beam.options.pipeline_options import StandardOptions >>>>> >>>>> >>>>> def format_results(elem): >>>>> (msg, sum_value) = elem >>>>> print(type(msg)) >>>>> print(type(sum_value)) >>>>> return f"message: {msg}, sum: {sum_value}" >>>>> >>>>> def run(argv=None, save_main_session=True): >>>>> """Main entry point; defines and runs the pipeline.""" >>>>> parser = argparse.ArgumentParser() >>>>> >>>>> parser.add_argument('--bootstrap_servers', type=str, >>>>> help='Kafka Broker address') >>>>> parser.add_argument('--topic', type=str, help='Kafka topic to read >>>>> from') >>>>> parser.add_argument('--output', type=str, >>>>> default="/tmp/kafka-output", >>>>> help='Output filepath') >>>>> >>>>> args, pipeline_args = parser.parse_known_args(argv) >>>>> >>>>> if args.topic is None or args.bootstrap_servers is None: >>>>> parser.print_usage() >>>>> print(sys.argv[0] + ': error: both --topic and >>>>> --bootstrap_servers are required') >>>>> sys.exit(1) >>>>> >>>>> options = PipelineOptions(pipeline_args) >>>>> # We use the save_main_session option because one or more DoFn's >>>>> in this 
>>>>> # workflow rely on global context (e.g., a module imported at >>>>> module level). >>>>> options.view_as(SetupOptions).save_main_session = save_main_session >>>>> >>>>> # Enforce that this pipeline is always run in streaming mode >>>>> options.view_as(StandardOptions).streaming = True >>>>> >>>>> consumer_conf = {'bootstrap.servers': args.bootstrap_servers} >>>>> >>>>> with beam.Pipeline(options=options) as p: >>>>> events = ( >>>>> p >>>>> | "ReadFromKafka" >> kafka.ReadFromKafka( >>>>> consumer_config=consumer_conf, >>>>> topics=[args.topic], >>>>> ) >>>>> | "WindowIntoFixedWindows" >> beam.WindowInto( >>>>> beam.window.FixedWindows(60)) >>>>> | "AddOnes" >> beam.Map(lambda msg: (msg, 1)) >>>>> | "SumByKey" >> beam.CombinePerKey(sum) >>>>> | 'FormatResults' >> beam.Map(format_results) >>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>>>> ) >>>>> >>>>> if __name__ == '__main__': >>>>> logging.getLogger().setLevel(logging.DEBUG) >>>>> run() >>>>> ``` >>>>> >>>>> I run it using: >>>>> >>>>> ``` >>>>> python streaming.py --topic=inputs --bootstrap_servers== >>>>> 192.168.1.219:32779 >>>>> ``` >>>>> >>>>> Kafka brokers are available under the --bootstrap_servers and the >>>>> --topic exists. 
>>>>> >>>>> The error I am getting: >>>>> >>>>> ``` >>>>> Traceback (most recent call last): >>>>> File "streaming.py", line 74, in <module> >>>>> run() >>>>> File "streaming.py", line 69, in run >>>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>>>> File >>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", >>>>> line 989, in __ror__ >>>>> return self.transform.__ror__(pvalueish, self.label) >>>>> File >>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", >>>>> line 549, in __ror__ >>>>> result = p.apply(self, pvalueish, label) >>>>> File >>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>> line 536, in apply >>>>> return self.apply(transform, pvalueish) >>>>> File >>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>>> line 577, in apply >>>>> pvalueish_result = self.runner.apply(transform, pvalueish, >>>>> self._options) >>>>> File >>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", >>>>> line 195, in apply >>>>> return m(transform, input, options) >>>>> File >>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", >>>>> line 225, in apply_PTransform >>>>> return transform.expand(input) >>>>> File >>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", >>>>> line 327, in expand >>>>> channel).Expand(request) >>>>> File >>>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>>>> line 826, in __call__ >>>>> return _end_unary_response_blocking(state, call, False, None) >>>>> File >>>>> 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>>>> line 729, in _end_unary_response_blocking >>>>> raise _InactiveRpcError(state) >>>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that >>>>> terminated with: >>>>> status = StatusCode.UNAVAILABLE >>>>> details = "Trying to connect an http1.x server" >>>>> debug_error_string = >>>>> "{"created":"@1587496938.983698000","description":"Error received from >>>>> peer >>>>> ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying >>>>> to connect an http1.x server","grpc_status":14}" >>>>> ``` >>>>> >>>>> Thanks >>>>> >>>>> >>> >>> -- >>> Best regards, >>> Piotr >>> >> > > -- > Best regards, > Piotr >
