[ https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Elias Levy closed FLINK-9731.
-----------------------------
    Resolution: Invalid

> Kafka source subtask begins to consume from earliest offset
> -----------------------------------------------------------
>
>                 Key: FLINK-9731
>                 URL: https://issues.apache.org/jira/browse/FLINK-9731
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.2
>            Reporter: Elias Levy
>            Priority: Critical
>
> On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 of a Flink
> job began consuming records from the earliest offsets available in Kafka for
> the partitions assigned to it. Other subtasks did not exhibit this behavior
> and continued operating normally.
> Prior to the event the job exhibited no Kafka lag. The job showed no failed
> checkpoints, and it did not restore or restart. The Flink logs showed only
> the following message:
> {noformat}
> June 30th 2018, 02:35:01.711 Fetch offset 2340400514 is out of range for
> partition topic-124, resetting offset
> {noformat}
> The job is configured with checkpoints at 1 minute intervals. The Kafka
> connector consumer is configured to start from group offsets when it is not
> started from a savepoint, via `setStartFromGroupOffsets()`, and the Kafka
> consumer is configured to fall back to the earliest offsets if no group
> offsets have been committed, by setting `auto.offset.reset` to `earliest` in
> the Kafka consumer config.
> Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership
> of its partitions for around 30 seconds as a result of losing its connection
> to ZooKeeper.
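[Editor's note: the consumer setup described above, under the Flink 1.4 Kafka 0.11 connector API, might look roughly like the following sketch. The topic name, group id, and broker address are illustrative, not taken from the report.]

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoints at 1 minute intervals, per the report

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker-b5-int:9092"); // illustrative
        props.setProperty("group.id", "my-consumer-group");                 // illustrative
        // Fall back to the earliest offsets if no group offsets are committed.
        props.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
        // Start from the committed group offsets when not restoring from a savepoint.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("kafka-source-job");
    }
}
```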
>
> {noformat}
> [2018-06-30 09:34:54,799] INFO Unable to read additional data from server sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for partition [cloud_ioc_events,32] to broker 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
> {noformat}
> The broker reconnected to ZK after a few tries:
> {noformat}
> [2018-06-30 09:34:55,462] INFO Opening socket connection to server 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,463] INFO Socket connection established to 10.210.48.187/10.210.48.187:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,465] INFO Initiating client connection, connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 (org.apache.zookeeper.ZooKeeper)
> [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,468] INFO Opening socket connection to server 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,468] INFO Socket connection established to 10.210.43.200/10.210.43.200:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO Session establishment complete on server 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
> [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 with addresses: EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT) (kafka.utils.ZkUtils)
> [2018-06-30 09:34:55,476] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
> {noformat}
> By 9:35:02 partitions had returned to the broker.
> It appears this is the broker that the subtask was consuming from, as
> outgoing network traffic from it spiked after the broker recovered leadership
> of its partitions, which is consistent with the subtask starting to consume
> from the earliest offset.
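[Editor's note: the "Fetch offset ... is out of range" message follows from `auto.offset.reset` semantics: when the requested fetch offset falls outside the range the broker considers valid (which can happen transiently around a leadership change), a consumer configured with `earliest` jumps back to the start of the partition. A toy sketch of that decision, not Kafka's actual code:]

```java
// Toy model of the auto.offset.reset decision; not Kafka's real implementation.
public class OffsetResetSketch {
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Given the consumer's requested fetch offset and the broker's currently
    // valid range [logStart, logEnd], return the offset actually fetched from.
    static long resolveFetchOffset(long requested, long logStart, long logEnd,
                                   ResetPolicy policy) {
        if (requested >= logStart && requested <= logEnd) {
            return requested; // offset still valid, no reset needed
        }
        switch (policy) {
            case EARLIEST: return logStart; // auto.offset.reset=earliest
            case LATEST:   return logEnd;   // auto.offset.reset=latest
            default:
                throw new IllegalStateException(
                    "Fetch offset " + requested + " is out of range");
        }
    }

    public static void main(String[] args) {
        // If a broker transiently reports a log end below the consumer's
        // position, the position looks out of range and the consumer resets
        // all the way back to the partition's earliest retained offset.
        long next = resolveFetchOffset(2_340_400_514L, 1_000_000_000L,
                                       2_340_400_000L, ResetPolicy.EARLIEST);
        System.out.println(next); // prints 1000000000
    }
}
```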
> This may have been related to
> [KAFKA-5600|https://issues.apache.org/jira/browse/KAFKA-5600], which affects
> 0.11.0.0, the version we are running, and which was fixed in 0.11.0.1. But
> that seems unlikely, as the Flink Kafka connector consumer should not make
> use of the offsets committed to Kafka when operating with checkpoints
> enabled, nor when the job is not restarting or being restored.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)