On Tue, Apr 21, 2020 at 12:43 PM Piotr Filipiuk <piotr.filip...@gmail.com>
wrote:

> Hi,
>
> I would like to know whether it is possible to run a streaming pipeline
> that reads from (or writes to) Kafka using the DirectRunner. If so, what
> should the expansion_service point to?
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90


> Also, when using the FlinkRunner, what should the value of
> expansion_service be? Based on
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
> Flink should start an expansion service on port 8097. I am running Flink
> per https://beam.apache.org/documentation/runners/flink/#streaming-execution
> and the expansion service is not being started. The same holds when using
> the Docker image: https://hub.docker.com/_/flink. I cannot find the
> expansion service mentioned in the Flink documentation, unless it goes by
> a different name.
>

Starting with Beam 2.21.0 you should not have to specify anything. You only
need to make sure that Java is installed on the system (so that the 'java'
command is available); kafka.py will download the correct jar:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L62
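
For example, on 2.21.0+ something like the following should work with no
expansion_service argument at all. This is a minimal sketch, not a tested
snippet: the broker address ('localhost:9092') and topic ('my-topic') are
placeholders you would replace with your own.

```
import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions

# Minimal sketch for Beam >= 2.21.0: no expansion_service is passed, since
# kafka.py downloads the matching expansion service jar automatically
# (requires the 'java' command on PATH). Broker and topic are placeholders.
options = PipelineOptions(['--runner=FlinkRunner', '--streaming'])
with beam.Pipeline(options=options) as p:
    _ = (
        p
        | 'ReadFromKafka' >> kafka.ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['my-topic'])
        | 'Print' >> beam.Map(print))
```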

For HEAD (today), as long as you are in a Beam repo clone, you likewise
should not have to specify anything, since kafka.py will try to build the
dependency locally.

For previous versions, you have to start an expansion service manually.
I'm not familiar with the expansion service embedded in the Flink Job Server.
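
If you do run one yourself, you can pass its address to ReadFromKafka
explicitly. A sketch, assuming you have already started a Java expansion
service listening on localhost:8097 (the default address in kafka.py); the
host and port are assumptions and must match wherever your service is
actually running:

```
# Sketch for pre-2.21.0 releases such as 2.19.0: point ReadFromKafka at a
# manually started expansion service. 'localhost:8097' is an assumption and
# must match the service you started; Flink itself does not start one.
kafka.ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['my-topic'],
    expansion_service='localhost:8097')
```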


> Flink version: 1.9.2
> Apache Beam version: 2.19.0
>
> Full Details:
>
> The pipeline looks as follows:
>
> ```
> import argparse
> import logging
> import sys
>
> import apache_beam as beam
> from apache_beam.io.external import kafka
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> from apache_beam.options.pipeline_options import StandardOptions
>
>
> def format_results(elem):
>     (msg, sum_value) = elem
>     print(type(msg))
>     print(type(sum_value))
>     return f"message: {msg}, sum: {sum_value}"
>
> def run(argv=None, save_main_session=True):
>     """Main entry point; defines and runs the pipeline."""
>     parser = argparse.ArgumentParser()
>
>     parser.add_argument('--bootstrap_servers', type=str,
>                         help='Kafka Broker address')
>     parser.add_argument('--topic', type=str,
>                         help='Kafka topic to read from')
>     parser.add_argument('--output', type=str, default="/tmp/kafka-output",
>                         help='Output filepath')
>
>     args, pipeline_args = parser.parse_known_args(argv)
>
>     if args.topic is None or args.bootstrap_servers is None:
>         parser.print_usage()
>         print(sys.argv[0] +
>               ': error: both --topic and --bootstrap_servers are required')
>         sys.exit(1)
>
>     options = PipelineOptions(pipeline_args)
>     # We use the save_main_session option because one or more DoFn's in this
>     # workflow rely on global context (e.g., a module imported at module level).
>     options.view_as(SetupOptions).save_main_session = save_main_session
>
>     # Enforce that this pipeline is always run in streaming mode
>     options.view_as(StandardOptions).streaming = True
>
>     consumer_conf = {'bootstrap.servers': args.bootstrap_servers}
>
>     with beam.Pipeline(options=options) as p:
>         events = (
>             p
>             | "ReadFromKafka" >> kafka.ReadFromKafka(
>                 consumer_config=consumer_conf,
>                 topics=[args.topic],
>             )
>             | "WindowIntoFixedWindows" >> beam.WindowInto(
>                 beam.window.FixedWindows(60))
>             | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
>             | "SumByKey" >> beam.CombinePerKey(sum)
>             | 'FormatResults' >> beam.Map(format_results)
>             | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>         )
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.DEBUG)
>     run()
> ```
>
> I run it using:
>
> ```
> python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
> ```
>
> Kafka brokers are available under the --bootstrap_servers and the --topic
> exists.
>
> The error I am getting:
>
> ```
> Traceback (most recent call last):
>   File "streaming.py", line 74, in <module>
>     run()
>   File "streaming.py", line 69, in run
>     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
>     return self.transform.__ror__(pvalueish, self.label)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
>     result = p.apply(self, pvalueish, label)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
>     return self.apply(transform, pvalueish)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
>     return m(transform, input, options)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
>     return transform.expand(input)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
>     channel).Expand(request)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>     return _end_unary_response_blocking(state, call, False, None)
>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>     raise _InactiveRpcError(state)
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>         status = StatusCode.UNAVAILABLE
>         details = "Trying to connect an http1.x server"
>         debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
> ```
>
> Thanks
>
>
