Hello all! Please help me understand why my consumer has started receiving duplicates. I suspect it is caused by problems on my kafka1 node.
The cluster consists of three nodes:
- kafka1 (192.168.137.19, id=1)
- kafka2 (192.168.137.20, id=2)
- kafka3 (192.168.137.21, id=3)

Version of Kafka: kafka_2.13-2.4.1

Configs:
- Broker config (server.properties from kafka1): https://pastebin.com/MR20rZdQ
- Zookeeper config (zookeeper.properties from kafka1): https://pastebin.com/vCpFU0gp

/opt/kafka_2.13-2.4.1/bin/kafka-topics.sh --describe --topic in_raw --zookeeper localhost:2181
Topic: in_raw PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: in_raw Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,2,3

The producer put one message into the `in_raw' topic. After that, our consumer started receiving many duplicates from that topic, every 10 minutes.

The first duplicate occurrence was at 20:01:

$ xzcat parsers.log-20201105.xz | perl -MData::Dumper -lne 'if (/(unitId=\d+, unitDate=\d+, msgNumber=\d+)/) { ++$a->{$1}; die "$_\n" if $a->{$1} > 1; }'
2020-11-04 20:01:47.173 [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService - PARSE: Event{id='aeb33e8d-f7e5-4734-bc41-3c93da3cf94f', unitId=1073, unitDate=1604519428552, msgNumber=6948}
...

A couple of records from the log file:

2020-11-04 19:54:52.740 [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService - PARSE: Event{id='86cc792b-fb5e-4ebb-be49-7a51f3a1c954', unitId=1073, unitDate=1604519428552, msgNumber=6948}
2020-11-04 20:01:47.173 [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService - PARSE: Event{id='aeb33e8d-f7e5-4734-bc41-3c93da3cf94f', unitId=1073, unitDate=1604519428552, msgNumber=6948}
2020-11-04 20:11:47.217 [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService - PARSE: Event{id='05a059e0-8002-48d0-b2da-269e42b879a0', unitId=1073, unitDate=1604519428552, msgNumber=6948}
2020-11-04 20:21:47.185 [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService - PARSE: Event{id='5b590bde-9e86-4660-8916-db4a590ba12e', unitId=1073, unitDate=1604519428552, msgNumber=6948}
...and so on.
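To make the interval between the repeated deliveries easier to see, here is a minimal Python sketch of the same check as the perl one-liner. It groups PARSE lines by their (unitId, unitDate, msgNumber) fields and reports the time gaps between occurrences. The function names `find_duplicates` and `gaps_in_seconds` are made up for this illustration, and the regular expression assumes the log format shown in the records above.

```python
import re
from collections import defaultdict
from datetime import datetime

# Timestamp at the start of the line, then the fields that identify a message.
# Assumes the parser log format shown in the records above.
LINE_RE = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) .*"
    r"(unitId=\d+, unitDate=\d+, msgNumber=\d+)"
)


def find_duplicates(lines):
    """Return {message key: [timestamps]} for keys seen more than once."""
    seen = defaultdict(list)
    for line in lines:
        m = LINE_RE.search(line)
        if m:
            ts = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S.%f")
            seen[m.group(2)].append(ts)
    return {key: stamps for key, stamps in seen.items() if len(stamps) > 1}


def gaps_in_seconds(timestamps):
    """Return the gaps between consecutive timestamps, rounded to whole seconds."""
    return [
        round((later - earlier).total_seconds())
        for earlier, later in zip(timestamps, timestamps[1:])
    ]
```

The lines can come from any iterable of strings, for example a file opened with `lzma.open("parsers.log-20201105.xz", "rt")`. For the four records quoted above, the gaps come out at roughly 414, 600 and 600 seconds, so after the first redelivery the duplicates arrive almost exactly 10 minutes apart.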
Something went wrong earlier at 19:50.

Log from kafka1 broker:

[2020-11-04 19:04:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:14:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:24:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:34:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:44:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:50:19,506] WARN Client session timed out, have not heard from server in 7997ms for sessionid 0x1000060d4310001 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:19,526] INFO Client session timed out, have not heard from server in 7997ms for sessionid 0x1000060d4310001, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:20,774] INFO Opening socket connection to server kafka2.8m.local/192.168.137.20:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:20,775] INFO Socket connection established, initiating session, client: /192.168.137.19:57606, server: kafka2.8m.local/192.168.137.20:2181 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:22,776] WARN Client session timed out, have not heard from server in 2002ms for sessionid 0x1000060d4310001 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:22,776] INFO Client session timed out, have not heard from server in 2002ms for sessionid 0x1000060d4310001, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,360] INFO Opening socket connection to server kafka3.8m.local/192.168.137.21:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,361] INFO Socket connection established, initiating session, client: /192.168.137.19:54702, server: kafka3.8m.local/192.168.137.21:2181 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,373] WARN Unable to reconnect to ZooKeeper service, session 0x1000060d4310001 has expired (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,373] INFO Unable to reconnect to ZooKeeper service, session 0x1000060d4310001 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,378] INFO EventThread shut down for session: 0x1000060d4310001 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,389] INFO [ZooKeeperClient Kafka server] Session expired. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,434] INFO [ZooKeeperClient Kafka server] Initializing a new session to 192.168.137.19:2181,192.168.137.20:2181,192.168.137.21:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,436] INFO Initiating client connection, connectString=192.168.137.19:2181,192.168.137.20:2181,192.168.137.21:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@1cbbffcd (org.apache.zookeeper.ZooKeeper)
[2020-11-04 19:50:23,458] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2020-11-04 19:50:23,470] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,490] INFO Opening socket connection to server kafka1.8m.local/192.168.137.19:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,491] INFO Socket connection established, initiating session, client: /192.168.137.19:46826, server: kafka1.8m.local/192.168.137.19:2181 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,641] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.zk.KafkaZkClient)
[2020-11-04 19:50:23,666] INFO Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,768] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,768] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,784] INFO Opening socket connection to server kafka2.8m.local/192.168.137.20:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,785] INFO Socket connection established, initiating session, client: /192.168.137.19:57614, server: kafka2.8m.local/192.168.137.20:2181 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,998] INFO Session establishment complete on server kafka2.8m.local/192.168.137.20:2181, sessionid = 0x2000034d85c0003, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,998] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,998] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:24,230] ERROR [KafkaApi-1] Error when handling request: clientId=3, correlationId=0, api=UPDATE_METADATA, version=6, body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],rack=null,_tagged_fields={}},{id=1,endpoints=[{port=9092,host=192.168.137.19,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}} (kafka.server.KafkaApis)
java.lang.IllegalStateException: Epoch 17179870048 larger than current broker epoch 17179869253
    at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
    at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:269)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:133)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
    at java.lang.Thread.run(Thread.java:748)
[2020-11-04 19:50:24,268] ERROR [KafkaApi-1] Error when handling request: clientId=3, correlationId=1, api=LEADER_AND_ISR, version=4, body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],_tagged_fields={}}],live_leaders=[{broker_id=3,host_name=192.168.137.21,port=9092,_tagged_fields={}},{broker_id=2,host_name=192.168.137.20,port=9092,_tagged_fields={}}],_tagged_fields={}} (kafka.server.KafkaApis)
java.lang.IllegalStateException: Epoch 17179870048 larger than current broker epoch 17179869253
    at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:211)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:131)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
    at java.lang.Thread.run(Thread.java:748)
[2020-11-04 19:50:24,277] ERROR [KafkaApi-1] Error when handling request: clientId=3, correlationId=2, api=UPDATE_METADATA, version=6, body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],_tagged_fields={}}],live_brokers=[{id=3,endpoints=[{port=9092,host=192.168.137.21,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}},{id=2,endpoints=[{port=9092,host=192.168.137.20,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}},{id=1,endpoints=[{port=9092,host=192.168.137.19,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}} (kafka.server.KafkaApis)
java.lang.IllegalStateException: Epoch 17179870048 larger than current broker epoch 17179869253
    at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
    at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:269)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:133)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
    at java.lang.Thread.run(Thread.java:748)
[2020-11-04 19:50:24,786] INFO Stat of the created znode at /brokers/ids/1 is: 17179870048,17179870048,1604519424151,1604519424151,1,0,0,144115415044063235,200,0,17179870048 (kafka.zk.KafkaZkClient)
[2020-11-04 19:50:24,791] INFO Registered broker 1 at path /brokers/ids/1 with addresses: ArraySeq(EndPoint(192.168.137.19,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 17179870048 (kafka.zk.KafkaZkClient)

Controller of kafka1 broker:

[2020-11-04 19:50:23,442] DEBUG [Controller id=1] Resigning (kafka.controller.KafkaController)
[2020-11-04 19:50:23,496] DEBUG [Controller id=1] Unregister BrokerModifications handler for Set() (kafka.controller.KafkaController)
[2020-11-04 19:50:23,519] INFO [PartitionStateMachine controllerId=1] Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
[2020-11-04 19:50:23,526] INFO [ReplicaStateMachine controllerId=1] Stopped replica state machine (kafka.controller.ZkReplicaStateMachine)
[2020-11-04 19:50:23,560] INFO [Controller id=1] Resigned (kafka.controller.KafkaController)
[2020-11-04 19:50:24,797] DEBUG [Controller id=1] Broker 3 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController)

Zookeeper on kafka1:

[2020-11-04 19:50:20,215] WARN Unable to read additional data from client sessionid 0x1000060d4310000, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn)
[2020-11-04 19:50:20,215] WARN Unable to read additional data from client sessionid 0x1000060d4310001, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn)
[2020-11-04 19:50:21,495] WARN Exception when following the leader (org.apache.zookeeper.server.quorum.Learner)
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:84)
    at org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:85)
    at org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:118)
    at org.apache.zookeeper.server.quorum.Learner.readPacket(Learner.java:158)
    at org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:92)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:1253)
[2020-11-04 19:50:22,000] INFO shutdown called (org.apache.zookeeper.server.quorum.Learner)
java.lang.Exception: shutdown Follower
    at org.apache.zookeeper.server.quorum.Follower.shutdown(Follower.java:201)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:1257)
[2020-11-04 19:50:22,043] INFO Shutting down (org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:22,043] INFO shutting down (org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:22,059] INFO Shutting down (org.apache.zookeeper.server.quorum.FollowerRequestProcessor)
[2020-11-04 19:50:22,060] INFO Shutting down (org.apache.zookeeper.server.quorum.CommitProcessor)
[2020-11-04 19:50:22,139] INFO FollowerRequestProcessor exited loop! (org.apache.zookeeper.server.quorum.FollowerRequestProcessor)
[2020-11-04 19:50:22,141] INFO CommitProcessor exited loop! (org.apache.zookeeper.server.quorum.CommitProcessor)
[2020-11-04 19:50:22,218] INFO shutdown of request processor complete (org.apache.zookeeper.server.FinalRequestProcessor)
[2020-11-04 19:50:22,637] INFO Shutting down (org.apache.zookeeper.server.SyncRequestProcessor)
[2020-11-04 19:50:22,655] INFO SyncRequestProcessor exited! (org.apache.zookeeper.server.SyncRequestProcessor)
[2020-11-04 19:50:22,658] WARN PeerState set to LOOKING (org.apache.zookeeper.server.quorum.QuorumPeer)
[2020-11-04 19:50:22,694] INFO LOOKING (org.apache.zookeeper.server.quorum.QuorumPeer)
[2020-11-04 19:50:23,044] INFO New election. My id = 1, proposed zxid=0x400000281 (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,170] INFO Notification: 2 (message format version), 1 (n.leader), 0x400000281 (n.zxid), 0x5 (n.round), LOOKING (n.state), 1 (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version) (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,293] INFO Notification: 2 (message format version), 3 (n.leader), 0x300000020 (n.zxid), 0x4 (n.round), LEADING (n.state), 3 (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version) (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,500] INFO Notification time out: 400 (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,501] INFO Notification: 2 (message format version), 1 (n.leader), 0x400000281 (n.zxid), 0x5 (n.round), LOOKING (n.state), 1 (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version) (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,503] INFO Notification: 2 (message format version), 3 (n.leader), 0x300000020 (n.zxid), 0x4 (n.round), LEADING (n.state), 3 (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version) (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,665] WARN Exception causing close of session 0x0: ZooKeeperServer not running (org.apache.zookeeper.server.NIOServerCnxn)
[2020-11-04 19:50:23,680] INFO Notification: 2 (message format version), 3 (n.leader), 0x300000020 (n.zxid), 0x4 (n.round), FOLLOWING (n.state), 2 (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version) (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,682] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer)
[2020-11-04 19:50:23,707] INFO minSessionTimeout set to 4000 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:23,708] INFO maxSessionTimeout set to 40000 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:23,708] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /data/zookeeper/version-2 snapdir /data/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:23,719] INFO FOLLOWING - LEADER ELECTION TOOK - 30 MS (org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,786] INFO Getting a diff from the leader 0x40000035e (org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,798] WARN Got zxid 0x400000282 expected 0x1 (org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,809] INFO Learner received NEWLEADER message (org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,818] INFO Learner received UPTODATE message (org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,831] INFO Configuring CommitProcessor with 4 worker threads. (org.apache.zookeeper.server.quorum.CommitProcessor)
[2020-11-04 19:50:23,863] WARN Got zxid 0x40000035f expected 0x1 (org.apache.zookeeper.server.quorum.Learner)

I appreciate any help.

--
Regards,
Denis
