You can build the Java SDK image from source by running the following command: ./gradlew :sdks:java:container:docker
On Wed, Apr 22, 2020 at 4:43 PM Piotr Filipiuk <[email protected]> wrote: > Thanks for quick response. > > Since Beam 2.21.0 is not yet available via pip > <https://pypi.org/project/apache-beam/#history>, I tried to run it from > HEAD. After creating fresh virtual environment, my steps were: > pip install -r build-requirements.txt > python setup.py build > python setup.py install > > Then I encounter the following error when running the script from the > original message: > > Error response from daemon: manifest for apache/beam_java_sdk:2.21.0.dev > not found: manifest unknown: manifest unknown > INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable > to pull image apache/beam_java_sdk:2.21.0.dev > Unable to find image 'apache/beam_java_sdk:2.21.0.dev' locally > docker: Error response from daemon: manifest for apache/beam_java_sdk: > 2.21.0.dev not found: manifest unknown: manifest unknown. > See 'docker run --help'. > DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: > Requests sent by runner: [] > DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner: > Requests multiplexing info: [] > Traceback (most recent call last): > File > "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line > 193, in _run_module_as_main > "__main__", mod_spec) > File > "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line > 85, in _run_code > exec(code, run_globals) > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", > line 74, in <module> > run() > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/examples/streaming_wordcount_kafka.py", > line 69, in run > | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 528, in > __exit__ > self.run().wait_until_finish() > File 
"/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/pipeline.py", line 514, in > run > return self.runner.run_pipeline(self, self._options) > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", > line 130, in run_pipeline > return runner.run_pipeline(pipeline, options) > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", > line 173, in run_pipeline > pipeline.to_runner_api(default_environment=self._default_environment)) > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", > line 183, in run_via_runner_api > return self.run_stages(stage_context, stages) > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", > line 331, in run_stages > bundle_context_manager, > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", > line 470, in _run_stage > bundle_context_manager.extract_bundle_inputs_and_outputs()) > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", > line 530, in extract_bundle_inputs_and_outputs > data_api_service_descriptor = self.data_api_service_descriptor() > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", > line 451, in data_api_service_descriptor > return self.worker_handlers[0].data_api_service_descriptor() > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", > line 445, in worker_handlers > self.stage.environment, self.num_workers)) > File "/Users/piotr.filipiuk/src/ > 
github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", > line 854, in get_worker_handlers > worker_handler.start_worker() > File "/Users/piotr.filipiuk/src/ > github.com/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", > line 734, in start_worker > '--provision_endpoint=%s' % self.control_address, > File > "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", > line 336, in check_output > **kwargs).stdout > File > "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/subprocess.py", > line 418, in run > output=stdout, stderr=stderr) > subprocess.CalledProcessError: Command '['docker', 'run', '-d', > '--network=host', 'apache/beam_java_sdk:2.21.0.dev', '--id=worker_1', > '--logging_endpoint=host.docker.internal:50453', > '--control_endpoint=host.docker.internal:50450', > '--artifact_endpoint=host.docker.internal:50450', > '--provision_endpoint=host.docker.internal:50450']' returned non-zero exit > status 125. > > Since the apache/beam_java_sdk:2.21.0.dev is not publicly available, I > cannot pull the image. Is it possible to get access to dev images? > Alternatively, are there any instructions on how to build the beam_java_sdk > locally and then use the local image when running a job? > > Thanks > > > On Tue, Apr 21, 2020 at 2:38 PM Chamikara Jayalath <[email protected]> > wrote: > >> >> >> On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <[email protected]> >> wrote: >> >>> Hi, >>> >>> I would like to know whether it is possible to run a streaming pipeline >>> that reads from (or writes to) Kafka using DirectRunner? If so, what should >>> the expansion_service point to: >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90 >>> ? >> >> >>> Also, when using FlinkRunner, what should be the value of >>> expansion_service? 
Based on >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24 >>> Flink should start the expansion service on port 8097. I am running Flink using >>> https://beam.apache.org/documentation/runners/flink/#streaming-execution >>> and the expansion service is not being started. The same holds when using the >>> docker image: https://hub.docker.com/_/flink. I cannot find the expansion >>> service being mentioned in the Flink documentation - unless it is called by >>> a different name. >>> >> >> After Beam 2.21.0 you should not have to specify anything. You only need >> to make sure that Java is installed in the system (so that the command 'java' >> is available) and kafka.py will download the correct jar. >> >> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62 >> >> For HEAD (today), as long as you are in a Beam repo clone, you should not >> have to specify anything either, since kafka.py will try to build the >> dependency locally. >> >> For previous versions, you have to start up an expansion service >> manually. I'm not familiar with the expansion service embedded in the Flink >> Job Server. 
>> >> >>> Flink Version: flink-1.9.2 >>> Apache-beam Version: 2.19.0 >>> >>> Full Details: >>> >>> The pipeline looks as follows: >>> >>> ``` >>> import argparse >>> import logging >>> >>> import apache_beam as beam >>> from apache_beam.io.external import kafka >>> from apache_beam.options.pipeline_options import PipelineOptions >>> from apache_beam.options.pipeline_options import SetupOptions >>> from apache_beam.options.pipeline_options import StandardOptions >>> >>> >>> def format_results(elem): >>> (msg, sum_value) = elem >>> print(type(msg)) >>> print(type(sum_value)) >>> return f"message: {msg}, sum: {sum_value}" >>> >>> def run(argv=None, save_main_session=True): >>> """Main entry point; defines and runs the pipeline.""" >>> parser = argparse.ArgumentParser() >>> >>> parser.add_argument('--bootstrap_servers', type=str, >>> help='Kafka Broker address') >>> parser.add_argument('--topic', type=str, help='Kafka topic to read >>> from') >>> parser.add_argument('--output', type=str, >>> default="/tmp/kafka-output", >>> help='Output filepath') >>> >>> args, pipeline_args = parser.parse_known_args(argv) >>> >>> if args.topic is None or args.bootstrap_servers is None: >>> parser.print_usage() >>> print(sys.argv[0] + ': error: both --topic and >>> --bootstrap_servers are required') >>> sys.exit(1) >>> >>> options = PipelineOptions(pipeline_args) >>> # We use the save_main_session option because one or more DoFn's in >>> this >>> # workflow rely on global context (e.g., a module imported at module >>> level). 
>>> options.view_as(SetupOptions).save_main_session = save_main_session >>> >>> # Enforce that this pipeline is always run in streaming mode >>> options.view_as(StandardOptions).streaming = True >>> >>> consumer_conf = {'bootstrap.servers': args.bootstrap_servers} >>> >>> with beam.Pipeline(options=options) as p: >>> events = ( >>> p >>> | "ReadFromKafka" >> kafka.ReadFromKafka( >>> consumer_config=consumer_conf, >>> topics=[args.topic], >>> ) >>> | "WindowIntoFixedWindows" >> beam.WindowInto( >>> beam.window.FixedWindows(60)) >>> | "AddOnes" >> beam.Map(lambda msg: (msg, 1)) >>> | "SumByKey" >> beam.CombinePerKey(sum) >>> | 'FormatResults' >> beam.Map(format_results) >>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>> ) >>> >>> if __name__ == '__main__': >>> logging.getLogger().setLevel(logging.DEBUG) >>> run() >>> ``` >>> >>> I run it using: >>> >>> ``` >>> python streaming.py --topic=inputs --bootstrap_servers== >>> 192.168.1.219:32779 >>> ``` >>> >>> Kafka brokers are available under the --bootstrap_servers and the >>> --topic exists. 
>>> >>> The error I am getting: >>> >>> ``` >>> Traceback (most recent call last): >>> File "streaming.py", line 74, in <module> >>> run() >>> File "streaming.py", line 69, in run >>> | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", >>> line 989, in __ror__ >>> return self.transform.__ror__(pvalueish, self.label) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", >>> line 549, in __ror__ >>> result = p.apply(self, pvalueish, label) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>> line 536, in apply >>> return self.apply(transform, pvalueish) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", >>> line 577, in apply >>> pvalueish_result = self.runner.apply(transform, pvalueish, >>> self._options) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", >>> line 195, in apply >>> return m(transform, input, options) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", >>> line 225, in apply_PTransform >>> return transform.expand(input) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", >>> line 327, in expand >>> channel).Expand(request) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>> line 826, in __call__ >>> return _end_unary_response_blocking(state, call, False, None) >>> File >>> "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", >>> line 729, in _end_unary_response_blocking >>> raise _InactiveRpcError(state) >>> 
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that >>> terminated with: >>> status = StatusCode.UNAVAILABLE >>> details = "Trying to connect an http1.x server" >>> debug_error_string = >>> "{"created":"@1587496938.983698000","description":"Error received from peer >>> ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying >>> to connect an http1.x server","grpc_status":14}" >>> ``` >>> >>> Thanks >>> >>> > > -- > Best regards, > Piotr >
