[jira] [Comment Edited] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923379#comment-16923379
 ] 

Oleg Kuznetsov edited comment on KAFKA-8877 at 9/5/19 12:36 PM:


[~huxi_2b]

Yes, you are right.

 

But it looks like the same problem has migrated to *StickyPartitionCache*:

*indexCache* should be populated/changed atomically:
{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition
    // that triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}
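
For illustration, a minimal sketch of what an atomic variant could look like, assuming *indexCache* is a *ConcurrentHashMap<String, Integer>* as in the snippet above. This is only a sketch, not the actual fix, and it drops the loop that avoids re-picking the previous partition:
{code:java}
// Hypothetical sketch only. compute() runs the remapping function atomically per key,
// so concurrent callers cannot interleave the get/putIfAbsent/replace steps.
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    return indexCache.compute(topic, (t, oldPart) -> {
        // Keep the current sticky partition unless it is unset or it is the
        // partition that triggered the new batch.
        if (oldPart != null && oldPart != prevPartition)
            return oldPart;
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
        if (availablePartitions.isEmpty())
            return random % partitions.size();
        return availablePartitions.get(random % availablePartitions.size()).partition();
    });
}
{code}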


was (Author: olkuznsmith):
[~huxi_2b]

Yes, you are right.

 

But looks like the same problem has migrated to *StickyPartitionCache*:

*indexCache* is ** better be populated/changed atomically
{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition
    // that triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}

> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923379#comment-16923379
 ] 

Oleg Kuznetsov commented on KAFKA-8877:
---

[~huxi_2b]

Yes, you are right.

 

But it looks like the same problem has migrated to *StickyPartitionCache*:

*indexCache* should be populated/changed atomically:
{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition
    // that triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}

> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-8877:
--
Description: 
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).
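
For illustration, a minimal sketch of that idea, assuming *topicCounterMap* is a *ConcurrentHashMap<String, AtomicInteger>* (this is only a sketch, not an actual patch). *computeIfAbsent* achieves the same "init once per topic" behaviour, since the initializer is invoked at most once per key:
{code:java}
// Hypothetical sketch only: create the per-topic counter atomically and exactly once.
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.computeIfAbsent(
            topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
    return counter.getAndIncrement();
}
{code}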

 

  was:
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like ** *topicCounterMap.compute(topic, 
_ -> ...* (init the counter once per topic*)) ** *

 


> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-8877:
--
Description: 
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).

 

  was:
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
**the counter might be created multiple times instead of once.

I propose to replace it with something like ** *topicCounterMap.compute(topic, 
_ -> ...* (init the counter once per topic*))* **

 


> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)
Oleg Kuznetsov created KAFKA-8877:
-

 Summary: Race condition on partition counter
 Key: KAFKA-8877
 URL: https://issues.apache.org/jira/browse/KAFKA-8877
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.2.1
Reporter: Oleg Kuznetsov


In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null

2019-08-13 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16906464#comment-16906464
 ] 

Oleg Kuznetsov commented on KAFKA-7263:
---

[~mjsax] Do you happen to have a workaround for this in Kafka Streams config?

> Container exception java.lang.IllegalStateException: Coordinator selected 
> invalid assignment protocol: null
> ---
>
> Key: KAFKA-7263
> URL: https://issues.apache.org/jira/browse/KAFKA-7263
> Project: Kafka
>  Issue Type: Bug
>Reporter: laomei
>Priority: Major
>
> We are using  spring-kafka and we get an infinite loop error in 
> ConsumerCoordinator.java;
> kafka cluster version: 1.0.0
> kafka-client version: 1.0.0
>  
> 2018-08-08 15:24:46,120 ERROR 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Container exception
>  java.lang.IllegalStateException: Coordinator selected invalid assignment 
> protocol: null
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
>   at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:745)
>  2018-08-08 15:24:46,132 INFO 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Consumer stopped
>  2018-08-08 15:24:46,230 INFO 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Consumer stopped
>  2018-08-08 15:24:46,234 INFO [org.springfram



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-2480) Handle non-CopycatExceptions from SinkTasks

2019-03-18 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794970#comment-16794970
 ] 

Oleg Kuznetsov commented on KAFKA-2480:
---

Then a lot of open-source connectors misuse *context.timeout()* to defer retries, because its documentation is confusing.

> Handle non-CopycatExceptions from SinkTasks
> ---
>
> Key: KAFKA-2480
> URL: https://issues.apache.org/jira/browse/KAFKA-2480
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Major
> Fix For: 0.9.0.0
>
>
> Currently we catch Throwable in WorkerSinkTask, but we just log the 
> exception. This can lead to data loss because it indicates the messages in 
> the {{put(records)}} call probably were not handled properly. We need to 
> decide what the policy for handling these types of exceptions should be -- 
> try repeating the same records again, risking duplication? or skip them, 
> risking loss? or kill the task immediately and require intervention since 
> it's unclear what happened?
> SourceTasks don't have the same concern -- they can throw other exceptions 
> and as long as we catch them, it is up to the connector to ensure that it 
> does not lose data as a result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2480) Handle non-CopycatExceptions from SinkTasks

2019-03-17 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794616#comment-16794616
 ] 

Oleg Kuznetsov commented on KAFKA-2480:
---

[~ewencp]

I was talking about the sink connector, where the put() method is used.

1) What was the intention of this Jira in adding timeout() as an interface method?

2) Sleeping as long as I need is fine, but it is an antipattern if the same thing can easily be delegated to the framework.

> Handle non-CopycatExceptions from SinkTasks
> ---
>
> Key: KAFKA-2480
> URL: https://issues.apache.org/jira/browse/KAFKA-2480
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Major
> Fix For: 0.9.0.0
>
>
> Currently we catch Throwable in WorkerSinkTask, but we just log the 
> exception. This can lead to data loss because it indicates the messages in 
> the {{put(records)}} call probably were not handled properly. We need to 
> decide what the policy for handling these types of exceptions should be -- 
> try repeating the same records again, risking duplication? or skip them, 
> risking loss? or kill the task immediately and require intervention since 
> it's unclear what happened?
> SourceTasks don't have the same concern -- they can throw other exceptions 
> and as long as we catch them, it is up to the connector to ensure that it 
> does not lose data as a result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2480) Handle non-CopycatExceptions from SinkTasks

2019-03-17 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794524#comment-16794524
 ] 

Oleg Kuznetsov commented on KAFKA-2480:
---

[~gwenshap] [~ewencp]
Looks like the way it was implemented does not guarantee actual waiting will 
happen.

 

The code:
{code:java}
// timeout: The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
// If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
// Must not be negative.

consumer.poll(timeoutMs)
{code}
does not have to wait *timeoutMs* before returning if there are records in the topic available for consumption.

Client code therefore cannot rely on it, for example when trying to meet an SLA while accessing an external storage.

I propose to treat it as a business-logic waiting request, where the client code expects to wait at least *timeoutMs* before the call returns.
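
As an illustration of that interpretation (a sketch only, not Connect code; *pollAtLeast* is a hypothetical helper around a plain consumer), the caller can top up the wait itself:
{code:java}
// Hypothetical sketch: guarantee that at least timeoutMs elapses before returning,
// even if poll() comes back early because records were already available.
private <K, V> ConsumerRecords<K, V> pollAtLeast(KafkaConsumer<K, V> consumer, long timeoutMs)
        throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    ConsumerRecords<K, V> records = consumer.poll(timeoutMs); // may return early if data is buffered
    long remaining = deadline - System.currentTimeMillis();
    if (remaining > 0)
        Thread.sleep(remaining); // business-logic wait, e.g. to respect an external SLA
    return records;
}
{code}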

> Handle non-CopycatExceptions from SinkTasks
> ---
>
> Key: KAFKA-2480
> URL: https://issues.apache.org/jira/browse/KAFKA-2480
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Major
> Fix For: 0.9.0.0
>
>
> Currently we catch Throwable in WorkerSinkTask, but we just log the 
> exception. This can lead to data loss because it indicates the messages in 
> the {{put(records)}} call probably were not handled properly. We need to 
> decide what the policy for handling these types of exceptions should be -- 
> try repeating the same records again, risking duplication? or skip them, 
> risking loss? or kill the task immediately and require intervention since 
> it's unclear what happened?
> SourceTasks don't have the same concern -- they can throw other exceptions 
> and as long as we catch them, it is up to the connector to ensure that it 
> does not lose data as a result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-06 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704
 ] 

Oleg Kuznetsov edited comment on KAFKA-7593 at 11/7/18 5:27 AM:


[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the user a heads-up about the problem preventively.

For instance, by throwing an exception.

Is it possible to get feedback from the broker that this payload is too big and just stop working?


was (Author: olkuznsmith):
[~rhauch] Sure, workaround is clear, but the ticket is more about letting user 
heads up about the problem preventively.

For instance, by throwing exception.

Is it possible to have feedback from broker that this payload is too big and 
just stop working?

> Infinite restart loop when failed to store big config for task
> --
>
> Key: KAFKA-7593
> URL: https://issues.apache.org/jira/browse/KAFKA-7593
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In case when config message for config topic is greater than kafka broker 
> allows to store, source connector starts infinite restart loop without any 
> error indication.
> There could be an exception thrown in this case or a smarter handling of big 
> config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-06 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704
 ] 

Oleg Kuznetsov commented on KAFKA-7593:
---

[~rhauch] Sure, workaround is clear, but the ticket is more about to let user 
head up about the problem preventively.

For instance, by throwing exception.

Is it possible to have feedback from broker that this payload is too big and 
just stop working?

> Infinite restart loop when failed to store big config for task
> --
>
> Key: KAFKA-7593
> URL: https://issues.apache.org/jira/browse/KAFKA-7593
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In case when config message for config topic is greater than kafka broker 
> allows to store, source connector starts infinite restart loop without any 
> error indication.
> There could be an exception thrown in this case or a smarter handling of big 
> config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-06 Thread Oleg Kuznetsov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704
 ] 

Oleg Kuznetsov edited comment on KAFKA-7593 at 11/7/18 5:23 AM:


[~rhauch] Sure, workaround is clear, but the ticket is more about letting user 
heads up about the problem preventively.

For instance, by throwing exception.

Is it possible to have feedback from broker that this payload is too big and 
just stop working?


was (Author: olkuznsmith):
[~rhauch] Sure, workaround is clear, but the ticket is more about to let user 
head up about the problem preventively.

For instance, by throwing exception.

Is it possible to have feedback from broker that this payload is too big and 
just stop working?

> Infinite restart loop when failed to store big config for task
> --
>
> Key: KAFKA-7593
> URL: https://issues.apache.org/jira/browse/KAFKA-7593
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In case when config message for config topic is greater than kafka broker 
> allows to store, source connector starts infinite restart loop without any 
> error indication.
> There could be an exception thrown in this case or a smarter handling of big 
> config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7593) Infinite restart loop when failed to store big config for task

2018-11-05 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-7593:
-

 Summary: Infinite restart loop when failed to store big config for 
task
 Key: KAFKA-7593
 URL: https://issues.apache.org/jira/browse/KAFKA-7593
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Oleg Kuznetsov


In case when config message for config topic is greater than kafka broker 
allows to store, source connector starts infinite restart loop without any 
error indication.

There could be an exception thrown in this case or a smarter handling of big 
config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7404) SourceConnector backpressure

2018-09-11 Thread Oleg Kuznetsov (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-7404:
--
Description: 
Right now a source connector can generate lots of events without considering how fast the broker can consume them.

The proposal is simple: add an optional parameter *max.queue.size* and pause polling until the current buffer is smaller than this size.
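
For illustration only, a rough sketch of the idea on the framework side (the property name *max.queue.size* comes from the proposal above; all other field and method names here are hypothetical, not existing Connect code):
{code:java}
// Hypothetical sketch: stop calling SourceTask.poll() while too many records are
// still waiting to be acknowledged by the producer.
private final int maxQueueSize = config.getInt("max.queue.size"); // assumed new, optional config key
private final AtomicInteger outstanding = new AtomicInteger();

void pollLoop() throws InterruptedException {
    while (!isStopped()) {
        if (outstanding.get() >= maxQueueSize) {
            Thread.sleep(10); // back off until the producer drains the buffer
            continue;
        }
        List<SourceRecord> records = task.poll();
        if (records == null)
            continue;
        outstanding.addAndGet(records.size());
        for (SourceRecord record : records)
            producer.send(toProducerRecord(record), (metadata, error) -> outstanding.decrementAndGet());
    }
}
{code}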

  was:
Now Source connector can generate lots of events without considering how fast 
broker can consume them. 

Proposal is simple: add optional parameter *max.records.to.send* and pause 
polling until current buffer is less this size.


> SourceConnector backpressure
> 
>
> Key: KAFKA-7404
> URL: https://issues.apache.org/jira/browse/KAFKA-7404
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> Right now a source connector can generate lots of events without considering how fast 
> the broker can consume them. 
> The proposal is simple: add an optional parameter *max.queue.size* and pause polling 
> until the current buffer is smaller than this size.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect

2018-05-12 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473103#comment-16473103
 ] 

Oleg Kuznetsov commented on KAFKA-6891:
---

[~ewencp] could you review please?

> send.buffer.bytes should be allowed to set -1 in KafkaConnect
> -
>
> Key: KAFKA-6891
> URL: https://issues.apache.org/jira/browse/KAFKA-6891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> *send.buffer.bytes* and *receive.buffer.bytes* are declared with *atLeast(0)* 
> constraint in *DistributedConfig*, whereas *-1* should be also allowed to set



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect

2018-05-10 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470066#comment-16470066
 ] 

Oleg Kuznetsov commented on KAFKA-6891:
---

https://github.com/apache/kafka/pull/4995

> send.buffer.bytes should be allowed to set -1 in KafkaConnect
> -
>
> Key: KAFKA-6891
> URL: https://issues.apache.org/jira/browse/KAFKA-6891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> *send.buffer.bytes* and *receive.buffer.bytes* are declared with *atLeast(0)* 
> constraint in *DistributedConfig*, whereas *-1* should be also allowed to set



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect

2018-05-09 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469797#comment-16469797
 ] 

Oleg Kuznetsov commented on KAFKA-6891:
---

[~ewencp] Great, let me do that.

> send.buffer.bytes should be allowed to set -1 in KafkaConnect
> -
>
> Key: KAFKA-6891
> URL: https://issues.apache.org/jira/browse/KAFKA-6891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> *send.buffer.bytes* and *receive.buffer.bytes* are declared with *atLeast(0)* 
> constraint in *DistributedConfig*, whereas *-1* should be also allowed to set



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect

2018-05-09 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469621#comment-16469621
 ] 

Oleg Kuznetsov commented on KAFKA-6891:
---

[~ewencp] Do you agree?

> send.buffer.bytes should be allowed to set -1 in KafkaConnect
> -
>
> Key: KAFKA-6891
> URL: https://issues.apache.org/jira/browse/KAFKA-6891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> *send.buffer.bytes* and *receive.buffer.bytes* are declared with *atLeast(0)* 
> constraint in *DistributedConfig*, whereas *-1* should be also allowed to set



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect

2018-05-09 Thread Oleg Kuznetsov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-6891:
--
Description: *send.buffer.bytes* and *receive.buffer.bytes* are declared 
with *atLeast(0)* constraint in *DistributedConfig*, whereas *-1* should be 
also allowed to set  (was: *send.buffer.bytes* and *receive.buffer.bytes* are 
declared with *atLeast(0)* constraint in *DistributedConfig*, whereas -1 should 
be also allowed to set)

> send.buffer.bytes should be allowed to set -1 in KafkaConnect
> -
>
> Key: KAFKA-6891
> URL: https://issues.apache.org/jira/browse/KAFKA-6891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> *send.buffer.bytes* and *receive.buffer.bytes* are declared with *atLeast(0)* 
> constraint in *DistributedConfig*, whereas *-1* should be also allowed to set



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect

2018-05-09 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-6891:
-

 Summary: send.buffer.bytes should be allowed to set -1 in 
KafkaConnect
 Key: KAFKA-6891
 URL: https://issues.apache.org/jira/browse/KAFKA-6891
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Oleg Kuznetsov


*send.buffer.bytes* and *receive.buffer.bytes* are declared with an *atLeast(0)* 
constraint in *DistributedConfig*, whereas -1 should also be allowed.
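
For illustration, a sketch of what the relaxed constraint could look like in a *ConfigDef* (generic example values, not the exact *DistributedConfig* definition):
{code:java}
// Hypothetical sketch: allow -1 (meaning "use the OS default buffer size") as well as non-negative values.
ConfigDef def = new ConfigDef()
        .define("send.buffer.bytes", ConfigDef.Type.INT, 128 * 1024,
                ConfigDef.Range.atLeast(-1), ConfigDef.Importance.MEDIUM,
                "The size of the TCP send buffer, or -1 to use the OS default.")
        .define("receive.buffer.bytes", ConfigDef.Type.INT, 32 * 1024,
                ConfigDef.Range.atLeast(-1), ConfigDef.Importance.MEDIUM,
                "The size of the TCP receive buffer, or -1 to use the OS default.");
{code}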



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6869) Distributed herder synchronization issue

2018-05-05 Thread Oleg Kuznetsov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-6869:
--
Description: 
{code:java}
org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code}
field is read/written both with and without *synchronized* in multiple places, which is incorrect under the JMM.

I propose either adding *synchronized* to all accesses or using a RW lock, varying the type of locking for reads and writes.
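
To illustrate the second option, a generic sketch of guarding such a flag with a RW lock (a sketch only, not the actual DistributedHerder code):
{code:java}
// Hypothetical sketch: every read takes the read lock and every write the write lock,
// so the flag is always published safely between herder threads.
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private boolean needsReconfigRebalance = false;

private boolean needsReconfigRebalance() {
    lock.readLock().lock();
    try {
        return needsReconfigRebalance;
    } finally {
        lock.readLock().unlock();
    }
}

private void setNeedsReconfigRebalance(boolean value) {
    lock.writeLock().lock();
    try {
        needsReconfigRebalance = value;
    } finally {
        lock.writeLock().unlock();
    }
}
{code}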

  was:
{code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code}
 field is read/write with and without *synchronized* in multiple places, that 
is incorrect by JMM.

 I propose to either adding synchronized to access it in all places or using RW 
lock varying type of  locking for reading and writing.


> Distributed herder synchronization issue
> 
>
> Key: KAFKA-6869
> URL: https://issues.apache.org/jira/browse/KAFKA-6869
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> {code:java}
> org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code}
> field is read/written with and without *synchronized* in multiple places, 
> that is incorrect by JMM.
> I propose to either add *synchronized* to access it in all places or use RW 
> lock varying type of locking for reading and writing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6869) Distributed herder synchronization issue

2018-05-05 Thread Oleg Kuznetsov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-6869:
--
Description: 
{code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code}
 field is read/write with and without *synchronized* in multiple places, that 
is incorrect by JMM.

 I propose to either adding synchronized to access it in all places or using RW 
lock varying type of  locking for reading and writing.

  was:
{code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance\{code}
 field is read/write with and without *synchronized* in multiple places, that 
is incorrect by JMM.

 I propose to either adding synchronized to access it in all places or using RW 
lock varying type of  locking for reading and writing.


> Distributed herder synchronization issue
> 
>
> Key: KAFKA-6869
> URL: https://issues.apache.org/jira/browse/KAFKA-6869
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> {code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code}
>  field is read/write with and without *synchronized* in multiple places, that 
> is incorrect by JMM.
>  I propose to either adding synchronized to access it in all places or using 
> RW lock varying type of  locking for reading and writing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6869) Distributed herder synchronization issue

2018-05-05 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-6869:
-

 Summary: Distributed herder synchronization issue
 Key: KAFKA-6869
 URL: https://issues.apache.org/jira/browse/KAFKA-6869
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Oleg Kuznetsov


{code}org.apache.kafka.connect.runtime.distributed.DistributedHerder#needsReconfigRebalance{code}
 field is read/write with and without *synchronized* in multiple places, that 
is incorrect by JMM.

 I propose to either adding synchronized to access it in all places or using RW 
lock varying type of  locking for reading and writing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2018-01-28 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342510#comment-16342510
 ] 

Oleg Kuznetsov commented on KAFKA-4794:
---

[~rhauch] Is it scheduled on release 1.1.0?

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Currently the offsets storage is only accessible from SourceTask to able to 
> initialize properly tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task it would helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> In that way, we could have a background thread that could request a tasks 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage for detecting and for 
> streaming new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new inputs files are detected a tasks reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores sources offsets for the last sent record. The source offsets 
> data are:
>  - the size of file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (in : 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks has finished the connector can stop them or assigned new files 
> by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is detect slow or failed tasks and if necessary to be able to restart all 
> tasks.
> If you think this improvement is OK, I can work a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-5866) Let source/sink task to finish their job before exit

2017-09-10 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-5866:
-

 Summary: Let source/sink task to finish their job before exit
 Key: KAFKA-5866
 URL: https://issues.apache.org/jira/browse/KAFKA-5866
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Oleg Kuznetsov


My case is about reading files. When a task is stopped for a rebalance or another reason, I want to let it at least read the file to the end.

I found that the flag 
{code:java}
WorkerTask#stopping
{code}
 is set to true and only then 
{code:java}
SourceTask.stop()
{code}
 is called. This stopping flag prevents WorkerSourceTask from further ingestion (exit from 
{code:java}
while (!isStopped())
{code}).

Is it possible to let the task decide to keep working for some time and possibly produce more records after stop() is called on rebalance?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-08-23 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139481#comment-16139481
 ] 

Oleg Kuznetsov commented on KAFKA-5696:
---

[~Yohan123] The main issue was not resolved - we now understand that the workflow of one connector affects the others, and other connectors might be writing data at the moment of restart.

I am not sure how it can be resolved at those levels - could you clarify?

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-08-22 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137100#comment-16137100
 ] 

Oleg Kuznetsov edited comment on KAFKA-5696 at 8/22/17 5:56 PM:


[~rhauch] Is it possible to add waiting for all the tasks to finish their current loop (i.e. let the producer finish writing records and commit their offsets) before rebalancing?


was (Author: olkuznsmith):
[~rhauch] Is it possible to add waiting all the task to finish their current 
loop (i.e. let producer to finish writing records, commit their offsets) before 
rebalancing?

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-08-22 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137100#comment-16137100
 ] 

Oleg Kuznetsov commented on KAFKA-5696:
---

[~rhauch] Is it possible to add waiting all the task to finish their current 
loop (i.e. let producer to finish writing records, commit their offsets) before 
rebalancing?

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-08-21 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134997#comment-16134997
 ] 

Oleg Kuznetsov commented on KAFKA-5696:
---

[~rhauch] Is it possible that a config update in one connector causes a restart of all connectors within the same JVM? I see this in the logs.

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5756) Synchronization issue on flush

2017-08-20 Thread Oleg Kuznetsov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-5756:
--
Description: 
Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
method, whereas this collection can be accessed from 2 different threads:
- *WorkerSourceTask.execute()*, finally block
- *SourceTaskOffsetCommitter*, from periodic flush task
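
For illustration, a simplified sketch of the kind of guard meant here (the field and helper names are hypothetical, not the actual OffsetStorageWriter code):
{code:java}
// Hypothetical sketch: the periodic committer and WorkerSourceTask.execute() must
// take the same lock before touching the snapshot that is being flushed.
private final Object flushLock = new Object();
private Map<ByteBuffer, ByteBuffer> toFlush = null;

void doFlush() {
    synchronized (flushLock) {
        if (toFlush == null)
            return;
        writeToBackingStore(toFlush); // hypothetical helper that serializes and sends the offsets
        toFlush = null;
    }
}
{code}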

  was:
Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
method, whereas this collection can be accessed from 2 different threads:
- *WorkerSourceTask.execute()*, finally block
- *SourceTaskOffsetCommitter*, from periodic flushing task


> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Oleg Kuznetsov
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5756) Synchronization issue on flush

2017-08-20 Thread Oleg Kuznetsov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-5756:
--
Description: 
Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
method, whereas this collection can be accessed from 2 different threads:
- *WorkerSourceTask.execute()*, finally block
- *SourceTaskOffsetCommitter*, from periodic flushing task

  was:
Access to OffsetStorageWriter#toFlush is not synchronized in doFlush() method, 
whereas this collection can be accessed from 2 different threads:
- WorkerSourceTask.execute(), finally block
- SourceTaskOffsetCommitter, from periodic flushing task


> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Oleg Kuznetsov
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flushing task



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5756) Synchronization issue on flush

2017-08-20 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-5756:
-

 Summary: Synchronization issue on flush
 Key: KAFKA-5756
 URL: https://issues.apache.org/jira/browse/KAFKA-5756
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Oleg Kuznetsov


Access to OffsetStorageWriter#toFlush is not synchronized in doFlush() method, 
whereas this collection can be accessed from 2 different threads:
- WorkerSourceTask.execute(), finally block
- SourceTaskOffsetCommitter, from periodic flushing task



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-08-07 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117301#comment-16117301
 ] 

Oleg Kuznetsov commented on KAFKA-5696:
---

Yes, you are right. 

Is there a chance that exactly-once delivery becomes possible, since Kafka supports this mode now?

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>  Labels: newbie
> Fix For: 0.10.0.2
>
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-08-07 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116962#comment-16116962
 ] 

Oleg Kuznetsov commented on KAFKA-5696:
---

[~rhauch] Thank you for the detailed explanation. 

But aren't offsets also committed at the end of every poll() iteration? 
I found this in the WorkerSourceTask.execute() finally block.

{code:java}
 } finally {
// It should still be safe to commit offsets since any exception 
would have
// simply resulted in not getting more records but all the existing 
records should be ok to flush
// and commit offsets. Worst case, task.flush() will also throw an 
exception causing the offset commit
// to fail.
commitOffsets();
}
{code}

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>  Labels: newbie
> Fix For: 0.10.0.2
>
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on reconfiguration

2017-08-04 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115041#comment-16115041
 ] 

Oleg Kuznetsov commented on KAFKA-5696:
---

Is it true that there is a chance of producing records without their offsets being committed in a source connector?

> SourceConnector does not commit offset on reconfiguration
> -
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>  Labels: newbie
> Fix For: 0.10.0.2
>
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5696) SourceConnector does not commit offset on reconfiguration

2017-08-03 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-5696:
-

 Summary: SourceConnector does not commit offset on reconfiguration
 Key: KAFKA-5696
 URL: https://issues.apache.org/jira/browse/KAFKA-5696
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Oleg Kuznetsov
 Fix For: 0.10.0.2


I'm running SourceConnector, that reads files from storage and put data in 
kafka. I want, in case of reconfiguration, offsets to be flushed. 

Say, a file is completely processed, but source records are not yet committed 
and in case of reconfiguration their offsets might be missing in store.

Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-07-17 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16090034#comment-16090034
 ] 

Oleg Kuznetsov commented on KAFKA-4794:
---

[~fhussonnois] I meant who could give it?

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> Currently the offsets storage is only accessible from SourceTask to able to 
> initialize properly tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task it would helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> In that way, we could have a background thread that could request a tasks 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage for detecting and for 
> streaming new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new inputs files are detected a tasks reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores sources offsets for the last sent record. The source offsets 
> data are:
>  - the size of file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (in : 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks has finished the connector can stop them or assigned new files 
> by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is detect slow or failed tasks and if necessary to be able to restart all 
> tasks.
> If you think this improvement is OK, I can work a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-07-16 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089196#comment-16089196
 ] 

Oleg Kuznetsov edited comment on KAFKA-4794 at 7/17/17 12:07 AM:
-

[~jasong35] [~ewencp] Do you have any feedback regarding this KIP?


was (Author: olkuznsmith):
[~jasong35] Do you have any feedback regarding this KIP?

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> Currently the offsets storage is only accessible from SourceTask to able to 
> initialize properly tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task it would helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> In that way, we could have a background thread that could request a tasks 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage for detecting and for 
> streaming new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new inputs files are detected a tasks reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores sources offsets for the last sent record. The source offsets 
> data are:
>  - the size of file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (in : 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks has finished the connector can stop them or assigned new files 
> by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is detect slow or failed tasks and if necessary to be able to restart all 
> tasks.
> If you think this improvement is OK, I can work a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-07-16 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089196#comment-16089196
 ] 

Oleg Kuznetsov commented on KAFKA-4794:
---

[~jasong35] Do you have any feedback regarding this KIP?

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> Currently the offsets storage is only accessible from SourceTask to able to 
> initialize properly tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task it would helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> In that way, we could have a background thread that could request a tasks 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage for detecting and for 
> streaming new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new inputs files are detected a tasks reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores sources offsets for the last sent record. The source offsets 
> data are:
>  - the size of file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (in : 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks has finished the connector can stop them or assigned new files 
> by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is detect slow or failed tasks and if necessary to be able to restart all 
> tasks.
> If you think this improvement is OK, I can work a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5598) Make OffsetReader accessible in ConnectorContext

2017-07-16 Thread Oleg Kuznetsov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-5598:
--
Description: It is useful to read offset in SourceConnector, but now 
OffsetReader is accessible only in SourceTaskContext.  (was: It is useful to 
read offset in SourceConnector, but now OffsetReader is accessible only in 
SourceTask.)

> Make OffsetReader accessible in ConnectorContext
> 
>
> Key: KAFKA-5598
> URL: https://issues.apache.org/jira/browse/KAFKA-5598
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Oleg Kuznetsov
>
> It is useful to read offset in SourceConnector, but now OffsetReader is 
> accessible only in SourceTaskContext.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5598) Make OffsetReader accessible in ConnectorContext

2017-07-16 Thread Oleg Kuznetsov (JIRA)
Oleg Kuznetsov created KAFKA-5598:
-

 Summary: Make OffsetReader accessible in ConnectorContext
 Key: KAFKA-5598
 URL: https://issues.apache.org/jira/browse/KAFKA-5598
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.11.0.0
Reporter: Oleg Kuznetsov


It is useful to read offset in SourceConnector, but now OffsetReader is 
accessible only in SourceTask.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)