[ 
https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-9731:
------------------------------
    Description: 
On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job 
instance began consuming records from the earliest offsets available in Kafka 
for the partitions assigned to it. Other subtasks did not exhibit this behavior 
and continued operating normally.

Prior to the event the job exhibited no Kafka lag. The job showed no failed 
checkpoints, and the job did not restore or restart. The Flink logs showed only 
the following message:
{noformat}
June 30th 2018, 02:35:01.711    Fetch offset 2340400514 is out of range for 
partition topic-124, resetting offset
{noformat}
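As far as we understand the Kafka Java consumer, the log line above is printed when the broker rejects a fetch as out of range and an automatic reset policy (`auto.offset.reset`) is configured. With `earliest`, as in this job, the position is moved to the oldest retained offset. The Java sketch below models that decision under those assumptions. `OutOfRangeResetModel`, `nextPosition`, and the numbers in `main` are hypothetical and do not come from the Kafka client or from this incident.

```java
// Hypothetical, simplified model of the consumer-side handling of an
// out-of-range fetch; this is NOT the Kafka client's actual implementation.
final class OutOfRangeResetModel {

    // Mirrors the values of the auto.offset.reset consumer setting.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // The broker accepts fetches in [logStartOffset, logEndOffset]; fetching at
    // logEndOffset is valid and simply returns no records yet.
    static boolean inRange(long fetchOffset, long logStartOffset, long logEndOffset) {
        return fetchOffset >= logStartOffset && fetchOffset <= logEndOffset;
    }

    // Returns the position the consumer continues fetching from.
    static long nextPosition(long fetchOffset, long logStartOffset,
                             long logEndOffset, ResetPolicy policy) {
        if (inRange(fetchOffset, logStartOffset, logEndOffset)) {
            return fetchOffset;     // normal case: keep the current position
        }
        if (policy == ResetPolicy.EARLIEST) {
            return logStartOffset;  // rewind to the oldest retained record
        }
        if (policy == ResetPolicy.LATEST) {
            return logEndOffset;    // skip ahead to the end of the log
        }
        // With "none" the real client surfaces an error to the caller instead.
        throw new IllegalStateException("fetch offset " + fetchOffset + " is out of range");
    }

    public static void main(String[] args) {
        // Hypothetical numbers chosen so the position lies past the log end
        // offset reported by the broker.
        long position = 1_000L, logStart = 200L, logEnd = 900L;
        long next = nextPosition(position, logStart, logEnd, ResetPolicy.EARLIEST);
        System.out.println("reset to " + next + ", rewound by " + (position - next) + " offsets");
    }
}
```

Running `main` prints `reset to 200, rewound by 800 offsets`. The point of the model is that a single out-of-range fetch is enough to rewind a partition to its oldest retained record, with no restart or restore of the Flink job.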

The job is configured with checkpoints at 1-minute intervals. When it is not 
started from a savepoint, the Kafka connector consumer is configured via 
`setStartFromGroupOffsets()` to start from the group offsets, and the Kafka 
consumer is configured to fall back to the earliest offsets if no group offsets 
are committed, by setting `auto.offset.reset` to `earliest` in the Kafka 
consumer config.
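To make these start-up semantics concrete, here is a minimal Java sketch, assuming the precedence described in the Flink Kafka connector documentation: a position restored from a checkpoint or savepoint wins, then the committed group offset (`setStartFromGroupOffsets()`), then the `auto.offset.reset` fallback. `StartOffsetModel` and `startPosition` are hypothetical names used only for illustration, not Flink's code.

```java
// Hypothetical, simplified model of how the start position of one partition is
// chosen under the configuration described above; not Flink's actual code.
final class StartOffsetModel {

    // restoredPosition: position from checkpoint/savepoint state, or null if the
    //                   job was started fresh.
    // committedGroupOffset: offset committed to Kafka for the group, or null.
    // earliestOffset: oldest offset still retained for the partition.
    static long startPosition(Long restoredPosition, Long committedGroupOffset,
                              long earliestOffset) {
        if (restoredPosition != null) {
            return restoredPosition;      // restored state wins; committed offsets are ignored
        }
        if (committedGroupOffset != null) {
            return committedGroupOffset;  // setStartFromGroupOffsets()
        }
        return earliestOffset;            // auto.offset.reset=earliest fallback
    }

    public static void main(String[] args) {
        System.out.println(startPosition(5_000L, 4_000L, 100L)); // restored state
        System.out.println(startPosition(null, 4_000L, 100L));   // committed group offset
        System.out.println(startPosition(null, null, 100L));     // earliest fallback
    }
}
```

Under this model, committed group offsets and the `earliest` fallback only come into play when a source starts without restored state.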

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of 
its partitions for around 30 seconds as a result of losing its connection to 
ZooKeeper.

 
{noformat}
[2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
partition [cloud_ioc_events,32] to broker 
1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
{noformat}

The broker reconnected to ZK almost immediately, after a few tries:

{noformat}
[2018-06-30 09:34:55,462] INFO Opening socket connection to server 
10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to 
10.210.48.187/10.210.48.187:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, 
connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
(org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server 
10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,468] INFO Socket connection established to 
10.210.43.200/10.210.43.200:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server 
10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 
with addresses: 
EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
 (kafka.utils.ZkUtils)
[2018-06-30 09:34:55,476] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
{noformat}

By 9:35:02 partitions had returned to the broker.

It appears this is the broker the subtask was consuming from: outgoing network 
traffic from it spiked after the broker recovered leadership of its partitions, 
which is consistent with the subtask starting to consume from the earliest 
offset.

This may have been related to [Kafka issue 
5600](https://issues.apache.org/jira/browse/KAFKA-5600), which affects 
0.11.0.0, the version we are running, and was fixed in 0.11.0.1. That seems 
unlikely, however, as the Flink Kafka connector consumer should not use the 
offsets committed in Kafka when checkpointing is enabled, nor when the job is 
not being restarted or restored.



  was:
On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job 
instance began consuming records from the earliest offsets available in Kafka 
for the partitions assigned to it. Other subtasks did not exhibit this behavior 
and continued operating normally.

Prior to the event the job exhibited no Kafka lag. The job showed no failed 
checkpoints, and the job did not restore or restart. The Flink logs show no 
indication of anything amiss: there were no errors or Kafka-related messages 
in the Flink logs.

The job is configured with checkpoints at 1-minute intervals. When it is not 
started from a savepoint, the Kafka connector consumer is configured via 
`setStartFromGroupOffsets()` to start from the group offsets, and the Kafka 
consumer is configured to fall back to the earliest offsets if no group offsets 
are committed, by setting `auto.offset.reset` to `earliest` in the Kafka 
consumer config.

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of 
its partitions for around 30 seconds as a result of losing its connection to 
ZooKeeper.

 
{noformat}
[2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
partition [cloud_ioc_events,32] to broker 
1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
{noformat}

The broker reconnected to ZK almost immediately, after a few tries:

{noformat}
[2018-06-30 09:34:55,462] INFO Opening socket connection to server 
10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to 
10.210.48.187/10.210.48.187:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, 
connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
(org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server 
10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,468] INFO Socket connection established to 
10.210.43.200/10.210.43.200:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server 
10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 
with addresses: 
EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
 (kafka.utils.ZkUtils)
[2018-06-30 09:34:55,476] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
{noformat}

By 9:35:02 partitions had returned to the broker.

It appears this is the broker the subtask was consuming from: outgoing network 
traffic from it spiked after the broker recovered leadership of its partitions, 
which is consistent with the subtask starting to consume from the earliest 
offset.

This may have been related to [Kafka issue 
5600](https://issues.apache.org/jira/browse/KAFKA-5600), which affects 
0.11.0.0, the version we are running, and was fixed in 0.11.0.1. That seems 
unlikely, however, as the Flink Kafka connector consumer should not use the 
offsets committed in Kafka when checkpointing is enabled, nor when the job is 
not being restarted or restored.




> Kafka source subtask begins to consume from earliest offset
> -----------------------------------------------------------
>
>                 Key: FLINK-9731
>                 URL: https://issues.apache.org/jira/browse/FLINK-9731
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.2
>            Reporter: Elias Levy
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
