[ 
https://issues.apache.org/jira/browse/KAFKA-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4086:
-------------------------------
    Labels: consumer reliability  (was: consumer)

> long processing consumer restart will stall
> -------------------------------------------
>
>                 Key: KAFKA-4086
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4086
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.0.0
>            Reporter: Dale Jin
>              Labels: consumer, reliability
>
> [~hachikuji]
> We have a long processing consumer. Whenever a new consumer tries to join the 
> group while the long processing consumer is processing, the new consumer will 
> stall.
> If we kill the long processing consumer and restart it again, it will stall 
> both consumers.
> When we kill the long processing consumer, that consumer tries to issue a 
> leaveGroup command but it will fail seemingly due to the client request 
> timeout.
> When we try to start the long processing consumer again, it seems to be 
> sending topic metadata to the broker then the subsequent join group request 
> is issued and returning a future but when I check the server log I don't see 
> the corresponding request in kafka-request.log.
> When we construct the consumer, we use the following code (based on 
> kafka-python library):
> {code}
>         self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
>                                       value_deserializer=deserializer,
>                                       group_id=self.user_defined_sub_name,
>                                       heartbeat_interval_ms=10000,
>                                       session_timeout_ms=300000,
>                                       enable_auto_commit=False)
> {code}
> on the server side, we use 0.10.0.0 with default settings.
> looks like a `RebalanceInProgressError` is thrown
> {code}
> 2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator 
> 100 for group v1.user.queue
> 2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking 
> previously assigned partitions set() for group v1.user.queue
> 2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to 
> ClusterMetadata(brokers: 1, topics: 1, groups: 1)
> 2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group 
> v1.user.queue
> 2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup 
> (JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, 
> member_id='', protocol_type='consumer', 
> group_protocols=[(protocol_name='range', 
> protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'),
>  (protocol_name='roundrobin', 
> protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')]))
>  to coordinator 100
> 2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection 
> host=10.128.64.81/10.128.64.81 port=9092> Request 5: 
> JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, 
> member_id='', protocol_type='consumer', 
> group_protocols=[(protocol_name='range', 
> protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'),
>  (protocol_name='roundrobin', 
> protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])
> 2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection 
> host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing 
> connection.
> 2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed 
> – refreshing metadata
> 2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending 
> JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed 
> out after 40000 ms]
> 2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the 
> coordinator dead (node 100) for group v1.user.queue: None.
> 2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group 
> coordinator request for group v1.user.queue to broker 100
> {code}
> fyi, we turned on the following in log4j:
> {code}
> log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
> log4j.additivity.kafka.server.KafkaApis=true
> log4j.logger.kafka.request.logger=TRACE, requestAppender
> log4j.additivity.kafka.request.logger=true
> log4j.logger.kafka.controller=TRACE, controllerAppender
> log4j.additivity.kafka.controller=true
> log4j.logger.state.change.logger=TRACE, stateChangeAppender
> log4j.additivity.state.change.logger=true
> {code}
> Please let us know how we can proceed forward to find out the root cause.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to