[jira] [Comment Edited] (KAFKA-8877) Race condition on partition counter
[ https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923379#comment-16923379 ]

Oleg Kuznetsov edited comment on KAFKA-8877 at 9/5/19 12:36 PM:
----------------------------------------------------------------

[~huxi_2b] Yes, you are right. But it looks like the same problem has migrated to *StickyPartitionCache*: *indexCache* should be populated/changed atomically.

{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}

was (Author: olkuznsmith):
[~huxi_2b] Yes, you are right.
But it looks like the same problem has migrated to *StickyPartitionCache*: *indexCache* should be populated/changed atomically.

{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}

> Race condition on partition counter
> -----------------------------------
>
>                 Key: KAFKA-8877
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8877
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 2.2.1
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).

--
This message was sent by Atlassian Jira (v8.3.2#803003)
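For illustration, the atomic populate/change the comment asks for can be sketched with `ConcurrentHashMap.compute`, which runs its remapping function atomically per key, so the read-decide-write sequence cannot interleave with another thread's update. This is a simplified stand-in, not the actual Kafka implementation: the `StickyCacheSketch` class name is hypothetical, plain `Integer` partition numbers replace `Cluster`/`PartitionInfo`, and the empty-available-partitions fallback is omitted for brevity.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class StickyCacheSketch {
    private final ConcurrentHashMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    // compute() runs the remapping function atomically per key, so no
    // interleaving thread can observe a half-applied sticky-partition update.
    public int nextPartition(String topic, List<Integer> availablePartitions, int prevPartition) {
        return indexCache.compute(topic, (t, oldPart) -> {
            // Keep the sticky choice unless it is unset or it is the partition
            // that triggered the new batch.
            if (oldPart != null && oldPart != prevPartition) {
                return oldPart;
            }
            if (availablePartitions.size() == 1) {
                return availablePartitions.get(0);
            }
            Integer newPart = oldPart;
            // Pick a random available partition different from the old one.
            while (newPart == null || newPart.equals(oldPart)) {
                newPart = availablePartitions.get(
                        ThreadLocalRandom.current().nextInt(availablePartitions.size()));
            }
            return newPart;
        });
    }
}
```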
[jira] [Commented] (KAFKA-8877) Race condition on partition counter
[ https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923379#comment-16923379 ]

Oleg Kuznetsov commented on KAFKA-8877:
---------------------------------------

[~huxi_2b] Yes, you are right. But it looks like the same problem has migrated to *StickyPartitionCache*: *indexCache* should be populated/changed atomically.

{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8877) Race condition on partition counter
[ https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleg Kuznetsov updated KAFKA-8877:
----------------------------------
    Description:

In the method:
*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*

{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}

the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).

  was:
In the method:
*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*

{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}

the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8877) Race condition on partition counter
[ https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleg Kuznetsov updated KAFKA-8877:
----------------------------------
    Description:

In the method:
*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*

{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}

the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).

  was:
In the method:
*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*

{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}

the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8877) Race condition on partition counter
Oleg Kuznetsov created KAFKA-8877:
----------------------------------

             Summary: Race condition on partition counter
                 Key: KAFKA-8877
                 URL: https://issues.apache.org/jira/browse/KAFKA-8877
             Project: Kafka
          Issue Type: Bug
          Components: producer
    Affects Versions: 2.2.1
            Reporter: Oleg Kuznetsov

In the method:
*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*

{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}

the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).

--
This message was sent by Atlassian Jira (v8.3.2#803003)
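The once-per-topic initialization proposed in the description can be sketched with `computeIfAbsent` (the `RoundRobinCounter` class name is hypothetical). Unlike the get/putIfAbsent sequence, the initializer runs at most once per key, so racing threads never construct throwaway `AtomicInteger` instances. Note the original race is benign for correctness, since `putIfAbsent` keeps exactly one counter; the cost is only the extra objects constructed and discarded.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinCounter {
    private final ConcurrentHashMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    // computeIfAbsent invokes the mapping function at most once per key and
    // atomically, so losing racers never build extra AtomicInteger instances.
    int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }
}
```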
[jira] [Commented] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
[ https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16906464#comment-16906464 ]

Oleg Kuznetsov commented on KAFKA-7263:
---------------------------------------

[~mjsax] Do you happen to have a workaround for this in Kafka Streams config?

> Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
> -----------------------------------------------------------------------------------------------------------
>
>             Key: KAFKA-7263
>             URL: https://issues.apache.org/jira/browse/KAFKA-7263
>         Project: Kafka
>      Issue Type: Bug
>        Reporter: laomei
>        Priority: Major
>
> We are using spring-kafka and we get an infinite loop error in ConsumerCoordinator.java;
> kafka cluster version: 1.0.0
> kafka-client version: 1.0.0
>
> 2018-08-08 15:24:46,120 ERROR [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Container exception
> java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.lang.Thread.run(Thread.java:745)
> 2018-08-08 15:24:46,132 INFO [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Consumer stopped
> 2018-08-08 15:24:46,230 INFO [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Consumer stopped
> 2018-08-08 15:24:46,234 INFO [org.springfram

--
This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-2480) Handle non-CopycatExceptions from SinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794970#comment-16794970 ]

Oleg Kuznetsov commented on KAFKA-2480:
---------------------------------------

Then a lot of open-source connectors misuse *context.timeout()* to defer retries, since its documentation is confusing.

> Handle non-CopycatExceptions from SinkTasks
> -------------------------------------------
>
>                 Key: KAFKA-2480
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2480
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: KafkaConnect
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>            Priority: Major
>             Fix For: 0.9.0.0
>
> Currently we catch Throwable in WorkerSinkTask, but we just log the exception. This can lead to data loss because it indicates the messages in the {{put(records)}} call probably were not handled properly. We need to decide what the policy for handling these types of exceptions should be -- try repeating the same records again, risking duplication? or skip them, risking loss? or kill the task immediately and require intervention since it's unclear what happened?
> SourceTasks don't have the same concern -- they can throw other exceptions and as long as we catch them, it is up to the connector to ensure that it does not lose data as a result.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2480) Handle non-CopycatExceptions from SinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794616#comment-16794616 ]

Oleg Kuznetsov commented on KAFKA-2480:
---------------------------------------

[~ewencp] I was talking about a sink connector, where the put() method is used.

1) What was this Jira's intention in adding timeout() as an interface method?

2) Sleeping as long as I need works, but it is an antipattern if the same thing can easily be delegated to the framework.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2480) Handle non-CopycatExceptions from SinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794524#comment-16794524 ]

Oleg Kuznetsov commented on KAFKA-2480:
---------------------------------------

[~gwenshap] [~ewencp] It looks like the way it was implemented does not guarantee that any actual waiting will happen. The code:

{code:java}
// timeout: the time, in milliseconds, spent waiting in poll if data is not available in the buffer.
// If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
// Must not be negative.
consumer.poll(timeoutMs)
{code}

does not have to wait *timeoutMs* ms before returning if there are records in the topic available for consumption. Client code cannot rely on this, for example, when trying to meet an SLA while accessing an external storage. I propose to treat it as a business-logic waiting request, where client code expects the call to wait at least *timeoutMs* before returning.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
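One way to get the at-least-*timeoutMs* behavior the comment asks for is to enforce the minimum delay around the poll call rather than relying on the poll timeout itself, since `poll` may legitimately return early when records are buffered. A sketch under that assumption; `pollWithMinimumDelay` is a hypothetical helper and the `Supplier` stands in for `consumer::poll`:

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

public class MinWaitPoll {
    // Call the poll function, then sleep out whatever part of minDelay is
    // left, so the caller observes at least minDelay of wall-clock pacing
    // even when poll returns immediately with buffered records.
    static <T> List<T> pollWithMinimumDelay(Supplier<List<T>> pollFn, Duration minDelay)
            throws InterruptedException {
        long deadline = System.nanoTime() + minDelay.toNanos();
        List<T> records = pollFn.get();
        long remaining = deadline - System.nanoTime();
        if (remaining > 0) {
            Thread.sleep(remaining / 1_000_000, (int) (remaining % 1_000_000));
        }
        return records;
    }
}
```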
[jira] [Comment Edited] (KAFKA-7593) Infinite restart loop when failed to store big config for task
[ https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704 ]

Oleg Kuznetsov edited comment on KAFKA-7593 at 11/7/18 5:27 AM:
----------------------------------------------------------------

[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the user a heads-up about the problem preventively, for instance by throwing an exception. Is it possible to get feedback from the broker that this payload is too big and just stop working?

was (Author: olkuznsmith):
[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the user a heads-up about the problem preventively, for instance by throwing an exception. Is it possible to get feedback from the broker that this payload is too big and just stop working?

> Infinite restart loop when failed to store big config for task
> --------------------------------------------------------------
>
>                 Key: KAFKA-7593
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7593
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> When the config message for the config topic is bigger than the Kafka broker allows to store, the source connector enters an infinite restart loop without any error indication.
> An exception could be thrown in this case, or the big config handled more gracefully.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7593) Infinite restart loop when failed to store big config for task
[ https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704 ]

Oleg Kuznetsov commented on KAFKA-7593:
---------------------------------------

[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the user a heads-up about the problem preventively, for instance by throwing an exception. Is it possible to get feedback from the broker that this payload is too big and just stop working?

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7593) Infinite restart loop when failed to store big config for task
[ https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704 ]

Oleg Kuznetsov edited comment on KAFKA-7593 at 11/7/18 5:23 AM:
----------------------------------------------------------------

[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the user a heads-up about the problem preventively, for instance by throwing an exception. Is it possible to get feedback from the broker that this payload is too big and just stop working?

was (Author: olkuznsmith):
[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the user a heads-up about the problem preventively, for instance by throwing an exception. Is it possible to get feedback from the broker that this payload is too big and just stop working?

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7593) Infinite restart loop when failed to store big config for task
Oleg Kuznetsov created KAFKA-7593:
----------------------------------

             Summary: Infinite restart loop when failed to store big config for task
                 Key: KAFKA-7593
                 URL: https://issues.apache.org/jira/browse/KAFKA-7593
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 1.1.0
            Reporter: Oleg Kuznetsov

When the config message for the config topic is bigger than the Kafka broker allows to store, the source connector enters an infinite restart loop without any error indication.

An exception could be thrown in this case, or the big config handled more gracefully.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
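A preventive check along the lines the ticket suggests could compare the serialized task config against the known config-topic message limit before writing, failing fast with a clear error instead of looping on restarts. This is a sketch with hypothetical names (`ConfigSizeGuard`, `ensureConfigFits`); in practice the limit would have to come from the broker or topic `max.message.bytes` setting, which the caller is assumed to know:

```java
import java.nio.charset.StandardCharsets;

public class ConfigSizeGuard {
    // Fail fast with an explicit error when a serialized config would exceed
    // the config topic's message size limit, instead of letting the broker
    // silently reject the record and the connector restart forever.
    static void ensureConfigFits(String serializedConfig, int maxMessageBytes) {
        int size = serializedConfig.getBytes(StandardCharsets.UTF_8).length;
        if (size > maxMessageBytes) {
            throw new IllegalArgumentException(
                    "Task config is " + size + " bytes, exceeding the config topic limit of "
                            + maxMessageBytes + " bytes");
        }
    }
}
```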
[jira] [Updated] (KAFKA-7404) SourceConnector backpressure
[ https://issues.apache.org/jira/browse/KAFKA-7404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleg Kuznetsov updated KAFKA-7404:
----------------------------------
    Description:

Currently a source connector can generate lots of events without considering how fast the broker can consume them.

The proposal is simple: add an optional parameter *max.queue.size* and pause polling while the current buffer is not smaller than this size.

  was:
Currently a source connector can generate lots of events without considering how fast the broker can consume them.

The proposal is simple: add an optional parameter *max.records.to.send* and pause polling while the current buffer is not smaller than this size.

> SourceConnector backpressure
> ----------------------------
>
>                 Key: KAFKA-7404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7404
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> Currently a source connector can generate lots of events without considering how fast the broker can consume them.
> The proposal is simple: add an optional parameter *max.queue.size* and pause polling while the current buffer is not smaller than this size.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
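The *max.queue.size* proposal can be sketched with a bounded blocking queue: once the buffer holds the maximum number of records, the polling thread blocks in `put()` until the sending side drains some, which is exactly the pause-until-smaller behavior described. The `BoundedSourceBuffer` class is hypothetical; only the property name comes from the ticket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedSourceBuffer<T> {
    private final BlockingQueue<T> buffer;

    public BoundedSourceBuffer(int maxQueueSize) {
        // ArrayBlockingQueue enforces the capacity, giving backpressure for free.
        this.buffer = new ArrayBlockingQueue<>(maxQueueSize);
    }

    // Called from the polling thread; blocks once maxQueueSize records are
    // buffered, pausing the source until the sender catches up.
    public void enqueue(T record) throws InterruptedException {
        buffer.put(record);
    }

    // Called from the sending thread; removes everything currently buffered,
    // unblocking any producer waiting in enqueue().
    public List<T> drain() {
        List<T> out = new ArrayList<>();
        buffer.drainTo(out);
        return out;
    }

    public int size() {
        return buffer.size();
    }
}
```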
[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect
[ https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473103#comment-16473103 ]

Oleg Kuznetsov commented on KAFKA-6891:
---------------------------------------

[~ewencp] Could you review, please?

> send.buffer.bytes should be allowed to set -1 in KafkaConnect
> -------------------------------------------------------------
>
>                 Key: KAFKA-6891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6891
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> *send.buffer.bytes* and *receive.buffer.bytes* are declared with an *atLeast(0)* constraint in *DistributedConfig*, whereas *-1* should also be allowed.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect
[ https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470066#comment-16470066 ]

Oleg Kuznetsov commented on KAFKA-6891:
---------------------------------------

https://github.com/apache/kafka/pull/4995

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect
[ https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469797#comment-16469797 ]

Oleg Kuznetsov commented on KAFKA-6891:
---------------------------------------

[~ewencp] Great, let me do that.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect
[ https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469621#comment-16469621 ]

Oleg Kuznetsov commented on KAFKA-6891:
---------------------------------------

[~ewencp] Do you agree?

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect
[ https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleg Kuznetsov updated KAFKA-6891:
----------------------------------
    Description:

*send.buffer.bytes* and *receive.buffer.bytes* are declared with an *atLeast(0)* constraint in *DistributedConfig*, whereas *-1* should also be allowed.

  was:
*send.buffer.bytes* and *receive.buffer.bytes* are declared with an *atLeast(0)* constraint in *DistributedConfig*, whereas -1 should also be allowed.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect
Oleg Kuznetsov created KAFKA-6891:
----------------------------------

             Summary: send.buffer.bytes should be allowed to set -1 in KafkaConnect
                 Key: KAFKA-6891
                 URL: https://issues.apache.org/jira/browse/KAFKA-6891
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 1.1.0
            Reporter: Oleg Kuznetsov

*send.buffer.bytes* and *receive.buffer.bytes* are declared with an *atLeast(0)* constraint in *DistributedConfig*, whereas -1 should also be allowed.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
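The relaxed constraint could look like the following sketch: accept the sentinel -1 (meaning use the OS default buffer size) plus any non-negative value. The `BufferSizeValidator` class is hypothetical; in ConfigDef terms the same effect should follow from widening `atLeast(0)` to `atLeast(-1)`.

```java
public class BufferSizeValidator {
    // Accept -1 (use the OS default buffer size) or any non-negative value;
    // reject everything below -1, mirroring the widened atLeast(-1) constraint.
    static void validate(String name, int value) {
        if (value < -1) {
            throw new IllegalArgumentException(
                    "Invalid value " + value + " for configuration " + name
                            + ": must be -1 (OS default) or a non-negative integer");
        }
    }
}
```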
[jira] [Updated] (KAFKA-6869) Distributed herder synchronization issue
[ https://issues.apache.org/jira/browse/KAFKA-6869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleg Kuznetsov updated KAFKA-6869:
----------------------------------
    Description:

The field

{code:java}
org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance
{code}

is read and written both with and without *synchronized* in multiple places, which is incorrect under the Java Memory Model.

I propose to either add *synchronized* to all accesses or use a read-write lock, taking the read lock for reads and the write lock for writes.

  was:
The field

{code}
org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance
{code}

is read and written both with and without *synchronized* in multiple places, which is incorrect under the Java Memory Model.

I propose to either add *synchronized* to all accesses or use a read-write lock, taking the read lock for reads and the write lock for writes.

> Distributed herder synchronization issue
> ----------------------------------------
>
>                 Key: KAFKA-6869
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6869
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> The field {code:java}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code} is read and written both with and without *synchronized* in multiple places, which is incorrect under the Java Memory Model.
> I propose to either add *synchronized* to all accesses or use a read-write lock, taking the read lock for reads and the write lock for writes.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
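The read-write-lock option from the description can be sketched like this (`RebalanceFlag` is a hypothetical stand-alone class, not the herder itself): every read takes the shared read lock and every write the exclusive write lock, so all accesses are consistently ordered under the JMM. For a lone boolean a `volatile` field would also suffice; the lock earns its keep when the flag is checked or updated together with other herder state under the same lock.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RebalanceFlag {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean needsReconfigRebalance = false;

    // Readers share the read lock; the JMM guarantees they observe the
    // latest value published under the write lock.
    public boolean needsReconfigRebalance() {
        lock.readLock().lock();
        try {
            return needsReconfigRebalance;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Writers take the exclusive write lock, so no read can interleave with
    // a half-finished update.
    public void setNeedsReconfigRebalance(boolean value) {
        lock.writeLock().lock();
        try {
            needsReconfigRebalance = value;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```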
[jira] [Updated] (KAFKA-6869) Distributed herder synchronization issue
[ https://issues.apache.org/jira/browse/KAFKA-6869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Kuznetsov updated KAFKA-6869: -- Description: {code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code} field is read/write with and without *synchronized* in multiple places, that is incorrect by JMM. I propose to either adding synchronized to access it in all places or using RW lock varying type of locking for reading and writing. was: {code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance\{code} field is read/write with and without *synchronized* in multiple places, that is incorrect by JMM. I propose to either adding synchronized to access it in all places or using RW lock varying type of locking for reading and writing. > Distributed herder synchronization issue > > > Key: KAFKA-6869 > URL: https://issues.apache.org/jira/browse/KAFKA-6869 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Oleg Kuznetsov >Priority: Major > > {code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code} > field is read/write with and without *synchronized* in multiple places, that > is incorrect by JMM. > I propose to either adding synchronized to access it in all places or using > RW lock varying type of locking for reading and writing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6869) Distributed herder synchronization issue
Oleg Kuznetsov created KAFKA-6869: - Summary: Distributed herder synchronization issue Key: KAFKA-6869 URL: https://issues.apache.org/jira/browse/KAFKA-6869 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 1.1.0 Reporter: Oleg Kuznetsov {code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance\{code} field is read/write with and without *synchronized* in multiple places, that is incorrect by JMM. I propose to either adding synchronized to access it in all places or using RW lock varying type of locking for reading and writing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342510#comment-16342510 ] Oleg Kuznetsov commented on KAFKA-4794: --- [~rhauch] Is it scheduled on release 1.1.0? > Add access to OffsetStorageReader from SourceConnector > -- > > Key: KAFKA-4794 > URL: https://issues.apache.org/jira/browse/KAFKA-4794 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Florian Hussonnois >Priority: Minor > Labels: needs-kip > Fix For: 1.1.0 > > > Currently the offsets storage is only accessible from SourceTask to able to > initialize properly tasks after a restart, a crash or a reconfiguration > request. > To implement more complex connectors that need to track the progression of > each task it would helpful to have access to an OffsetStorageReader instance > from the SourceConnector. > In that way, we could have a background thread that could request a tasks > reconfiguration based on source offsets. > This improvement proposal comes from a customer project that needs to > periodically scan directories on a shared storage for detecting and for > streaming new files into Kafka. > The connector implementation is pretty straightforward. > The connector uses a background thread to periodically scan directories. When > new inputs files are detected a tasks reconfiguration is requested. Then the > connector assigns a file subset to each task. > Each task stores sources offsets for the last sent record. The source offsets > data are: > - the size of file > - the bytes offset > - the bytes size > Tasks become idle when the assigned files are completed (in : > recordBytesOffsets + recordBytesSize = fileBytesSize). > Then, the connector should be able to track offsets for each assigned file. > When all tasks has finished the connector can stop them or assigned new files > by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector > is detect slow or failed tasks and if necessary to be able to restart all > tasks. > If you think this improvement is OK, I can work a pull request. > Thanks, -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-5866) Let source/sink task to finish their job before exit
Oleg Kuznetsov created KAFKA-5866: - Summary: Let source/sink task to finish their job before exit Key: KAFKA-5866 URL: https://issues.apache.org/jira/browse/KAFKA-5866 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.10.2.0 Reporter: Oleg Kuznetsov My case is about reading files. When a task stops for a rebalance or another reason, I want to let it read the file to the end at least. I found that the flag {code:java} WorkerTask#stopping {code} is set to true, and only then {code:java} SourceTask.stop() {code} is called. This stopping flag prevents WorkerSourceTask from further ingestion (exit from {code:java} while (!isStopped()) {code}). Is it possible to let the task decide to work some more time and possibly produce more records after stop() is called on rebalance? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
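A sketch of the requested behavior (illustrative code, not WorkerSourceTask itself): instead of exiting the ingestion loop as soon as the stopping flag is set, the task honors a stop request only at a file boundary, so the file currently being read is finished first.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: files are modeled as record counts; the stop flag
// is checked only between files, never mid-file.
class DrainingTaskSketch {
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    void stop() { stopping.set(true); }

    // "Reads" each file fully, then honors a pending stop request.
    int run(int[] fileSizes) {
        int recordsRead = 0;
        for (int size : fileSizes) {
            recordsRead += size;          // finish the current file
            if (stopping.get()) break;    // drain point: exit at the boundary
        }
        return recordsRead;
    }
}
```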
[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139481#comment-16139481 ] Oleg Kuznetsov commented on KAFKA-5696: --- [~Yohan123] The main issue was not resolved - we now understand that the workflow of one connector affects the others, and other connectors might be writing data at the moment of restart. Not sure how it can be resolved; could you clarify? > SourceConnector does not commit offset on rebalance > --- > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov >Priority: Critical > Labels: newbie > > I'm running SourceConnector, that reads files from storage and put data in > kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137100#comment-16137100 ] Oleg Kuznetsov edited comment on KAFKA-5696 at 8/22/17 5:56 PM: [~rhauch] Is it possible to add waiting all the tasks to finish their current loop (i.e. let producer to finish writing records, commit their offsets) before rebalancing? was (Author: olkuznsmith): [~rhauch] Is it possible to add waiting all the task to finish their current loop (i.e. let producer to finish writing records, commit their offsets) before rebalancing? > SourceConnector does not commit offset on rebalance > --- > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov >Priority: Critical > Labels: newbie > > I'm running SourceConnector, that reads files from storage and put data in > kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137100#comment-16137100 ] Oleg Kuznetsov commented on KAFKA-5696: --- [~rhauch] Is it possible to add waiting all the task to finish their current loop (i.e. let producer to finish writing records, commit their offsets) before rebalancing? > SourceConnector does not commit offset on rebalance > --- > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov >Priority: Critical > Labels: newbie > > I'm running SourceConnector, that reads files from storage and put data in > kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134997#comment-16134997 ] Oleg Kuznetsov commented on KAFKA-5696: --- [~rhauch] Is it possible that a config update in one connector causes a restart of all connectors within the same JVM? I see this in the logs. > SourceConnector does not commit offset on rebalance > --- > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov >Priority: Critical > Labels: newbie > > I'm running SourceConnector, that reads files from storage and put data in > kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Kuznetsov updated KAFKA-5756: -- Description: Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* method, whereas this collection can be accessed from 2 different threads: - *WorkerSourceTask.execute()*, finally block - *SourceTaskOffsetCommitter*, from periodic flush task was: Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* method, whereas this collection can be accessed from 2 different threads: - *WorkerSourceTask.execute()*, finally block - *SourceTaskOffsetCommitter*, from periodic flushing task > Synchronization issue on flush > -- > > Key: KAFKA-5756 > URL: https://issues.apache.org/jira/browse/KAFKA-5756 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Oleg Kuznetsov > > Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* > method, whereas this collection can be accessed from 2 different threads: > - *WorkerSourceTask.execute()*, finally block > - *SourceTaskOffsetCommitter*, from periodic flush task -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Kuznetsov updated KAFKA-5756: -- Description: Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* method, whereas this collection can be accessed from 2 different threads: - *WorkerSourceTask.execute()*, finally block - *SourceTaskOffsetCommitter*, from periodic flushing task was: Access to OffsetStorageWriter#toFlush is not synchronized in doFlush() method, whereas this collection can be accessed from 2 different threads: - WorkerSourceTask.execute(), finally block - SourceTaskOffsetCommitter, from periodic flushing task > Synchronization issue on flush > -- > > Key: KAFKA-5756 > URL: https://issues.apache.org/jira/browse/KAFKA-5756 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Oleg Kuznetsov > > Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* > method, whereas this collection can be accessed from 2 different threads: > - *WorkerSourceTask.execute()*, finally block > - *SourceTaskOffsetCommitter*, from periodic flushing task -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5756) Synchronization issue on flush
Oleg Kuznetsov created KAFKA-5756: - Summary: Synchronization issue on flush Key: KAFKA-5756 URL: https://issues.apache.org/jira/browse/KAFKA-5756 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.10.2.0 Reporter: Oleg Kuznetsov Access to OffsetStorageWriter#toFlush is not synchronized in doFlush() method, whereas this collection can be accessed from 2 different threads: - WorkerSourceTask.execute(), finally block - SourceTaskOffsetCommitter, from periodic flushing task -- This message was sent by Atlassian JIRA (v6.4.14#64029)
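A simplified sketch of the synchronization being proposed (illustrative types, not the actual OffsetStorageWriter): both the periodic committer thread and the task's finally block must take the same monitor before touching the pending-offsets and to-flush collections, so the swap in doFlush() cannot interleave with concurrent offset writes.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: offsets are simplified to partition -> long.
class OffsetWriterSketch {
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null;

    // Called from the task thread as records are produced.
    public synchronized void offset(String partition, long off) {
        data.put(partition, off);
    }

    // Called by either the committer thread or the task's finally block:
    // swaps the pending offsets into toFlush under the lock.
    public synchronized Map<String, Long> beginFlush() {
        if (toFlush != null) throw new IllegalStateException("flush already in progress");
        toFlush = data;
        data = new HashMap<>();
        return toFlush;
    }

    // Clears the in-flight flush once it has been persisted.
    public synchronized void completeFlush() {
        toFlush = null;
    }
}
```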
[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117301#comment-16117301 ] Oleg Kuznetsov commented on KAFKA-5696: --- Yes, you are right. Is there a chance that exactly-once delivery becomes possible, since Kafka now supports this mode? > SourceConnector does not commit offset on rebalance > --- > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov > Labels: newbie > Fix For: 0.10.0.2 > > > I'm running SourceConnector, that reads files from storage and put data in > kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116962#comment-16116962 ] Oleg Kuznetsov commented on KAFKA-5696: --- [~rhauch] Thank you for detailed explanation. But aren't offsets committed also at the end of every poll() method? I found this in WorkerSourceTask.execute() finally block. {code:java} } finally { // It should still be safe to commit offsets since any exception would have // simply resulted in not getting more records but all the existing records should be ok to flush // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit // to fail. commitOffsets(); } {code} > SourceConnector does not commit offset on rebalance > --- > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov > Labels: newbie > Fix For: 0.10.0.2 > > > I'm running SourceConnector, that reads files from storage and put data in > kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on reconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115041#comment-16115041 ] Oleg Kuznetsov commented on KAFKA-5696: --- Is it true that there is a chance of producing records without offsets committed in source connector? > SourceConnector does not commit offset on reconfiguration > - > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov > Labels: newbie > Fix For: 0.10.0.2 > > > I'm running SourceConnector, that reads files from storage and put data in > kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5696) SourceConnector does not commit offset on reconfiguration
Oleg Kuznetsov created KAFKA-5696: - Summary: SourceConnector does not commit offset on reconfiguration Key: KAFKA-5696 URL: https://issues.apache.org/jira/browse/KAFKA-5696 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Oleg Kuznetsov Fix For: 0.10.0.2 I'm running a SourceConnector that reads files from storage and puts data in Kafka. I want offsets to be flushed in case of reconfiguration. Say, a file is completely processed, but the source records are not yet committed, and in case of reconfiguration their offsets might be missing from the store. Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16090034#comment-16090034 ] Oleg Kuznetsov commented on KAFKA-4794: --- [~fhussonnois] I meant who could give it? > Add access to OffsetStorageReader from SourceConnector > -- > > Key: KAFKA-4794 > URL: https://issues.apache.org/jira/browse/KAFKA-4794 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Florian Hussonnois >Priority: Minor > Labels: needs-kip > > Currently the offsets storage is only accessible from SourceTask to able to > initialize properly tasks after a restart, a crash or a reconfiguration > request. > To implement more complex connectors that need to track the progression of > each task it would helpful to have access to an OffsetStorageReader instance > from the SourceConnector. > In that way, we could have a background thread that could request a tasks > reconfiguration based on source offsets. > This improvement proposal comes from a customer project that needs to > periodically scan directories on a shared storage for detecting and for > streaming new files into Kafka. > The connector implementation is pretty straightforward. > The connector uses a background thread to periodically scan directories. When > new inputs files are detected a tasks reconfiguration is requested. Then the > connector assigns a file subset to each task. > Each task stores sources offsets for the last sent record. The source offsets > data are: > - the size of file > - the bytes offset > - the bytes size > Tasks become idle when the assigned files are completed (in : > recordBytesOffsets + recordBytesSize = fileBytesSize). > Then, the connector should be able to track offsets for each assigned file. > When all tasks has finished the connector can stop them or assigned new files > by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector > is detect slow or failed tasks and if necessary to be able to restart all > tasks. > If you think this improvement is OK, I can work a pull request. > Thanks, -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089196#comment-16089196 ] Oleg Kuznetsov edited comment on KAFKA-4794 at 7/17/17 12:07 AM: - [~jasong35] [~ewencp] Do you have any feedback regarding this KIP? was (Author: olkuznsmith): [~jasong35] Do you have any feedback regarding this KIP? > Add access to OffsetStorageReader from SourceConnector > -- > > Key: KAFKA-4794 > URL: https://issues.apache.org/jira/browse/KAFKA-4794 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Florian Hussonnois >Priority: Minor > Labels: needs-kip > > Currently the offsets storage is only accessible from SourceTask to able to > initialize properly tasks after a restart, a crash or a reconfiguration > request. > To implement more complex connectors that need to track the progression of > each task it would helpful to have access to an OffsetStorageReader instance > from the SourceConnector. > In that way, we could have a background thread that could request a tasks > reconfiguration based on source offsets. > This improvement proposal comes from a customer project that needs to > periodically scan directories on a shared storage for detecting and for > streaming new files into Kafka. > The connector implementation is pretty straightforward. > The connector uses a background thread to periodically scan directories. When > new inputs files are detected a tasks reconfiguration is requested. Then the > connector assigns a file subset to each task. > Each task stores sources offsets for the last sent record. The source offsets > data are: > - the size of file > - the bytes offset > - the bytes size > Tasks become idle when the assigned files are completed (in : > recordBytesOffsets + recordBytesSize = fileBytesSize). > Then, the connector should be able to track offsets for each assigned file. 
> When all tasks has finished the connector can stop them or assigned new files > by requesting tasks reconfiguration. > Moreover, another advantage of monitoring source offsets from the connector > is detect slow or failed tasks and if necessary to be able to restart all > tasks. > If you think this improvement is OK, I can work a pull request. > Thanks, -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089196#comment-16089196 ] Oleg Kuznetsov commented on KAFKA-4794: --- [~jasong35] Do you have any feedback regarding this KIP? > Add access to OffsetStorageReader from SourceConnector > -- > > Key: KAFKA-4794 > URL: https://issues.apache.org/jira/browse/KAFKA-4794 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Florian Hussonnois >Priority: Minor > Labels: needs-kip > > Currently the offsets storage is only accessible from SourceTask to able to > initialize properly tasks after a restart, a crash or a reconfiguration > request. > To implement more complex connectors that need to track the progression of > each task it would helpful to have access to an OffsetStorageReader instance > from the SourceConnector. > In that way, we could have a background thread that could request a tasks > reconfiguration based on source offsets. > This improvement proposal comes from a customer project that needs to > periodically scan directories on a shared storage for detecting and for > streaming new files into Kafka. > The connector implementation is pretty straightforward. > The connector uses a background thread to periodically scan directories. When > new inputs files are detected a tasks reconfiguration is requested. Then the > connector assigns a file subset to each task. > Each task stores sources offsets for the last sent record. The source offsets > data are: > - the size of file > - the bytes offset > - the bytes size > Tasks become idle when the assigned files are completed (in : > recordBytesOffsets + recordBytesSize = fileBytesSize). > Then, the connector should be able to track offsets for each assigned file. > When all tasks has finished the connector can stop them or assigned new files > by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector > is detect slow or failed tasks and if necessary to be able to restart all > tasks. > If you think this improvement is OK, I can work a pull request. > Thanks, -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5598) Make OffsetReader accessible in ConnectorContext
[ https://issues.apache.org/jira/browse/KAFKA-5598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Kuznetsov updated KAFKA-5598: -- Description: It is useful to read offset in SourceConnector, but now OffsetReader is accessible only in SourceTaskContext. (was: It is useful to read offset in SourceConnector, but now OffsetReader is accessible only in SourceTask.) > Make OffsetReader accessible in ConnectorContext > > > Key: KAFKA-5598 > URL: https://issues.apache.org/jira/browse/KAFKA-5598 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Oleg Kuznetsov > > It is useful to read offset in SourceConnector, but now OffsetReader is > accessible only in SourceTaskContext. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5598) Make OffsetReader accessible in ConnectorContext
Oleg Kuznetsov created KAFKA-5598: - Summary: Make OffsetReader accessible in ConnectorContext Key: KAFKA-5598 URL: https://issues.apache.org/jira/browse/KAFKA-5598 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 0.11.0.0 Reporter: Oleg Kuznetsov It is useful to read offsets in a SourceConnector, but currently OffsetReader is accessible only in SourceTask. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
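A hypothetical sketch of what the request amounts to; the interface and method names below are illustrative, not Kafka's actual API. The idea is to make an offset reader reachable from the connector-level context, not only from the task, so a SourceConnector can consult committed offsets when planning task reconfiguration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for Kafka Connect's OffsetStorageReader.
interface OffsetReaderSketch {
    Map<String, Object> offset(Map<String, Object> partition);
}

// Illustrative stand-in for ConnectorContext, with the proposed addition.
interface ConnectorContextSketch {
    OffsetReaderSketch offsetStorageReader(); // the proposed new accessor
}

// Tiny in-memory reader, used here only to show the intended call pattern.
class InMemoryOffsetReader implements OffsetReaderSketch {
    private final Map<Map<String, Object>, Map<String, Object>> store = new HashMap<>();

    void put(Map<String, Object> partition, Map<String, Object> offset) {
        store.put(partition, offset);
    }

    @Override
    public Map<String, Object> offset(Map<String, Object> partition) {
        return store.get(partition); // null when nothing has been committed
    }
}
```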