Thanks. Once the Docker image is built, how do I make sure it is used when I run a Beam pipeline, as opposed to being pulled from Docker Hub?
On Wed, Apr 22, 2020 at 1:47 PM Kyle Weaver <[email protected]> wrote: > You can build the Java SDK image from source by running the following > command: ./gradlew :sdks:java:container:docker > > On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]> > wrote: > >> Thanks for quick response. >> >> Since Beam 2.21.0 is not yet available via pip >> <https://pypi.org/project/apache-beam/#history>, I tried to run it from >> HEAD. After creating fresh virtual environment, my steps were: >> pip install -r build-requirements.txt >> python setup.py build >> python setup.py install >> >> Then I encounter the following error when running the script from the >> original message: >> >> Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev >> not found: manifest unknown: manifest unknown >> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable >> to pull image apache/beam_java_sdk:2.21.0.dev >> Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally >> docker: Error response from daemon: manifest for apache/beam_java_sdk: >> 2.21.0.dev not found: manifest unknown: manifest unknown. >> See 'docker run --help'. 
>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >> Requests sent by runner: [] >> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: >> Requests multiplexing info: [] >> Traceback (most recent call last): >> File >> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line >> 193, in _run_module_as_main >> "__main__", mod_spec) >> File >> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line >> 85, in _run_code >> exec(code, run_globals) >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >> line 74, in <module> >> run() >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", >> line 69, in run >> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, >> in __exit__ >> self.run().wait_until_finish() >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, >> in run >> return self.runner.run_pipeline(self, self._options) >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", >> line 130, in run_pipeline >> return runner.run_pipeline(pipeline, options) >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >> line 173, in run_pipeline >> pipeline.to_runner_api(default_environment=self._default_environment)) >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >> line 183, in run_via_runner_api >> return self.run_stages(stage_context, stages) >> File "/Users/piotr.filipiuk/src/ >> 
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >> line 331, in run_stages >> bundle_context_manager, >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", >> line 470, in _run_stage >> bundle_context_manager.extract_bundle_inputs_and_outputs()) >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >> line 530, in extract_bundle_inputs_and_outputs >> data_api_service_descriptor = self.data_api_service_descriptor() >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >> line 451, in data_api_service_descriptor >> return self.worker_handlers[0].data_api_service_descriptor() >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", >> line 445, in worker_handlers >> self.stage.environment, self.num_workers)) >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", >> line 854, in get_worker_handlers >> worker_handler.start_worker() >> File "/Users/piotr.filipiuk/src/ >> github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", >> line 734, in start_worker >> '--provision_endpoint=%s' % self.control_address, >> File >> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", >> line 336, in check_output >> **kwargs).stdout >> File >> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", >> line 418, in run >> output=stdout, stderr=stderr) >> subprocess.CalledProcessError: Command '['docker', 'run', '-d', >> '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', >> '--logging_endpoint=host.docker.internal:50453', >> 
'--control_endpoint=host.docker.internal:50450', >> '--artifact_endpoint=host.docker.internal:50450', >> '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit >> status 125. >> >> Since the apache/beam_java_sdk:2.21.0.dev is not publicly available, I >> cannot pull the image. Is it possible to get access to dev images? >> Alternatively, are there any instructions on how to build the beam_java_sdk >> locally and then use the local image when running a job? >> >> Thanks >> >> >> On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <[email protected]> >> wrote: >> >>> >>> >>> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk < >>> [email protected]> wrote: >>> >>>> Hi, >>>> >>>> I would like to know whether it is possible to run a streaming pipeline >>>> that reads from (or writes to) Kafka using DirectRunner? If so, what should >>>> the expansion_service point to: >>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90 >>>> ? >>> >>> >>>> Also, when using FlinkRunner, what should be the value of >>>> expansion_service? Based on >>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24 >>>> Flink should start expansion service on port 8097. I am running Flink using >>>> https://beam.apache.org/documentation/runners/flink/#streaming-execution >>>> and the expansion service is not being started. Same holds when using >>>> docker image: https://hub.docker.com/_/flink. I cannot find expansion >>>> service being mentioned in Flink Documentation - unless it is called by >>>> different name. >>>> >>> >>> After Beam 2.21.0 you should not have to specify anything. You only need >>> to make sure that Java is installed in the system (so that command 'java' >>> is available) and kafka.py will download the correct jar. 
>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62 >>> >>> For HEAD (today), as long as you are in a Beam repo clone, you should >>> not have to specify anything as well since kafka.py will try to build the >>> dependency locally. >>> >>> For previous versions, you have to start up an expansion service >>> manually. I'm not familiar about the expansion service embedded in Flink >>> Job Server. >>> >>> >>>> Flink Version: flink-1.9.2 >>>> Apache-beam Version: 2.19.0 >>>> >>>> Full Details: >>>> >>>> The pipeline looks as follows: >>>> >>>> ``` >>>> import argparse >>>> import logging >>>> >>>> import apache_beam as beam >>>> from apache_beam.io.external import kafka >>>> from apache_beam.options.pipeline_options import PipelineOptions >>>> from apache_beam.options.pipeline_options import SetupOptions >>>> from apache_beam.options.pipeline_options import StandardOptions >>>> >>>> >>>> def format_results(elem): >>>> (msg, sum_value) = elem >>>> print(type(msg)) >>>> print(type(sum_value)) >>>> return f"message: {msg}, sum: {sum_value}" >>>> >>>> def run(argv=None, save_main_session=True): >>>> """Main entry point; defines and runs the pipeline.""" >>>> parser = argparse.ArgumentParser() >>>> >>>> parser.add_argument('--bootstrap_servers', type=str, >>>> help='Kafka Broker address') >>>> parser.add_argument('--topic', type=str, help='Kafka topic to read >>>> from') >>>> parser.add_argument('--output', type=str, >>>> default="/tmp/kafka-output", >>>> help='Output filepath') >>>> >>>> args, pipeline_args = parser.parse_known_args(argv) >>>> >>>> if args.topic is None or args.bootstrap_servers is None: >>>> parser.print_usage() >>>> print(sys.argv[0] + ': error: both --topic and >>>> --bootstrap_servers are required') >>>> sys.exit(1) >>>> >>>> options = PipelineOptions(pipeline_args) >>>> # We use the save_main_session option because one or more DoFn's in >>>> this >>>> # workflow rely on global context (e.g., a module 
imported at >>>> module level). >>>> options.view_as(SetupOptions).save_main_session = save_main_session >>>> >>>> # Enforce that this pipeline is always run in streaming mode >>>> options.view_as(StandardOptions).streaming = True >>>> >>>> consumer_conf = {'bootstrap.servers': args.bootstrap_servers} >>>> >>>> with beam.Pipeline(options=options) as p: >>>> events = ( >>>> p >>>> | "ReadFromKafka" >> kafka.ReadFromKafka( >>>> consumer_config=consumer_conf, >>>> topics=[args.topic], >>>> ) >>>> | "WindowIntoFixedWindows" >> beam.WindowInto( >>>> beam.window.FixedWindows(60)) >>>> | "AddOnes" >> beam.Map(lambda msg: (msg, 1)) >>>> | "SumByKey" >> beam.CombinePerKey(sum) >>>> | 'FormatResults' >> beam.Map(format_results) >>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>>> ) >>>> >>>> if __name__ == '__main__': >>>> logging.getLogger().setLevel(logging.DEBUG) >>>> run() >>>> ``` >>>> >>>> I run it using: >>>> >>>> ``` >>>> python streaming.py --topic=inputs --bootstrap_servers== >>>> 192.168.1.219:32779 >>>> ``` >>>> >>>> Kafka brokers are available under the --bootstrap_servers and the >>>> --topic exists. 
>>>> >>>> The error I am getting: >>>> >>>> ``` >>>> Traceback (most recent call last): >>>> File "streaming.py", line 74, in <module> >>>> run() >>>> File "streaming.py", line 69, in run >>>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", >>>> line 989, in __ror__ >>>> return self.transform.__ror__(pvalueish, self.label) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", >>>> line 549, in __ror__ >>>> result = p.apply(self, pvalueish, label) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>> line 536, in apply >>>> return self.apply(transform, pvalueish) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>>> line 577, in apply >>>> pvalueish_result = self.runner.apply(transform, pvalueish, >>>> self._options) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", >>>> line 195, in apply >>>> return m(transform, input, options) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", >>>> line 225, in apply_PTransform >>>> return transform.expand(input) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", >>>> line 327, in expand >>>> channel).Expand(request) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>>> line 826, in __call__ >>>> return _end_unary_response_blocking(state, call, False, None) >>>> File >>>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>>> line 729, in 
_end_unary_response_blocking >>>> raise _InactiveRpcError(state) >>>> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that >>>> terminated with: >>>> status = StatusCode.UNAVAILABLE >>>> details = "Trying to connect an http1.x server" >>>> debug_error_string = >>>> "{"created":"@1587496938.983698000","description":"Error received from peer >>>> ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying >>>> to connect an http1.x server","grpc_status":14}" >>>> ``` >>>> >>>> Thanks >>>> >>>> >> >> -- >> Best regards, >> Piotr >> > -- Best regards, Piotr
