Hi,

Is it possible to run a streaming pipeline that reads from (or writes to)
Kafka using the DirectRunner? If so, what should the expansion_service
parameter point to? See:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L90

Also, when using the FlinkRunner, what should the value of expansion_service
be? According to
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L24
Flink should start the expansion service on port 8097. I am running Flink as
described in
https://beam.apache.org/documentation/runners/flink/#streaming-execution
but the expansion service is not being started. The same holds when using the
Docker image: https://hub.docker.com/_/flink
I cannot find the expansion service mentioned anywhere in the Flink
documentation, unless it goes by a different name.

Flink Version: flink-1.9.2
Apache-beam Version: 2.19.0

Full Details:

The pipeline looks as follows:

```
import argparse
import logging
import sys

import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def format_results(elem):
    """Render one ``(message, sum)`` pair as a line of output text.

    The runtime type of each half of the pair is printed to stdout first,
    as a debugging aid, and then the formatted string is returned.
    """
    message, total = elem
    for part in (message, total):
        print(type(part))
    return "message: {}, sum: {}".format(message, total)

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the pipeline.

    The pipeline reads from the given Kafka topic, places elements into
    fixed windows, counts how often each element occurs per window, and
    writes the formatted counts to text files.

    Args:
        argv: Command-line arguments to parse. ``None`` lets argparse fall
            back to ``sys.argv[1:]``. Arguments this parser does not know
            are forwarded to ``PipelineOptions``.
        save_main_session: Value assigned to
            ``SetupOptions.save_main_session``.

    Raises:
        SystemExit: With status 1 when ``--topic`` or
            ``--bootstrap_servers`` is missing.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('--bootstrap_servers', type=str,
                        help='Kafka Broker address')
    parser.add_argument('--topic', type=str, help='Kafka topic to read from')
    parser.add_argument('--output', type=str, default="/tmp/kafka-output",
                        help='Output filepath')

    args, pipeline_args = parser.parse_known_args(argv)

    # Both options are mandatory; report the problem and stop before any
    # pipeline objects are constructed.
    if args.topic is None or args.bootstrap_servers is None:
        parser.print_usage()
        print(sys.argv[0] +
              ': error: both --topic and --bootstrap_servers are required')
        sys.exit(1)

    options = PipelineOptions(pipeline_args)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    options.view_as(SetupOptions).save_main_session = save_main_session

    # Enforce that this pipeline is always run in streaming mode
    options.view_as(StandardOptions).streaming = True

    consumer_conf = {'bootstrap.servers': args.bootstrap_servers}

    with beam.Pipeline(options=options) as p:
        # The resulting PCollection is not needed afterwards; the pipeline
        # runs when the ``with`` block exits.
        _ = (
            p
            | "ReadFromKafka" >> kafka.ReadFromKafka(
                consumer_config=consumer_conf,
                topics=[args.topic],
            )
            | "WindowIntoFixedWindows" >> beam.WindowInto(
                beam.window.FixedWindows(60))
            # Pair every element with 1 so that summing per key yields the
            # number of occurrences of that element within the window.
            | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
            | "SumByKey" >> beam.CombinePerKey(sum)
            | 'FormatResults' >> beam.Map(format_results)
            | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
        )

if __name__ == '__main__':
    # Script entry point: set the root logger to DEBUG, then launch the
    # pipeline with arguments taken from the command line.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    run()
```

I run it using:

```
python streaming.py --topic=inputs --bootstrap_servers=192.168.1.219:32779
```

The Kafka brokers are reachable at the address passed via --bootstrap_servers,
and the topic passed via --topic exists.

The error I am getting:

```
Traceback (most recent call last):
  File "streaming.py", line 74, in <module>
    run()
  File "streaming.py", line 69, in run
    | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
 line 989, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
 line 549, in __ror__
    result = p.apply(self, pvalueish, label)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
 line 536, in apply
    return self.apply(transform, pvalueish)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
 line 577, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
 line 195, in apply
    return m(transform, input, options)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/runner.py",
 line 225, in apply_PTransform
    return transform.expand(input)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/transforms/external.py",
 line 327, in expand
    channel).Expand(request)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
 line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File 
"/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py",
 line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Trying to connect an http1.x server"
        debug_error_string = 
"{"created":"@1587496938.983698000","description":"Error received from peer 
ipv6:[::1]:8081","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Trying
 to connect an http1.x server","grpc_status":14}"
```

Thanks

Reply via email to