Hi, is it possible to run a streaming pipeline that reads from (or writes to) Kafka using DirectRunner? If so, what should the expansion_service parameter point to (see https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90)?
Also, when using FlinkRunner, what should the value of expansion_service be? Based on https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24, Flink should start the expansion service on port 8097. I am running Flink as described in https://beam.apache.org/documentation/runners/flink/#streaming-execution and the expansion service is not being started. The same holds when using the Docker image: https://hub.docker.com/_/flink. I cannot find the expansion service mentioned in the Flink documentation, unless it goes by a different name.

Flink version: flink-1.9.2
Apache Beam version: 2.19.0

Full details:

The pipeline looks as follows:

```
import argparse
import logging
import sys  # needed for sys.argv and sys.exit below

import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def format_results(elem):
    (msg, sum_value) = elem
    print(type(msg))
    print(type(sum_value))
    return f"message: {msg}, sum: {sum_value}"


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', type=str,
                        help='Kafka Broker address')
    parser.add_argument('--topic', type=str,
                        help='Kafka topic to read from')
    parser.add_argument('--output', type=str, default="/tmp/kafka-output",
                        help='Output filepath')

    args, pipeline_args = parser.parse_known_args(argv)

    if args.topic is None or args.bootstrap_servers is None:
        parser.print_usage()
        print(sys.argv[0] +
              ': error: both --topic and --bootstrap_servers are required')
        sys.exit(1)

    options = PipelineOptions(pipeline_args)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    options.view_as(SetupOptions).save_main_session = save_main_session

    # Enforce that this pipeline is always run in streaming mode
    options.view_as(StandardOptions).streaming = True

    consumer_conf = {'bootstrap.servers': args.bootstrap_servers}

    with beam.Pipeline(options=options) as p:
        events = (
            p
            | "ReadFromKafka" >> kafka.ReadFromKafka(
                consumer_config=consumer_conf,
                topics=[args.topic],
            )
            | "WindowIntoFixedWindows" >> beam.WindowInto(
                beam.window.FixedWindows(60))
            | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
            | "SumByKey" >> beam.CombinePerKey(sum)
            | 'FormatResults' >> beam.Map(format_results)
            | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    run()
```

I run it using:

```
python streaming.py --topic=inputs --bootstrap_servers==192.168.1.219:32779
```

The Kafka brokers are reachable at the address given in --bootstrap_servers, and the topic given in --topic exists.

The error I am getting:

```
Traceback (most recent call last):
  File "streaming.py", line 74, in <module>
    run()
  File "streaming.py", line 69, in run
    | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 536, in apply
    return self.apply(transform, pvalueish)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 577, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 195, in apply
    return m(transform, input, options)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
    return transform.expand(input)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 327, in expand
    channel).Expand(request)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Trying to connect an http1.x server"
	debug_error_string = "{"created":"@1587496938.983698000","description":"Error received from peer ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying to connect an http1.x server","grpc_status":14}"
```

Thanks
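
In case it helps with diagnosis: the peer in the error above is port 8081, while the expansion service is expected on 8097. A minimal sketch for checking whether anything accepts TCP connections on a given port is below. The helper name `is_port_open` is hypothetical and not part of Beam or Flink, and it only tests that a TCP connection can be opened, not that a gRPC expansion service is actually answering.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be established.

    Hypothetical helper for illustration only: a successful connection
    shows that something is listening, not which protocol it speaks.
    """
    try:
        # create_connection raises OSError (including timeouts) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `is_port_open("localhost", 8097)` and `is_port_open("localhost", 8081)` could be compared on the machine where Flink is running.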
