Hi Rajib,

We can't see the args you're passing the consumer, and the error message indicates the consumer can't find the cluster.
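
For illustration only, here is a minimal sketch of the kind of consumer arguments that would be useful to share. It assumes the consumer needs the same SASL_SSL settings as the producer config in the quoted message, plus a `group.id`, since FindCoordinator is the lookup for the consumer group's coordinator. The helpers `build_consumer_conf` and `redact`, the broker address, and the group name are all hypothetical placeholders, not the actual settings in use.

```python
# Hypothetical sketch: the keys are standard confluent-kafka (librdkafka)
# configuration property names, but every value here is a placeholder.
SECRET_KEYS = {"sasl.username", "sasl.password"}


def build_consumer_conf(bootstrap_servers, group_id):
    """Return a consumer config dict, failing early on the two settings
    the consumer needs in order to locate the cluster and its group."""
    if not bootstrap_servers:
        raise ValueError("bootstrap.servers must not be empty")
    if not group_id:
        raise ValueError("group.id is required to subscribe to a topic")
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "xxxxx",
        "sasl.password": "xxxx",
        "ssl.ca.location": "xxxx",
        # The quoted code commits manually, so auto commit is turned off.
        "enable.auto.commit": False,
    }


def redact(conf):
    """Copy the config with credentials masked, safe to paste into an email."""
    return {k: ("****" if k in SECRET_KEYS else v) for k, v in conf.items()}


if __name__ == "__main__":
    conf = build_consumer_conf("broker1:9093", "example-group")
    print(redact(conf))
    # With confluent-kafka installed, the dict is passed straight through:
    #   from confluent_kafka import Consumer
    #   c = Consumer(conf)
```

Posting the redacted dict shows which brokers and group the consumer is pointed at without leaking credentials.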

Thanks,

Liam Clarke-Hutchinson

On Fri, 8 May 2020, 3:04 pm Rajib Deb, <rajib_...@infosys.com> wrote:

> I wanted to check if anyone has faced this issue
>
> Thanks
> Rajib
>
> From: Rajib Deb
> Sent: Sunday, May 3, 2020 9:51 AM
> To: users@kafka.apache.org
> Subject: Kafka - FindCoordinator error
>
> Hi
> I have written a Python consumer using confluent-kafka package. After few
> hours of running the consumer is dying with the below error
>
> cimpl.KafkaException:
> KafkaError{code=_TIMED_OUT,val=-185,str="FindCoordinator response
> error: Local: Timed out"}
>
> Can anyone please help me understand why this is happening
> **
> Below is a portion of the code
> **
> producer_conf = {
>     'bootstrap.servers': 'xxxxxxx',
>     'security.protocol': 'SASL_SSL',
>     'sasl.mechanisms': 'PLAIN',
>     'sasl.username': 'xxxxx',
>     'sasl.password': 'xxxx',
>     'ssl.ca.location':'xxxx',
>     'ssl.certificate.location': 'xxxx',
>     'queue.buffering.max.messages': 100000,
>     'queue.buffering.max.ms' : 1000,
>     'batch.num.messages': 500
> }
>
> p = Producer(**producer_conf)
> target_topic='xxxxxx'
>
> c = Consumer(kwargs)
> source_topic='xxxx'
> c.subscribe([source_topic])
> while True:
>
>     msg = c.poll(100) #I am consuming from a topic
>
>     if msg is None:
>         continue
>     if msg.error():
>         logging.error("error occurred during polling topic")
>         logging.error(msg.error())
>         raise KafkaException(msg.error())
>         continue
>
>     #logging.info("input msg form topic: ")
>     #logging.info(msg.value())
>     #msgDict = json.loads(msg.value()) # taking msg into dictionary
>     try:
>         p.produce(target_topic, msg.value(), callback=delivery_callback)
>         #the message from the consumed topic is pushed to the target topic
>         c.commit() #disabled auto commit, manually committing only when message pushed to the target topic
>     except BufferError:
>         sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
>     except Exception as e:
>         print(e)
>
>     p.poll(0)
> #sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
> p.flush()
>
> Thanks
> Rajib
>