[ https://issues.apache.org/jira/browse/KAFKA-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-4086:
-------------------------------
    Labels: consumer reliability  (was: consumer)

> long processing consumer restart will stall
> -------------------------------------------
>
>                 Key: KAFKA-4086
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4086
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.0.0
>            Reporter: Dale Jin
>              Labels: consumer, reliability
>
> [~hachikuji]
> We have a long-processing consumer. Whenever a new consumer tries to join the
> group while the long-processing consumer is busy processing, the new consumer
> stalls.
> If we kill the long-processing consumer and restart it, both consumers stall.
> When we kill the long-processing consumer, it tries to issue a LeaveGroup
> request, but this fails, seemingly due to the client request timeout.
> When we start the long-processing consumer again, it appears to send a topic
> metadata request to the broker; the subsequent JoinGroup request is then
> issued and returns a future, but when I check the server log I don't see the
> corresponding request in kafka-request.log.
> When we construct the consumer, we use the following code (based on the
> kafka-python library):
> {code}
> self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
>                               value_deserializer=deserializer,
>                               group_id=self.user_defined_sub_name,
>                               heartbeat_interval_ms=10000,
>                               session_timeout_ms=300000,
>                               enable_auto_commit=False)
> {code}
> On the server side, we use 0.10.0.0 with default settings.
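One thing worth checking against this config: the logs below show the JoinGroup request timing out client-side after 40000 ms, while the configured session_timeout_ms is 300000. A minimal sketch of that mismatch, assuming kafka-python's request_timeout_ms defaulted to 40000 ms at the time (request_timeout_ms is a real KafkaConsumer parameter, but the `join_can_complete` helper here is hypothetical, for illustration only):

```python
# Sketch: the coordinator may hold a JoinGroup response for up to
# session_timeout_ms while it waits for all members to rejoin, so the
# client-side request_timeout_ms must exceed session_timeout_ms or the
# join request will always time out on the client, as seen in the log.

SESSION_TIMEOUT_MS = 300000   # matches the reporter's config above
REQUEST_TIMEOUT_MS = 40000    # assumed kafka-python default at the time

def join_can_complete(request_timeout_ms, session_timeout_ms):
    """Hypothetical helper: a JoinGroup held for the full session timeout
    can only complete if the client waits longer than that for a response."""
    return request_timeout_ms > session_timeout_ms

# With the values above, the join request times out (matches the log):
print(join_can_complete(REQUEST_TIMEOUT_MS, SESSION_TIMEOUT_MS))  # → False
# Raising request_timeout_ms above session_timeout_ms would avoid that:
print(join_can_complete(310000, SESSION_TIMEOUT_MS))              # → True
```

Under this reading, passing e.g. request_timeout_ms=310000 to the KafkaConsumer constructor alongside session_timeout_ms=300000 would be worth trying; it is an assumption to be confirmed against the kafka-python version in use.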
> Looks like a `RebalanceInProgressError` is thrown:
> {code}
> 2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator 100 for group v1.user.queue
> 2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking previously assigned partitions set() for group v1.user.queue
> 2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 1)
> 2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group v1.user.queue
> 2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup (JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])) to coordinator 100
> 2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> Request 5: JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])
> 2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing connection.
> 2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed -- refreshing metadata
> 2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed out after 40000 ms]
> 2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the coordinator dead (node 100) for group v1.user.queue: None.
> 2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group coordinator request for group v1.user.queue to broker 100
> {code}
> FYI, we turned on the following in log4j:
> {code}
> log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
> log4j.additivity.kafka.server.KafkaApis=true
> log4j.logger.kafka.request.logger=TRACE, requestAppender
> log4j.additivity.kafka.request.logger=true
> log4j.logger.kafka.controller=TRACE, controllerAppender
> log4j.additivity.kafka.controller=true
> log4j.logger.state.change.logger=TRACE, stateChangeAppender
> log4j.additivity.state.change.logger=true
> {code}
> Please let us know how we can proceed to find the root cause.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)