Hello all!

Please help me understand why my consumer has started receiving
duplicates. I suspect it is caused by problems on my kafka1 node.

The cluster consists of three nodes: kafka1 (192.168.137.19, id=1),
kafka2 (192.168.137.20, id=2), kafka3 (192.168.137.21, id=3).
Kafka version: kafka_2.13-2.4.1
Configs:
- Broker config (server.properties from kafka1):
https://pastebin.com/MR20rZdQ
- ZooKeeper config (zookeeper.properties from kafka1):
https://pastebin.com/vCpFU0gp

/opt/kafka_2.13-2.4.1/bin/kafka-topics.sh --describe --topic in_raw --zookeeper localhost:2181
Topic: in_raw   PartitionCount: 1       ReplicationFactor: 3    Configs:
        Topic: in_raw   Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,2,3

The producer put one message into the "in_raw" topic. After that, our
consumer started receiving many duplicates of it from that topic, one
every 10 minutes.

The first duplicate occurrence was at 20:01:
$ xzcat parsers.log-20201105.xz | perl -MData::Dumper -lne 'if (/(unitId=\d+, unitDate=\d+, msgNumber=\d+)/) { ++$a->{$1}; die "$_\n" if $a->{$1} > 1; }'
2020-11-04 20:01:47.173
[parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
- PARSE: Event{id='aeb33e8d-f7e5-4734-bc41-3c93da3cf94f', unitId=1073,
unitDate=1604519428552, msgNumber=6948}
...

A few records from the log file:
2020-11-04 19:54:52.740
[parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
- PARSE: Event{id='86cc792b-fb5e-4ebb-be49-7a51f3a1c954', unitId=1073,
unitDate=1604519428552, msgNumber=6948}
2020-11-04 20:01:47.173
[parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
- PARSE: Event{id='aeb33e8d-f7e5-4734-bc41-3c93da3cf94f', unitId=1073,
unitDate=1604519428552, msgNumber=6948}
2020-11-04 20:11:47.217
[parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
- PARSE: Event{id='05a059e0-8002-48d0-b2da-269e42b879a0', unitId=1073,
unitDate=1604519428552, msgNumber=6948}
2020-11-04 20:21:47.185
[parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
- PARSE: Event{id='5b590bde-9e86-4660-8916-db4a590ba12e', unitId=1073,
unitDate=1604519428552, msgNumber=6948}
...and so on.
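
To make the 10-minute pattern explicit, here is a small Python sketch
that computes the gaps between the four occurrences shown above. The
timestamps are copied from the log excerpt; the function name
gaps_in_seconds is only illustrative.

```python
from datetime import datetime

# Timestamps of the four PARSE records for msgNumber=6948,
# copied from the parser log excerpt above.
SEEN_AT = [
    "2020-11-04 19:54:52.740",
    "2020-11-04 20:01:47.173",
    "2020-11-04 20:11:47.217",
    "2020-11-04 20:21:47.185",
]


def gaps_in_seconds(stamps):
    """Return the time between consecutive timestamps, in seconds."""
    times = [datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f") for s in stamps]
    return [round((b - a).total_seconds(), 3) for a, b in zip(times, times[1:])]


if __name__ == "__main__":
    for gap in gaps_in_seconds(SEEN_AT):
        print(f"{gap:.3f} s")
```

The first gap is roughly 6 min 54 s (original delivery to first
duplicate); the following gaps are each within 50 ms of 600 seconds.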

Something went wrong earlier, at 19:50.

Log from the kafka1 broker:

[2020-11-04 19:04:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
expired offsets in 0 milliseconds.
(kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:14:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
expired offsets in 0 milliseconds.
(kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:24:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
expired offsets in 0 milliseconds.
(kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:34:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
expired offsets in 0 milliseconds.
(kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:44:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
expired offsets in 0 milliseconds.
(kafka.coordinator.group.GroupMetadataManager)
[2020-11-04 19:50:19,506] WARN Client session timed out, have not heard
from server in 7997ms for sessionid 0x1000060d4310001
(org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:19,526] INFO Client session timed out, have not heard
from server in 7997ms for sessionid 0x1000060d4310001, closing socket
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:20,774] INFO Opening socket connection to server
kafka2.8m.local/192.168.137.20:2181. Will not attempt to authenticate using
SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:20,775] INFO Socket connection established, initiating
session, client: /192.168.137.19:57606, server: kafka2.8m.local/
192.168.137.20:2181 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:22,776] WARN Client session timed out, have not heard
from server in 2002ms for sessionid 0x1000060d4310001
(org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:22,776] INFO Client session timed out, have not heard
from server in 2002ms for sessionid 0x1000060d4310001, closing socket
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,360] INFO Opening socket connection to server
kafka3.8m.local/192.168.137.21:2181. Will not attempt to authenticate using
SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,361] INFO Socket connection established, initiating
session, client: /192.168.137.19:54702, server: kafka3.8m.local/
192.168.137.21:2181 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,373] WARN Unable to reconnect to ZooKeeper service,
session 0x1000060d4310001 has expired (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,373] INFO Unable to reconnect to ZooKeeper service,
session 0x1000060d4310001 has expired, closing socket connection
(org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,378] INFO EventThread shut down for session:
0x1000060d4310001 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,389] INFO [ZooKeeperClient Kafka server] Session
expired. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,434] INFO [ZooKeeperClient Kafka server] Initializing
a new session to 192.168.137.19:2181,192.168.137.20:2181,192.168.137.21:2181.
(kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,436] INFO Initiating client connection, connectString=
192.168.137.19:2181,192.168.137.20:2181,192.168.137.21:2181
sessionTimeout=6000
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@1cbbffcd
(org.apache.zookeeper.ZooKeeper)
[2020-11-04 19:50:23,458] INFO jute.maxbuffer value is 4194304 Bytes
(org.apache.zookeeper.ClientCnxnSocket)
[2020-11-04 19:50:23,470] INFO zookeeper.request.timeout value is 0.
feature enabled= (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,490] INFO Opening socket connection to server
kafka1.8m.local/192.168.137.19:2181. Will not attempt to authenticate using
SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,491] INFO Socket connection established, initiating
session, client: /192.168.137.19:46826, server: kafka1.8m.local/
192.168.137.19:2181 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,641] INFO Creating /brokers/ids/1 (is it secure?
false) (kafka.zk.KafkaZkClient)
[2020-11-04 19:50:23,666] INFO Unable to read additional data from server
sessionid 0x0, likely server has closed socket, closing socket connection
and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,768] INFO [ZooKeeperClient Kafka server] Waiting until
connected. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,768] INFO [ZooKeeperClient Kafka server] Waiting until
connected. (kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,784] INFO Opening socket connection to server
kafka2.8m.local/192.168.137.20:2181. Will not attempt to authenticate using
SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,785] INFO Socket connection established, initiating
session, client: /192.168.137.19:57614, server: kafka2.8m.local/
192.168.137.20:2181 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,998] INFO Session establishment complete on server
kafka2.8m.local/192.168.137.20:2181, sessionid = 0x2000034d85c0003,
negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-11-04 19:50:23,998] INFO [ZooKeeperClient Kafka server] Connected.
(kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:23,998] INFO [ZooKeeperClient Kafka server] Connected.
(kafka.zookeeper.ZooKeeperClient)
[2020-11-04 19:50:24,230] ERROR [KafkaApi-1] Error when handling request:
clientId=3, correlationId=0, api=UPDATE_METADATA, version=6,
body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],rack=null,_tagged_fields={}},{id=1,endpoints=[{port=9092,host=192.168.137.19,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}}
(kafka.server.KafkaApis)
java.lang.IllegalStateException: Epoch 17179870048 larger than current
broker epoch 17179869253
at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:269)
at kafka.server.KafkaApis.handle(KafkaApis.scala:133)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
at java.lang.Thread.run(Thread.java:748)
[2020-11-04 19:50:24,268] ERROR [KafkaApi-1] Error when handling request:
clientId=3, correlationId=1, api=LEADER_AND_ISR, version=4,
body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],_tagged_fields={}}],live_leaders=[{broker_id=3,host_name=192.168.137.21,port=9092,_tagged_fields={}},{broker_id=2,host_name=192.168.137.20,port=9092,_tagged_fields={}}],_tagged_fields={}}
(kafka.server.KafkaApis)
java.lang.IllegalStateException: Epoch 17179870048 larger than current
broker epoch 17179869253
at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:211)
at kafka.server.KafkaApis.handle(KafkaApis.scala:131)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
at java.lang.Thread.run(Thread.java:748)
[2020-11-04 19:50:24,277] ERROR [KafkaApi-1] Error when handling request:
clientId=3, correlationId=2, api=UPDATE_METADATA, version=6,
body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],_tagged_fields={}}],live_brokers=[{id=3,endpoints=[{port=9092,host=192.168.137.21,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}},{id=2,endpoints=[{port=9092,host=192.168.137.20,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}},{id=1,endpoints=[{port=9092,host=192.168.137.19,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}}
(kafka.server.KafkaApis)
java.lang.IllegalStateException: Epoch 17179870048 larger than current
broker epoch 17179869253
at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:269)
at kafka.server.KafkaApis.handle(KafkaApis.scala:133)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
at java.lang.Thread.run(Thread.java:748)
[2020-11-04 19:50:24,786] INFO Stat of the created znode at /brokers/ids/1
is:
17179870048,17179870048,1604519424151,1604519424151,1,0,0,144115415044063235,200,0,17179870048
 (kafka.zk.KafkaZkClient)
[2020-11-04 19:50:24,791] INFO Registered broker 1 at path /brokers/ids/1
with addresses:
ArraySeq(EndPoint(192.168.137.19,9092,ListenerName(PLAINTEXT),PLAINTEXT)),
czxid (broker epoch): 17179870048 (kafka.zk.KafkaZkClient)
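
As the last log line says, the broker epoch is the czxid of the
/brokers/ids/1 znode. A ZooKeeper zxid keeps the leader epoch in its
high 32 bits and a counter in its low 32 bits, so the two epochs from
the exception can be decoded with a sketch like this (the helper name
split_zxid is made up for illustration):

```python
def split_zxid(zxid):
    """Split a 64-bit ZooKeeper zxid into (leader epoch, counter)."""
    return zxid >> 32, zxid & 0xFFFFFFFF


# Values copied from the IllegalStateException in the kafka1 broker log.
REQUEST_EPOCH = 17179870048  # broker_epoch carried by controller 3's requests
CURRENT_EPOCH = 17179869253  # epoch kafka1 still held when it rejected them

if __name__ == "__main__":
    for name, value in (("request", REQUEST_EPOCH), ("current", CURRENT_EPOCH)):
        epoch, counter = split_zxid(value)
        print(f"{name}: {hex(value)} -> epoch {epoch}, counter {counter}")
```

Both values belong to ZooKeeper epoch 4. The request epoch is
0x400000360, which is the czxid that kafka1 itself logs at 19:50:24,786,
so it looks as if the controller's requests reached kafka1 about half a
second before kafka1 had recorded its own new registration. That is my
reading of the timestamps, not something the logs state directly.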

Controller log from the kafka1 broker:
[2020-11-04 19:50:23,442] DEBUG [Controller id=1] Resigning
(kafka.controller.KafkaController)
[2020-11-04 19:50:23,496] DEBUG [Controller id=1] Unregister
BrokerModifications handler for Set() (kafka.controller.KafkaController)
[2020-11-04 19:50:23,519] INFO [PartitionStateMachine controllerId=1]
Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
[2020-11-04 19:50:23,526] INFO [ReplicaStateMachine controllerId=1] Stopped
replica state machine (kafka.controller.ZkReplicaStateMachine)
[2020-11-04 19:50:23,560] INFO [Controller id=1] Resigned
(kafka.controller.KafkaController)
[2020-11-04 19:50:24,797] DEBUG [Controller id=1] Broker 3 has been elected
as the controller, so stopping the election process.
(kafka.controller.KafkaController)

ZooKeeper log on kafka1:
[2020-11-04 19:50:20,215] WARN Unable to read additional data from client
sessionid 0x1000060d4310000, likely client has closed socket
(org.apache.zookeeper.server.NIOServerCnxn)
[2020-11-04 19:50:20,215] WARN Unable to read additional data from client
sessionid 0x1000060d4310001, likely client has closed socket
(org.apache.zookeeper.server.NIOServerCnxn)
[2020-11-04 19:50:21,495] WARN Exception when following the leader
(org.apache.zookeeper.server.quorum.Learner)
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:84)
at
org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:85)
at
org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:118)
at org.apache.zookeeper.server.quorum.Learner.readPacket(Learner.java:158)
at
org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:92)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:1253)
[2020-11-04 19:50:22,000] INFO shutdown called
(org.apache.zookeeper.server.quorum.Learner)
java.lang.Exception: shutdown Follower
at org.apache.zookeeper.server.quorum.Follower.shutdown(Follower.java:201)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:1257)
[2020-11-04 19:50:22,043] INFO Shutting down
(org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:22,043] INFO shutting down
(org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:22,059] INFO Shutting down
(org.apache.zookeeper.server.quorum.FollowerRequestProcessor)
[2020-11-04 19:50:22,060] INFO Shutting down
(org.apache.zookeeper.server.quorum.CommitProcessor)
[2020-11-04 19:50:22,139] INFO FollowerRequestProcessor exited loop!
(org.apache.zookeeper.server.quorum.FollowerRequestProcessor)
[2020-11-04 19:50:22,141] INFO CommitProcessor exited loop!
(org.apache.zookeeper.server.quorum.CommitProcessor)
[2020-11-04 19:50:22,218] INFO shutdown of request processor complete
(org.apache.zookeeper.server.FinalRequestProcessor)
[2020-11-04 19:50:22,637] INFO Shutting down
(org.apache.zookeeper.server.SyncRequestProcessor)
[2020-11-04 19:50:22,655] INFO SyncRequestProcessor exited!
(org.apache.zookeeper.server.SyncRequestProcessor)
[2020-11-04 19:50:22,658] WARN PeerState set to LOOKING
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2020-11-04 19:50:22,694] INFO LOOKING
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2020-11-04 19:50:23,044] INFO New election. My id =  1, proposed
zxid=0x400000281 (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,170] INFO Notification: 2 (message format version), 1
(n.leader), 0x400000281 (n.zxid), 0x5 (n.round), LOOKING (n.state), 1
(n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
(org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,293] INFO Notification: 2 (message format version), 3
(n.leader), 0x300000020 (n.zxid), 0x4 (n.round), LEADING (n.state), 3
(n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
(org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,500] INFO Notification time out: 400
(org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,501] INFO Notification: 2 (message format version), 1
(n.leader), 0x400000281 (n.zxid), 0x5 (n.round), LOOKING (n.state), 1
(n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
(org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,503] INFO Notification: 2 (message format version), 3
(n.leader), 0x300000020 (n.zxid), 0x4 (n.round), LEADING (n.state), 3
(n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
(org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,665] WARN Exception causing close of session 0x0:
ZooKeeperServer not running (org.apache.zookeeper.server.NIOServerCnxn)
[2020-11-04 19:50:23,680] INFO Notification: 2 (message format version), 3
(n.leader), 0x300000020 (n.zxid), 0x4 (n.round), FOLLOWING (n.state), 2
(n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
(org.apache.zookeeper.server.quorum.FastLeaderElection)
[2020-11-04 19:50:23,682] INFO FOLLOWING
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2020-11-04 19:50:23,707] INFO minSessionTimeout set to 4000
(org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:23,708] INFO maxSessionTimeout set to 40000
(org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:23,708] INFO Created server with tickTime 2000
minSessionTimeout 4000 maxSessionTimeout 40000 datadir
/data/zookeeper/version-2 snapdir /data/zookeeper/version-2
(org.apache.zookeeper.server.ZooKeeperServer)
[2020-11-04 19:50:23,719] INFO FOLLOWING - LEADER ELECTION TOOK - 30 MS
(org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,786] INFO Getting a diff from the leader 0x40000035e
(org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,798] WARN Got zxid 0x400000282 expected 0x1
(org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,809] INFO Learner received NEWLEADER message
(org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,818] INFO Learner received UPTODATE message
(org.apache.zookeeper.server.quorum.Learner)
[2020-11-04 19:50:23,831] INFO Configuring CommitProcessor with 4 worker
threads. (org.apache.zookeeper.server.quorum.CommitProcessor)
[2020-11-04 19:50:23,863] WARN Got zxid 0x40000035f expected 0x1
(org.apache.zookeeper.server.quorum.Learner)
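
The timeout values in these logs fit ZooKeeper's defaults: unless
configured otherwise, the server uses minSessionTimeout = 2 * tickTime
and maxSessionTimeout = 20 * tickTime, and clamps the timeout requested
by the client into that range. A small sketch of that arithmetic, with
function names that are purely illustrative:

```python
def session_timeout_bounds(tick_time_ms):
    """Default (min, max) session timeout for a given tickTime, in ms."""
    return 2 * tick_time_ms, 20 * tick_time_ms


def negotiate(requested_ms, tick_time_ms):
    """Clamp the client's requested session timeout into the server's bounds."""
    low, high = session_timeout_bounds(tick_time_ms)
    return max(low, min(requested_ms, high))


if __name__ == "__main__":
    # tickTime and sessionTimeout as they appear in the logs above.
    print(session_timeout_bounds(2000))
    print(negotiate(6000, 2000))
```

With tickTime 2000 the bounds are 4000 and 40000, as logged, and the
broker's requested sessionTimeout=6000 is accepted unchanged, which
matches "negotiated timeout = 6000" in the broker log.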

I would appreciate any help.

-- 
Regards,
Denis
