Is it possible that 'localhost:9092' is not reachable from the Docker environment where the Flink step is executed? Can you try specifying the actual IP address of the node running the Kafka broker?
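For illustration, a minimal sketch of the suggested change (the IP address and the helper function are hypothetical; substitute the address of the machine actually running the broker). Inside a Docker container, "localhost" resolves to the container itself, not to the host where Kafka listens on port 9092:

```python
def make_consumer_config(broker_host, broker_port=9092):
    """Build a Kafka consumer config with an explicit broker address."""
    return {"bootstrap.servers": f"{broker_host}:{broker_port}"}

# Use the broker host's reachable IP instead of "localhost"
# (192.168.1.10 is a placeholder):
consumer_conf = make_consumer_config("192.168.1.10")
print(consumer_conf)  # {'bootstrap.servers': '192.168.1.10:9092'}
```

On Docker Desktop, `host.docker.internal` can also resolve to the host machine. Note that the broker's `advertised.listeners` in config/server.properties must also advertise an address that is reachable from inside the container, otherwise metadata fetches will still time out.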
On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <[email protected]> wrote:

> +dev <[email protected]> +Chamikara Jayalath <[email protected]>
> +Heejong Lee <[email protected]>
>
> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <[email protected]>
> wrote:
>
>> I am unable to read from Kafka and am getting the following warnings &
>> errors when calling kafka.ReadFromKafka() (Python SDK):
>>
>> WARNING:root:severity: WARN
>> timestamp {
>>   seconds: 1591370012
>>   nanos: 523000000
>> }
>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1
>> could not be established. Broker may not be available."
>> log_location: "org.apache.kafka.clients.NetworkClient"
>> thread: "18"
>>
>> Finally the pipeline fails with:
>>
>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.RuntimeException:
>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>> fetching topic metadata
>>
>> See the more complete log attached.
>>
>> The relevant code snippet:
>>
>> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
>> ...
>> kafka.ReadFromKafka(
>>     consumer_config=consumer_conf,
>>     topics=[args.topic],
>> )
>> ...
>>
>> Also see the full Python script attached.
>>
>> I am using Beam version 2.21.0 and the DirectRunner. With the Flink
>> runner I am also unable to read from the topic.
>>
>> I am using Kafka 2.5.0 and started the broker by following
>> https://kafka.apache.org/quickstart - using the default
>> config/server.properties.
>>
>> Everything runs locally, and I verified that I can publish & consume from
>> that topic using the confluent_kafka library.
>>
>> --
>> Best regards,
>> Piotr
>>
