Hi,
I am running a Kafka sink connector on Kafka 0.9.0.2. My consumer
periodically throws an error while committing offsets and then goes into
a group rebalance. Please let me know what can be done to fix this issue.
The relevant log output is below:
2017-01-20 17:22:21 INFO WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
2017-01-20 17:22:21 DEBUG NetworkClient : 619 - Sending metadata request
ClientRequest(expectResponse=true, callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=74,client_id=consumer-3},
body={topics=[monitorAlert]}), isInitiatedByNetworkClient,
createdTimeMs=1484954541923, sendTimeMs=0) to node 1001
2017-01-20 17:22:21 DEBUG Selector : 307 - Connection with clpd355.sldc./
135.41.3.85 disconnected
java.io.EOFException
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
at
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:907)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:852)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
2017-01-20 17:22:21 DEBUG NetworkClient : 454 - Node 2147482646
disconnected.
2017-01-20 17:22:21 DEBUG ConsumerNetworkClient : 376 - Cancelled
OFFSET_COMMIT request ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@6428047c,
request=RequestSend(header={api_key=8,api_version=2,correlation_id=73,client_id=consumer-3},
body={group_id=connect-jdbc-sink-connector,group_generation_id=1,member_id=consumer-3-5a029818-f6be-4fde-b7e0-1bf751ff9722,retention_time=-1,topics=[{topic=monitorAlert,partitions=[{partition=0,offset=12755,metadata=},{partition=1,offset=13113,metadata=},{partition=2,offset=12832,metadata=}]}]}),
createdTimeMs=1484954541923, sendTimeMs=0) with correlation id 73 due to
node 2147482646 being disconnected
2017-01-20 17:22:21 INFO AbstractCoordinator : 535 - Marking the
coordinator 2147482646 dead.
2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.common.errors.DisconnectException
2017-01-20 17:22:21 INFO WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The
group coordinator is not available.
2017-01-20 17:22:21 DEBUG AbstractCoordinator : 471 - Issuing group
metadata request to broker 1001
2017-01-20 17:22:21 DEBUG Metadata : 172 - Updated cluster metadata version
6 to Cluster(nodes = [Node(1001, clpd355.sldc.sbc.com, 6667)], partitions =
[Partition(topic = monitorAlert, partition = 2, leader = 1001, replicas =
[1001,], isr = [1001,], Partition(topic = monitorAlert, partition = 0,
leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic =
monitorAlert, partition = 1, leader = 1001, replicas = [1001,], isr =
[1001,]])
2017-01-20 17:22:21 DEBUG AbstractCoordinator : 484 - Group metadata
response ClientResponse(receivedTimeMs=1484954541926, disconnected=false,
request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4940d8ef,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=75,client_id=consumer-3},
body={group_id=connect-jdbc-sink-connector}), createdTimeMs=1484954541925,
sendTimeMs=1484954541925),
responseBody={error_code=0,coordinator={node_id=1001,host=
clpd355.sldc.sbc.com,port=6667}})
2017-01-20 17:22:21 INFO AbstractCoordinator : 535 - Marking the
coordinator 2147482646 dead.
After this I see several more group metadata requests, followed by the
ILLEGAL_GENERATION error:
2017-01-20 17:22:21 DEBUG AbstractCoordinator : 484 - Group metadata
response ClientResponse(receivedTimeMs=1484954541975, disconnected=false,
request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@53c5eee0,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=109,client_id=consumer-3},
body={group_id=connect-jdbc-sink-connector}), createdTimeMs=1484954541974,
sendTimeMs=1484954541974),
responseBody={error_code=0,coordinator={node_id=1001,host=
clpd355.sldc.sbc.com,port=6667}})
2017-01-20 17:22:21 DEBUG NetworkClient : 487 - Initiating connection to
node 2147482646 at clpd355.sldc.sbc.com:6667.
2017-01-20 17:22:21 DEBUG SaslClientAuthenticator : 103 - Creating
SaslClient: client=cdap/clpd355@COREITLTSTL10DEV01
;service=kafka;serviceHostname=clpd355.sld.com;mechs=[GSSAPI]
2017-01-20 17:22:21 DEBUG NetworkClient : 467 - Completed connection to
node 2147482646
2017-01-20 17:22:21 INFO WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:21 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12755, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
2017-01-20 17:22:21 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.clients.consumer.internals.SendFailedException
2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
ILLEGAL_GENERATION occurred while committing offsets for group
connect-jdbc-sink-connector
2017-01-20 17:22:25 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed due to group rebalance
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:671)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:650)
at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:907)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:852)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
2017-01-20 17:22:25 INFO WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:25 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12832, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12756, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13113, metadata=''}}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 241 - Revoking previously
assigned partitions [monitorAlert-2, monitorAlert-1, monitorAlert-0]
2017-01-20 17:22:25 INFO WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
ILLEGAL_GENERATION occurred while committing offsets for group
connect-jdbc-sink-connector
2017-01-20 17:22:25 ERROR WorkerSinkTask : 101 - Commit of
WorkerSinkTask{id=jdbc-sink-connector-0} offsets threw an unexpected
exception:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed due to group rebalance
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:671)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:650)
at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:967)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:207)
at
org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:376)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:244)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:208)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:305)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:889)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:852)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
2017-01-20 17:22:25 ERROR ConsumerCoordinator : 544 - Error
ILLEGAL_GENERATION occurred while committing offsets for group
connect-jdbc-sink-connector
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 96 - Got callback for timed out
commit Thread[WorkerSinkTask-jdbc-sink-connector-0,5,main]: -1, but most
recent commit is 47
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 310 - (Re-)joining group
connect-jdbc-sink-connector
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 324 - Issuing request
(JOIN_GROUP:
{group_id=connect-jdbc-sink-connector,session_timeout=300000,member_id=consumer-3-5a029818-f6be-4fde-b7e0-1bf751ff9722,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
lim=24 cap=24]}]}) to coordinator 2147482646
2017-01-20 17:22:25 INFO AbstractCoordinator : 360 - Attempt to join group
connect-jdbc-sink-connector failed due to unknown member id, resetting and
retrying.
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 310 - (Re-)joining group
connect-jdbc-sink-connector
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 324 - Issuing request
(JOIN_GROUP:
{group_id=connect-jdbc-sink-connector,session_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
lim=24 cap=24]}]}) to coordinator 2147482646
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 342 - Joined group:
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,members=[{member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,member_metadata=java.nio.HeapByteBuffer[pos=0
lim=24 cap=24]}]}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 219 - Performing range
assignment for subscriptions
{consumer-3-842b27da-535a-4715-aacc-1d38e2818586=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@76f0d523
}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 223 - Finished assignment:
{consumer-3-842b27da-535a-4715-aacc-1d38e2818586=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@3d3929d6
}
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 403 - Issuing leader
SyncGroup (SYNC_GROUP:
{group_id=connect-jdbc-sink-connector,generation_id=1,member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,group_assignment=[{member_id=consumer-3-842b27da-535a-4715-aacc-1d38e2818586,member_assignment=java.nio.HeapByteBuffer[pos=0
lim=40 cap=40]}]}) to coordinator 2147482646
2017-01-20 17:22:25 DEBUG AbstractCoordinator : 429 - Received successful
sync group response for group connect-jdbc-sink-connector:
{error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=40
cap=40]}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 185 - Setting newly
assigned partitions [monitorAlert-2, monitorAlert-1, monitorAlert-0]
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 575 - Fetching committed
offsets for partitions: [monitorAlert-2, monitorAlert-1, monitorAlert-0]
2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
monitorAlert-2 to the committed offset 12768
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
assigned topic partition monitorAlert-2 with offset 12768
2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
monitorAlert-0 to the committed offset 12690
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
assigned topic partition monitorAlert-0 with offset 12690
2017-01-20 17:22:25 DEBUG Fetcher : 170 - Resetting offset for partition
monitorAlert-1 to the committed offset 13048
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 341 - jdbc-sink-connector-0
assigned topic partition monitorAlert-1 with offset 13048
2017-01-20 17:22:25 INFO WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:22:25 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12768, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12690, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13048, metadata=''}}
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
12768 for partition monitorAlert-2
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
13048 for partition monitorAlert-1
2017-01-20 17:22:25 DEBUG ConsumerCoordinator : 512 - Committed offset
12690 for partition monitorAlert-0
2017-01-20 17:22:25 DEBUG WorkerSinkTask : 104 - Finished
WorkerSinkTask{id=jdbc-sink-connector-0} offset commit successfully in 2 ms
It then tries to commit the same offsets multiple times, and finally logs
the following:
2017-01-20 17:25:40 INFO WorkerSinkTask : 187 -
WorkerSinkTask{id=jdbc-sink-connector-0} Committing offsets
2017-01-20 17:25:40 DEBUG KafkaConsumer : 1023 - Committing offsets:
{monitorAlert-2=OffsetAndMetadata{offset=12768, metadata=''},
monitorAlert-0=OffsetAndMetadata{offset=12690, metadata=''},
monitorAlert-1=OffsetAndMetadata{offset=13048, metadata=''}}
2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
12768 for partition monitorAlert-2
2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
13048 for partition monitorAlert-1
2017-01-20 17:25:40 DEBUG ConsumerCoordinator : 512 - Committed offset
12690 for partition monitorAlert-0
2017-01-20 17:25:40 DEBUG WorkerSinkTask : 104 - Finished
WorkerSinkTask{id=jdbc-sink-connector-0} offset commit successfully in 2 ms
2017-01-20 17:25:40 DEBUG AbstractCoordinator : 621 - Received successful
heartbeat response.
2017-01-20 17:25:41 DEBUG session : 347 - Scavenging sessions at
1484954741657
2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
partition monitorAlert-2 since its offset 12832 does not match the expected
offset 12768
2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
partition monitorAlert-1 since its offset 13113 does not match the expected
offset 13048
2017-01-20 17:25:41 DEBUG Fetcher : 554 - Discarding fetch response for
partition monitorAlert-0 since its offset 12756 does not match the expected
offset 12690
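To put a number on the rewind visible in the log above, here is a small sketch in Python. The values are copied by hand from the log lines, and the variable and function names are mine, not anything from Kafka. It compares the offsets the task was trying to commit at 17:22:21 with the committed offsets the consumer was reset to after it rejoined the group. My assumption is that the difference is roughly the number of records per partition that get fetched again.

```python
# Offsets from the failed commit attempts at 17:22:21
# ("Committing offsets: {...}" lines in the log).
attempted = {
    "monitorAlert-0": 12755,
    "monitorAlert-1": 13113,
    "monitorAlert-2": 12832,
}

# Offsets the consumer was reset to after rejoining at 17:22:25
# ("Resetting offset for partition ... to the committed offset ..." lines).
committed = {
    "monitorAlert-0": 12690,
    "monitorAlert-1": 13048,
    "monitorAlert-2": 12768,
}


def rewind_per_partition(attempted, committed):
    """Return, per partition, how far the position moved backwards."""
    return {tp: attempted[tp] - committed[tp] for tp in attempted}


rewind = rewind_per_partition(attempted, committed)
for tp in sorted(rewind):
    print(tp, rewind[tp])
```

So each partition goes back by about 65 records every time this happens, which is why I am worried about duplicates in the sink.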
Here is the configuration from my connect-distributed.properties:
bootstrap.servers=clpd355:6667
group.id=alert
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.flush.interval.ms=10000
config.storage.topic=connect-configs
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
producer.sasl.kerberos.service.name=kafka
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=GSSAPI
consumer.fetch.message.max.bytes=1048576
consumer.session.timeout.ms=300000
consumer.request.timeout.ms=310000
consumer.max.partition.fetch.bytes=1048576
consumer.heartbeat.interval.ms=60000
consumer.fetch.max.wait.ms=200000
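
For anyone checking the timeouts above, here is a rough Python sketch (not part of my setup) of the sanity checks I believe apply to these consumer settings. The rules are my reading of the Kafka consumer documentation: the heartbeat interval must be lower than the session timeout and is typically no more than a third of it, and the request timeout should be larger than both the session timeout and the fetch max wait. The function name check_timeouts and the dictionary layout are only illustrative.

```python
def check_timeouts(cfg):
    """Return a list of warnings for consumer timeout settings (all values in ms)."""
    session = cfg["session.timeout.ms"]
    heartbeat = cfg["heartbeat.interval.ms"]
    request = cfg["request.timeout.ms"]
    fetch_wait = cfg["fetch.max.wait.ms"]

    warnings = []
    if heartbeat >= session:
        warnings.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat * 3 > session:
        warnings.append("heartbeat.interval.ms is above 1/3 of session.timeout.ms")
    if request <= session:
        warnings.append("request.timeout.ms should be larger than session.timeout.ms")
    if request <= fetch_wait:
        warnings.append("request.timeout.ms should be larger than fetch.max.wait.ms")
    return warnings


# Values from my connect-distributed.properties, with the "consumer." prefix dropped.
my_consumer_cfg = {
    "session.timeout.ms": 300000,
    "request.timeout.ms": 310000,
    "heartbeat.interval.ms": 60000,
    "fetch.max.wait.ms": 200000,
}

problems = check_timeouts(my_consumer_cfg)
print(problems)
```

With my values this returns an empty list, so the timeouts at least look consistent with each other, assuming I have the rules right.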
Thanks,
Sri