[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2018-05-03 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462890#comment-16462890
 ] 

Randall Hauch commented on KAFKA-5716:
--

[~steff1193], thanks for all the work on this, and sorry about the delay.

First of all, if we're going to change the public API we need to have a KIP. 
Second, it would be preferable to have a KIP and a PR that are much more 
minimal, containing only the essential changes we have to make to enable 
the correct behavior. From the previous conversations, this might include:

# Adding a {{committed(Map)}} method that 
defaults to calling {{commit()}}, and that clearly describes it as being called 
with the latest offsets for records that were both written and whose offsets 
were committed.
# Deprecating the {{commit()}} method, briefly describing in the JavaDoc why 
it is deprecated and that {{committed(Map)}} is the preferred approach.
# Changing {{WorkerSourceTask}} to call {{committed(Map)}} instead of 
{{commit()}}, which will require tracking offsets a bit more carefully. 
(This is where this PR will get complicated, but it doesn't have to be in the 
KIP.)
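A rough sketch of the first two points, for illustration only. ProposedSourceTask is a hypothetical stand-in rather than the real org.apache.kafka.connect.source.SourceTask, and the parameter type Map<Map<String, ?>, Map<String, ?>> (source partition to latest source offset) is an assumption, since the comment only writes committed(Map). The key idea is that the new hook's default delegates to the old one, so connectors that only override commit() behave exactly as before.

```java
import java.util.Map;

/**
 * Hypothetical stand-in for SourceTask that models only the commit hooks from
 * the proposal; it is not the real Kafka Connect class.
 */
abstract class ProposedSourceTask {

    /**
     * Legacy hook.
     *
     * @deprecated may be called before every record returned by poll() has been
     *             written to Kafka and had its offset committed; override
     *             {@link #committed(Map)} instead.
     */
    @Deprecated
    public void commit() throws InterruptedException {
        // No-op by default, so tasks that never overrode it are unaffected.
    }

    /**
     * Proposed hook: receives the latest source offsets for records that were both
     * written to Kafka and whose offsets were committed. The default delegates to
     * commit() so existing connectors keep working unchanged.
     *
     * @param offsets source partition -> latest committed source offset (assumed shape)
     */
    public void committed(Map<Map<String, ?>, Map<String, ?>> offsets) throws InterruptedException {
        commit();
    }
}
```

A connector that needs the stronger guarantee would override committed(...) and ignore commit(); one that overrides neither keeps the no-op default.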

The important thing is that we don't want to break backward compatibility, but 
we should provide a path forward for source connectors that need to rely upon 
this. WDYT?
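One possible shape for the bookkeeping in the third point, sketched under stated assumptions: offsets are recorded only when the producer acknowledges a record, a flush takes a snapshot of them, and only after the flush succeeds are they reported to the task. AckedOffsetTracker and its methods are hypothetical and do not describe how WorkerSourceTask is actually implemented; partitions are modeled as Strings and offsets as longs, and it assumes acknowledgements for one source partition arrive in offset order.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical worker-side bookkeeping that only reports offsets whose records were
 * acknowledged by Kafka AND whose offsets were flushed. Partitions are Strings and
 * offsets are longs for brevity; Connect itself uses Map<String, ?> for both.
 */
final class AckedOffsetTracker {
    private final Map<String, Long> acked = new HashMap<>();     // acked, not yet in a flush
    private final Map<String, Long> flushing = new HashMap<>();  // snapshot being flushed
    private final Map<String, Long> committed = new HashMap<>(); // acked and flushed

    /** Producer callback: a record with this offset was written to Kafka. */
    synchronized void onRecordAcked(String partition, long offset) {
        // Assumes acks for one partition arrive in offset order, so max() is safe.
        acked.merge(partition, offset, Math::max);
    }

    /** Committer thread: move everything acked so far into the flush snapshot. */
    synchronized Map<String, Long> beginFlush() {
        acked.forEach((p, o) -> flushing.merge(p, o, Math::max));
        acked.clear();
        return new HashMap<>(flushing);
    }

    /** Flush succeeded: returns the map that would be passed to task.committed(...). */
    synchronized Map<String, Long> flushSucceeded() {
        flushing.forEach((p, o) -> committed.merge(p, o, Math::max));
        flushing.clear();
        return new HashMap<>(committed);
    }

    /** Flush failed: put the snapshot back so the next flush retries it. */
    synchronized void flushFailed() {
        flushing.forEach((p, o) -> acked.merge(p, o, Math::max));
        flushing.clear();
    }
}
```

Because a record that has been polled but not yet acknowledged never reaches onRecordAcked, it cannot leak into the map reported after a flush, which is the window described in the issue.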


> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Per Steffensen
>Assignee: Per Steffensen
>Priority: Minor
> Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> As I read this, when commit-method is called, the SourceTask-developer is 
> "told" that everything returned from poll up until "now" has been sent/stored 
> - both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and 
> commitOffsets respectively, preventing the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily with what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks 
> in and does its committing, meanwhile preventing the task-thread from adding the 
> polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask) before the 
> records just polled from task.poll have been sent or their corresponding 
> connector-offsets flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something like this (which I 
> do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
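> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation of where my code-inspection is not correct, and why it 
> works even though I cannot see it. I will not expect such an explanation, 
> though.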

[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-12-29 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16306217#comment-16306217
 ] 

Per Steffensen commented on KAFKA-5716:
---

No one? Really?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-11-08 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243639#comment-16243639
 ] 

Per Steffensen commented on KAFKA-5716:
---

Please comment!

Is the PR OK, or do you want me to do more? Anyone willing to help get this 
committed? Does it need a KIP?


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-10-21 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16214047#comment-16214047
 ] 

Per Steffensen commented on KAFKA-5716:
---

Ahhh, I realized that I was doing more than this ticket is about, and that this 
part had grown. I was trying to make the naming consistent with respect to 
calling it "committing offsets" or "flushing offsets". I tried to do that for 
the sink-connector side as well, but that involves a lot of changes, including 
public ones. It is a shame that the naming seems to be inconsistent across the 
code, but that should be corrected in another ticket. I do not care much 
whether it is called "committing offsets" or "flushing offsets", but it should 
be consistent.

But I do care about the names "commit" and "commitRecord" on SourceTask, because 
they sound like they tell you what to do, and what you want to do when offsets 
have been committed/flushed, or when records have been sent and acknowledged, 
respectively, is not necessarily anything that rhymes with "commit". Let the 
methods on SourceTask reflect what has just happened and let the SourceTask 
decide its action, instead of trying to tell the SourceTask what it should do.

With all the semi-unrelated naming changes reverted, the code changes in the 
PR are comprehensible again.


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-10-20 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212416#comment-16212416
 ] 

Per Steffensen commented on KAFKA-5716:
---

I am having a hard time figuring out exactly what you would like me to do and 
not do. Therefore I just made the changes that I find reasonable. Please comment!

I was fairly eager to do related things, like deprecating and renaming related 
methods and keeping related naming consistent. Please let me know if I was too 
eager.

As I see it, the only "public" changes are:
* SourceTask: offsetsFlushedAndAcknowledged and recordSentAndAcknowledged 
added, but commit and commitRecord still exist with the same out-of-the-box 
semantics
* WorkerConfig: OFFSET_COMMIT_XXX constants renamed to OFFSET_FLUSH_XXX

Probably ought to write more here, but I am in a hurry right now.
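For context on the WorkerConfig bullet: in Kafka's WorkerConfig, the existing constants OFFSET_COMMIT_INTERVAL_MS_CONFIG and OFFSET_COMMIT_TIMEOUT_MS_CONFIG already hold the keys offset.flush.interval.ms and offset.flush.timeout.ms, which is one visible case of the commit/flush naming mismatch. Because the constants are public, a rename could keep the old names as deprecated aliases so existing code still compiles. The class below is a hypothetical sketch of that pattern, not Kafka's WorkerConfig; the user-facing config keys do not change.

```java
/**
 * Hypothetical sketch of renaming public config-name constants without breaking
 * callers. The key strings match the Connect worker settings; the class is not
 * Kafka's WorkerConfig.
 */
final class WorkerConfigNames {
    public static final String OFFSET_FLUSH_INTERVAL_MS_CONFIG = "offset.flush.interval.ms";
    public static final String OFFSET_FLUSH_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";

    /** @deprecated use {@link #OFFSET_FLUSH_INTERVAL_MS_CONFIG}; same key. */
    @Deprecated
    public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = OFFSET_FLUSH_INTERVAL_MS_CONFIG;

    /** @deprecated use {@link #OFFSET_FLUSH_TIMEOUT_MS_CONFIG}; same key. */
    @Deprecated
    public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = OFFSET_FLUSH_TIMEOUT_MS_CONFIG;

    private WorkerConfigNames() {
        // Constants holder; not instantiable.
    }
}
```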


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-24 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178406#comment-16178406
 ] 

Ewen Cheslack-Postava commented on KAFKA-5716:
--

Sorry, I was just doing a round of cleanup on the JIRAs to help track 
in-progress JIRAs. Submitting PRs doesn't automatically mark these as in 
progress, so I was just updating status + assignee for all Connect JIRAs based 
on what had been submitted so far as it helps us find JIRAs that could use 
review, especially as releases are nearing.

I think in retrospect the naming of commitRecord is probably unfortunate. 
commit() would tell you when the data is flushed to Kafka + offsets committed, 
whereas commitRecord really only guarantees it was written to Kafka, but then 
you might restart the task and read committed offsets that are earlier than 
that. Something like flushedRecord or ackedRecord for what commitRecord 
currently does, plus a commitRecord whose behavior would have the framework save 
the list of records and then invoke callbacks when the offset commit succeeded, 
would probably be better.
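A minimal sketch of that alternative, assuming the framework knows which acknowledged records a given offset commit covers: it buffers acknowledged records and invokes the task callback only once the covering offset commit succeeds. CommitCallbackBuffer and its methods are hypothetical, and R stands in for SourceRecord.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: remember records Kafka has acknowledged and only invoke the
 * task callback for them once an offset commit covering them has succeeded.
 * R stands in for SourceRecord; none of these names exist in Kafka Connect.
 */
final class CommitCallbackBuffer<R> {
    private final Consumer<R> onCommitted;           // the task-facing callback
    private final List<R> acked = new ArrayList<>(); // written to Kafka, offsets not yet committed
    private List<R> inFlight = new ArrayList<>();    // covered by the offset commit in progress

    CommitCallbackBuffer(Consumer<R> onCommitted) {
        this.onCommitted = onCommitted;
    }

    /** Producer callback: the item was written to Kafka (today's commitRecord moment). */
    synchronized void recordAcked(R item) {
        acked.add(item);
    }

    /** An offset commit starts; it covers everything acknowledged so far. */
    synchronized void beginOffsetCommit() {
        inFlight.addAll(acked);
        acked.clear();
    }

    /** The offset commit succeeded: now it is safe to notify the task. */
    void offsetCommitSucceeded() {
        List<R> done;
        synchronized (this) {
            done = inFlight;
            inFlight = new ArrayList<>();
        }
        done.forEach(onCommitted); // callbacks run outside the lock
    }

    /** The offset commit failed: keep the items, oldest first, for the next attempt. */
    synchronized void offsetCommitFailed() {
        inFlight.addAll(acked);
        acked.clear();
        acked.addAll(inFlight);
        inFlight = new ArrayList<>();
    }
}
```

Running the callbacks outside the lock keeps a slow task callback from blocking the producer thread that calls recordAcked.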

The motivating use case for commitRecord was really systems that only have 
individual message acks anyway, e.g. message queues. In this case, any sort of
bulk ack from the connect framework, i.e. commit(), isn't all that helpful and 
in fact requires more state tracking on the part of the connector. Because of 
the current behavior, it also allows you to avoid duplicates in the periods 
between offset commits since you can selectively destroy data that you know has 
made it to Kafka even if the offsets haven't been committed yet. Even if we 
just removed commit(), whether you'd want something other than commitRecord() 
would depend on how fine-grained you are ok with acking data and whether you 
need a collective/bulk ack.
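For the message-queue case, a sketch of a task that acks each upstream message from commitRecord, using the message id carried in the record's source offset. MessageQueue, StubRecord, and QueueSourceTaskSketch are hypothetical names; only the poll/commitRecord method names mirror SourceTask. Acking as soon as Kafka has the record is what lets such a connector avoid redelivering data between offset commits; assuming the queue redelivers unacked messages, a crash between the Kafka write and the ack can still produce a duplicate.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical upstream queue client that only supports per-message acks. */
interface MessageQueue {
    /** Returns the ids of newly received messages (payloads omitted for brevity). */
    List<String> receive();

    /** Tells the queue it may discard this message. */
    void ack(String messageId);
}

/** Minimal stand-in for SourceRecord; only the source offset matters here. */
final class StubRecord {
    final Map<String, ?> sourceOffset;

    StubRecord(Map<String, ?> sourceOffset) {
        this.sourceOffset = sourceOffset;
    }
}

/** Sketch of a queue-backed task; mirrors SourceTask method names only. */
final class QueueSourceTaskSketch {
    private final MessageQueue queue;

    QueueSourceTaskSketch(MessageQueue queue) {
        this.queue = queue;
    }

    List<StubRecord> poll() {
        List<StubRecord> records = new ArrayList<>();
        for (String id : queue.receive()) {
            // Carry the upstream message id in the source offset so it can be acked later.
            records.add(new StubRecord(Collections.singletonMap("messageId", id)));
        }
        return records;
    }

    /** Called once per record after Kafka has acknowledged the write. */
    void commitRecord(StubRecord sent) {
        // Ack this single message right away; no bulk commit() bookkeeping needed.
        queue.ack((String) sent.sourceOffset.get("messageId"));
    }
}
```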


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-23 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177881#comment-16177881
 ] 

Randall Hauch commented on KAFKA-5716:
--

I agree that the current {{commit()}} method is incorrect and should be dealt 
with. However, my point was that you can do a lot with just {{commitRecord}} 
while ignoring the broken {{commit()}} method.

We do need to fix this, and I think [~ewencp] assigned this to you since you 
were already working on a fix, though you were obviously looking for guidance 
as to the correct route. I see [~ewencp]'s point, too, that *correcting the behavior* of
the {{commit()}} method will not be trivial or inexpensive, and why he 
considers removing the method altogether (initially via deprecation) an 
attractive resolution. As I mention above, the most important thing is that 
{{commitRecord}} still tells you when a record has been written to Kafka. 
Flushed offsets are almost certainly going to be behind what's written to 
Kafka, which is why I think source connectors shouldn't really have to deal 
with or be concerned with what has been flushed.

WDYT?
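As an example of what a connector can do with commitRecord alone, here is a sketch of how a task could derive a safe checkpoint for a single source partition: the highest offset such that it and every earlier offset returned by poll() have been written to Kafka. Acknowledgements can arrive out of order (for instance when records from one source partition go to different topic partitions), hence the ordered map. ContiguousAckTracker is hypothetical and assumes increasing, non-negative long offsets.

```java
import java.util.TreeMap;

/**
 * Hypothetical helper for one source partition: tracks offsets handed out by poll()
 * and commitRecord() acknowledgements (possibly out of order) and reports the
 * highest offset such that it and all earlier offsets were written to Kafka.
 */
final class ContiguousAckTracker {
    // Offsets returned by poll() that are not yet part of the safe prefix,
    // mapped to whether commitRecord() has acknowledged them.
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();
    // Highest offset such that it and all earlier offsets were written; -1 = none yet.
    private long safe = -1L;

    /** Call for each record returned by poll(), in increasing offset order. */
    synchronized void polled(long offset) {
        long highest = inFlight.isEmpty() ? safe : inFlight.lastKey();
        if (offset <= highest) {
            throw new IllegalArgumentException("offsets must increase, got " + offset);
        }
        inFlight.put(offset, Boolean.FALSE);
    }

    /** Call from commitRecord(); returns the (possibly unchanged) safe offset. */
    synchronized long acked(long offset) {
        // Unknown or already-retired offsets are ignored.
        inFlight.replace(offset, Boolean.TRUE);
        // Retire the acknowledged prefix; its last offset becomes the new safe point.
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            safe = inFlight.pollFirstEntry().getKey();
        }
        return safe;
    }
}
```

The safe offset here reflects what has been written to Kafka, not what Connect has flushed, which matches the point above that flushed offsets will usually lag behind.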

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Per Steffensen
>Assignee: Per Steffensen
>Priority: Minor
> Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> As I read this, when the commit method is called, the SourceTask developer is 
> "told" that everything returned from poll up until "now" has been sent/stored: 
> both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation, it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, this is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask object) in sendRecords and 
> commitOffsets respectively, preventing the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily with what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks 
> in and does its committing, while preventing the task-thread from adding the 
> polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask) without the 
> records just polled from task.poll having been sent or the corresponding 
> connector-offsets having been flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something like the 
> following (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation of where my code inspection is incorrect, and why it 
> works even though I cannot see it. I will not expect such an explanation, 
> though.

[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-23 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177692#comment-16177692
 ] 

Per Steffensen commented on KAFKA-5716:
---

[~ewencp] assigned me to this task. Does that mean that I should add my 
suggested fix to the PR, so that we can discuss it from there?

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Per Steffensen
>Assignee: Per Steffensen
>Priority: Minor
> Attachments: KAFKA-5716.patch
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-23 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177688#comment-16177688
 ] 

Per Steffensen commented on KAFKA-5716:
---

bq. Yeah, this makes sense. The existing SourceTask.commitRecord(...) method is 
called after each source record has been written to Kafka – can you use this to 
keep track of the offsets that have been written?

Yes, that is what I am doing today, because it yields something that is closer 
to the truth than just assuming that everything polled has been written. But it 
is not necessarily accurate either. It can happen that (some of) the records 
that have been polled, but not included in the offsets write/flush (which I 
have demonstrated can happen), have also been sent to the outgoing Kafka AND 
been acknowledged, leading to the call to {{SourceTask.commitRecord(...)}}. So at 
the time of {{SourceTask.commit()}} I claim that it is possible that 
{{SourceTask.commitRecord(...)}} has been called for some records not included 
in the offsets write/flush.

You suggest several things that I could do, and some of them I am doing. I 
believe I know very well how Kafka and Kafka-Connect work in this area. I can 
do a lot of things to "work around" the fact that I do not really know which 
offsets have been written/flushed. The point of KAFKA-5716 is that I really 
ought to be told; why not? Or at least the JavaDoc should not tell me that I 
am told, when the truth is that I am not :-)

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Per Steffensen
>Assignee: Per Steffensen
>Priority: Minor
> Attachments: KAFKA-5716.patch
>
>

[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176582#comment-16176582
 ] 

Randall Hauch commented on KAFKA-5716:
--

{quote}
What I am using it for can in some cases be described as "transfer data from 
some temporary storage (source-system) to Kafka, and when it has been safely 
transferred it is deleted from the source-system"
{quote}

Yeah, this makes sense. The existing {{SourceTask.commitRecord(...)}} method is 
called after each source record has been written to Kafka -- can you use this 
to keep track of the offsets that have been written? Yes, you have to do more 
bookkeeping, and you'd likely want to accumulate a bunch of offsets, but if you 
know ahead of time the last offset in a particular file, you may also be able 
to react to those "last offsets" by removing the file. Note that this allows 
you to detect that the records *have been written to Kafka*.
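A minimal sketch of that bookkeeping, assuming a file-based task whose records carry the file name as source partition and a position as source offset, and whose last position per file is known ahead of time. `WrittenOffsetTracker` and its methods are hypothetical; in a real task `onRecordWritten` would be fed from `commitRecord(...)`. Note that it only reflects what has been written to Kafka, not what Connect has flushed to offset storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not part of Kafka Connect) that tracks, per file, the highest
 * source offset confirmed written to Kafka via SourceTask.commitRecord(...).
 * Assumes records of one file are acknowledged in order; otherwise gaps must be tracked.
 */
final class WrittenOffsetTracker {
    // Last offset of each file, known ahead of time (e.g. once the file is complete).
    private final Map<String, Long> lastOffsetPerFile = new HashMap<>();
    // Highest offset per file that has been confirmed written to Kafka.
    private final Map<String, Long> writtenPerFile = new HashMap<>();

    void registerFile(String file, long lastOffset) {
        lastOffsetPerFile.put(file, lastOffset);
    }

    /** Call from commitRecord(...); returns true once the file's last offset is written. */
    boolean onRecordWritten(String file, long offset) {
        writtenPerFile.merge(file, offset, Math::max);
        Long last = lastOffsetPerFile.get(file);
        return last != null && writtenPerFile.get(file) >= last;
    }

    /** Files whose records have all been written to Kafka (candidates for removal). */
    List<String> fullyWrittenFiles() {
        List<String> done = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastOffsetPerFile.entrySet()) {
            Long written = writtenPerFile.get(e.getKey());
            if (written != null && written >= e.getValue()) {
                done.add(e.getKey());
            }
        }
        return done;
    }
}
```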

It is possible with this approach that if Connect were to crash before the most 
recent offsets have been flushed/committed, then upon restart Connect might 
want to start with offsets that are too old. But your connector would know that 
if the file(s) described by the last persisted offsets were removed, they would 
have been cleaned up once all of their records were written to Kafka. Could the 
connector just skip those files?
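The skip-on-restart idea could look roughly like this. It is a sketch that assumes (hypothetically) that each stream's files carry increasing sequence numbers and that a file is deleted only after all of its records were written to Kafka; `ResumePlanner` and `ResumePoint` are illustrative names, not Connect classes.

```java
import java.util.SortedSet;

/** Hypothetical resume position: file sequence number plus position inside that file. */
final class ResumePoint {
    final long fileSeq;
    final long position;

    ResumePoint(long fileSeq, long position) {
        this.fileSeq = fileSeq;
        this.position = position;
    }
}

final class ResumePlanner {
    /**
     * Decides where to resume after a restart, given the (possibly stale) committed
     * offset and the files that still exist. Returns null if no files are left.
     */
    static ResumePoint resume(long committedFileSeq, long committedPosition,
                              SortedSet<Long> existingFileSeqs) {
        SortedSet<Long> candidates = existingFileSeqs.tailSet(committedFileSeq);
        if (candidates.isEmpty()) {
            return null; // everything was written to Kafka and cleaned up
        }
        long first = candidates.first();
        // If the committed file still exists, continue inside it. Otherwise it was
        // fully written and deleted, so start at the beginning of the next file
        // (at-least-once: records of that file may be sent again).
        return first == committedFileSeq
                ? new ResumePoint(first, committedPosition)
                : new ResumePoint(first, 0L);
    }
}
```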

{quote}
and in other cases as just "advanced Kafka-to-Kafka ETL"
{quote}

I wonder if you could do something similar for this case by relying upon 
{{commitRecord(...)}} to know when particular records have been written to 
Kafka. If so, I think the difference in behavior is whether you believe the 
offsets for *written records* or the *last committed offsets* are the "most 
correct" information. Recall that the last committed offsets may not 
represent the most recently *written* records, whereas the offsets from 
{{commitRecord(...)}} are definitely the offsets for the most recently-written 
records.

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Per Steffensen
>Priority: Minor
> Attachments: KAFKA-5716.patch
>
>

[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-20 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172876#comment-16172876
 ] 

Per Steffensen commented on KAFKA-5716:
---

bq. The idiomatic pattern for source connectors is to rely upon the source 
partition and source offsets included in each source record, and to have 
Connect periodically commit those offsets to offset storage (e.g., Kafka in 
distributed mode) as well as during graceful shutdown of the connector. Connect 
then uses the committed offsets upon restart to inform the connector of the 
last persisted offsets the connector provided, and then the connector can 
restart from that point.

Yes, I completely get that.

bq. In this scenario, the connector doesn't need to acknowledge or be notified 
of the committed offsets since Connect is completely managing the offsets for 
the connector.

I guess it depends.

bq. I'd be interested in hearing more about your use case, where IIUC your 
connector needs to notify the source system that progress has completed. Can 
you describe the source system and why you're using this approach?

I believe it would be too much explanation here. I can give a few "hints", but 
maybe you will not be able to follow without more details.

First of all, I am starting to realize that I may be using Kafka-Connect for 
something that it was not entirely intended for. I am starting to believe that 
it is intended to "expose data in some source-system via Kafka, but the data 
stays in the source-system". What I am using it for can in some cases be 
described as "transfer data from some temporary storage (source-system) to 
Kafka, and when it has been safely transferred it is deleted from the 
source-system", and in other cases as just "advanced Kafka-to-Kafka ETL". Even 
though Kafka-Connect (source-connectors) is intended for the first thing, it 
does a very nice job for me on the second thing :-) Maybe the intended use for 
Kafka-Connect could change to include both :-) :-) :-)

Briefly, about two of the source-connectors I have:

1. TCP streaming indirectly to Kafka

From a 3rd party we receive data through TCP, over an unknown number of 
concurrent TCP streams. We have only one chance to get this data, so we had 
better make sure we "receive and store it". We could write the TCP streams 
directly to Kafka, but for several good reasons we write each stream to local 
files. Each stream gets chopped into several files of a particular length, 
starting on the next file when the previous file has reached that length. Now, 
the job is to get each of those streams onto Kafka, using Kafka-Connect, which 
does the job nicely, and bla bla bla. I cannot afford to delete the original 
file before all the data in it has been written to the target Kafka, and the 
connect-offsets (used to keep track of where in the files I got to) have been 
flushed, so that I will not be asked to replay from a non-existing file. Lots 
of details are missing, but maybe you get the picture.
As a note, there may be significantly more TCP streams than I have partitions 
in my target Kafka topic, so the streams are "multiplexed" into the partitions, 
with several streams sharing the same partition. Each stream has its own unique 
key (used as the Kafka message key), so that one stream can be distinguished 
from the others, and so that all messages from the same stream go to the same 
partition (using the default key-hash-modulo partition selector).
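A sketch of key-hash-modulo partition selection, showing why every message of one stream lands in the same partition while several streams share a partition. As far as I know, Kafka's default partitioner hashes the serialized key with murmur2 and masks the sign bit before taking the modulo; this sketch deliberately substitutes `Arrays.hashCode` as a stand-in hash, so its partition numbers will not match Kafka's. `KeyPartitioning` is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Hypothetical illustration of key-hash-modulo partitioning. Uses a stand-in hash,
 * NOT Kafka's murmur2, so only the property "same key -> same partition" carries over.
 */
final class KeyPartitioning {
    static int partitionFor(String streamKey, int numPartitions) {
        byte[] keyBytes = streamKey.getBytes(StandardCharsets.UTF_8); // serialized key
        int hash = Arrays.hashCode(keyBytes);                           // stand-in hash
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

With 100 stream keys and 6 partitions, at least one partition must carry 17 or more streams (pigeonhole), which is the multiplexing described above; the unique key is what lets a consumer tell those streams apart.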

2. Parsing packets from streams multiplexed into the partitions of a Kafka topic

So we have multiplexed streams in the partitions of a Kafka topic. From here I 
use Kafka-Connect as an advanced ETL (including the translation) on 
Kafka-to-Kafka. No, Kafka-Streams will not do the job. But something based on 
Kafka-Connect will :-) For this Kafka-Connect-ETL it is not enough to only use 
the normal Kafka offset on the ingoing Kafka topic, because it has to keep track 
of where each multiplexed sub-stream of each partition got to. It does so 
relative to the normal committed offset on the ingoing Kafka topic. In order for 
this to work properly, the order of things has to be:
* Outgoing packets must have been sent and acknowledged by outgoing Kafka 
(commitRecord)
* Sub-stream offsets (connect-offsets) relative to the normal Kafka offset have 
to be safely stored (commit)
* Normal Kafka offset can be moved

Lots of details are missing, but you can see that I have to know when the 
connect-offsets have been written and flushed, in order to know when I can 
acknowledge the normal Kafka offset that they are relative to.

I consider "multiplexing more data streams into fewer Kafka partitions" such a 
general "problem" that I hope to find the time to open-source the system. I 
believe others would benefit.
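The required ordering for one input partition can be sketched as a small gate. All names are hypothetical, and the per-sub-stream bookkeeping is reduced to a single input offset. The argument to `onConnectOffsetsFlushed` is exactly the information this ticket says `SourceTask.commit()` does not currently provide.

```java
/**
 * Hypothetical ordering gate for one input partition of the Kafka-to-Kafka ETL case.
 * Offsets are offsets on the ingoing topic; -1 means "nothing yet".
 */
final class InputOffsetGate {
    private long ackedUpTo = -1L;      // step 1: output records acknowledged (commitRecord)
    private long flushedUpTo = -1L;    // step 2: connect-offsets safely stored (commit)
    private long inputCommitted = -1L; // step 3: normal Kafka offset moved

    /** Step 1: output for everything up to this input offset was acknowledged. */
    void onOutputAcknowledged(long inputOffset) {
        ackedUpTo = Math.max(ackedUpTo, inputOffset);
    }

    /**
     * Step 2: connect-offsets covering records up to this input offset were flushed.
     * Clamped to step 1, since offsets are only flushed for acknowledged records.
     */
    void onConnectOffsetsFlushed(long inputOffset) {
        flushedUpTo = Math.max(flushedUpTo, Math.min(inputOffset, ackedUpTo));
    }

    /** Step 3: move the input offset, but never past what steps 1 and 2 cover. */
    long advanceInputOffset() {
        inputCommitted = Math.max(inputCommitted, flushedUpTo);
        return inputCommitted;
    }
}
```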

bq. Does the source not allow you to replay from any point? Are you using this 
mechanism to notify the source system that it can garbage collect events that 
your connector is 

[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-19 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172057#comment-16172057
 ] 

Randall Hauch commented on KAFKA-5716:
--

{quote}
I am! And couldn't live without it :-) I cannot afford to acknowledge data going 
into my source-connector before the corresponding outgoing records AND their 
offset-changes has been written, flushed and acknowledged. Today I pretend that 
everything polled has been written to offset-storage when task.commit() is 
called, even though that is not always entirely true, but it is close. But not 
knowing anything about when offsets have been written to offset-storage would 
definitely leave me in the blind.
{quote}

The idiomatic pattern for source connectors is to rely upon the source 
partition and source offsets included in each source record, and to have 
Connect periodically commit those offsets to offset storage (e.g., Kafka in 
distributed mode) as well as during graceful shutdown of the connector. Connect 
then uses the committed offsets upon restart to inform the connector of the 
last persisted offsets the connector provided, and then the connector can 
restart from that point. In this scenario, the connector doesn't need to 
acknowledge or be notified of the committed offsets since Connect is completely 
managing the offsets for the connector.
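To make the idiomatic pattern concrete, here is a toy, in-memory model of it. `ToyOffsetStore` is not a Connect class; in a real task the source partition and offset travel inside each `SourceRecord`, the worker commits them, and on startup the task reads them back through its context's `OffsetStorageReader`. The model also shows why a task may replay after a crash: offsets sent after the last commit are not yet durable.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not the Connect API): the framework remembers the latest offset per
 * source partition, commits periodically, and a restarted task reads the last commit.
 */
final class ToyOffsetStore {
    private final Map<String, Long> pending = new HashMap<>();   // seen since the last commit
    private final Map<String, Long> committed = new HashMap<>(); // durable (e.g. offsets topic)

    /** Framework side: remember the offset of a record that was sent (latest wins). */
    void recordSent(String sourcePartition, long sourceOffset) {
        pending.put(sourcePartition, sourceOffset);
    }

    /** Framework side: periodic flush, also done on graceful shutdown. */
    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    /** Task side on restart: plays the role of reading the last persisted offset. */
    Long lastCommitted(String sourcePartition) {
        return committed.get(sourcePartition);
    }
}
```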

I'd be interested in hearing more about your use case, where IIUC your 
connector needs to notify the source system that progress has completed. Can 
you describe the source system and why you're using this approach? Does the 
source not allow you to replay from any point? Are you using this mechanism to 
notify the source system that it can garbage collect events that your connector 
is reading?

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Per Steffensen
>Priority: Minor
> Attachments: KAFKA-5716.patch
>
>


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-19 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171249#comment-16171249
 ] 

Per Steffensen commented on KAFKA-5716:
---

FWIW

bq. WorkerSourceTask doesn't have synchronization around at least one call to 
commitSourceTask (which invokes the SourceTask.commit()) method so you may call 
this while the main thread is invoking some other method on the connector.

{{commitSourceTask}} is only called by {{commitOffsets}}, which is only 
(intended to be?) called by {{SourceTaskOffsetCommitter}}, so there is no need 
for synchronization there to prevent it from being called several times 
concurrently. {{commitOffsets}} is also called by the main thread when 
{{execute}} finishes, but apart from that, no. I guess that, given the 
synchronization in {{commitOffsets}}, it works fine being called by 
{{SourceTaskOffsetCommitter}} at the same time as the main thread is calling 
other methods on the connector. It provides these semantics: {{task.commit()}} 
is called when offsets have been written and flushed, and the offsets that have 
been written and flushed always cover complete polls, i.e. everything from some 
{{task.poll}} (and all previous polls), BUT NOT NECESSARILY THE LAST (FEW) 
POLL(S). It is all or nothing per poll. The only problem I see is that they do 
not necessarily include offsets of records from the last (few) poll(s). That 
is what KAFKA-5716 is about.
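The interleaving can be illustrated with a deliberately simplified, single-threaded model. The class and method names are loosely based on the ones discussed in this ticket but are not `WorkerSourceTask`'s actual code; sending, acknowledgements, and waiting for `outstandingMessages` are left out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified single-threaded model of the race (illustrative names only). Offsets move
 * from "polled" to "staged" only in sendRecords(); commitOffsets() flushes only what is
 * staged and then counts a task.commit() notification.
 */
final class CommitRaceModel {
    private final List<Long> polled = new ArrayList<>();  // returned by poll(), not yet sent
    private final List<Long> staged = new ArrayList<>();  // handed to offsetWriter, awaiting flush
    private final List<Long> flushed = new ArrayList<>(); // written to offset storage
    private int taskCommitCalls = 0;

    /** Task thread: task.poll() returned records carrying these source offsets. */
    void poll(List<Long> offsets) {
        polled.addAll(offsets);
    }

    /** Task thread: send the polled records and stage their offsets. */
    void sendRecords() {
        staged.addAll(polled);
        polled.clear();
    }

    /** Committer thread: flush everything staged, then notify the task via task.commit(). */
    void commitOffsets() {
        flushed.addAll(staged);
        staged.clear();
        taskCommitCalls++;
    }

    List<Long> flushed() {
        return new ArrayList<>(flushed);
    }

    /** Offsets already returned by poll() that the latest task.commit() did not cover. */
    List<Long> notYetFlushed() {
        List<Long> pending = new ArrayList<>(staged);
        pending.addAll(polled);
        return pending;
    }

    int taskCommitCalls() {
        return taskCommitCalls;
    }
}
```

Calling `poll`, `sendRecords`, `poll`, `commitOffsets` in that order leaves only the first poll's offsets in `flushed()`, while `notYetFlushed()` still lists the second poll's offsets, even though a `task.commit()` notification was counted.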

bq. Correcting the code would be tough

I think it is a matter of exposing to {{task.commit}} which polled records have 
had their offset changes included in the offsets write and which have not. I 
guess it could be done by just passing "the last record that has been included 
in the offsets write", indirectly indicating the last poll that has been 
included. Please see the attached patch. And then change {{SourceTask}} to:
{code}
public void commit(SourceRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
    commit();
}

public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}
Plus some updated JavaDoc, maybe including deprecation of {{commit()}}, because 
it is ambiguous.
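A minimal sketch of how a task might use the proposed hook, assuming it is handed the last polled record whose offsets were written and flushed (possibly {{null}}). {{SimpleRecord}}, {{ProposedSourceTask}} and {{AckingTask}} are hypothetical stand-ins, not Kafka Connect classes; acknowledging upstream is modelled as appending to a list, and upstream ids are assumed to increase across polls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for SourceRecord: carries the id of the upstream message.
class SimpleRecord {
    final long upstreamId;
    SimpleRecord(long upstreamId) { this.upstreamId = upstreamId; }
}

// Hypothetical base class mirroring the proposed SourceTask change.
abstract class ProposedSourceTask {
    public void commit(SimpleRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
        commit(); // default: fall back to the old, ambiguous hook
    }

    public void commit() throws InterruptedException {
        // This space intentionally left blank.
    }
}

class AckingTask extends ProposedSourceTask {
    // Upstream ids returned by poll(), in order, not yet acknowledged upstream.
    private final Deque<Long> unacked = new ArrayDeque<>();
    // Ids acknowledged to the (hypothetical) upstream system, in order.
    final List<Long> acked = new ArrayList<>();

    // Simplified poll(): turns upstream ids into records and remembers them.
    synchronized List<SimpleRecord> poll(long... upstreamIds) {
        List<SimpleRecord> batch = new ArrayList<>();
        for (long id : upstreamIds) {
            unacked.addLast(id);
            batch.add(new SimpleRecord(id));
        }
        return batch;
    }

    @Override
    public synchronized void commit(SimpleRecord last) {
        if (last == null) {
            return; // no polled record had its offsets written in this commit
        }
        // Everything polled up to and including 'last' is now safe to acknowledge.
        while (!unacked.isEmpty() && unacked.peekFirst() <= last.upstreamId) {
            acked.add(unacked.pollFirst());
        }
    }
}
```

Records from later polls (ids 4 and 5 if the first poll returned 1 to 3) stay unacknowledged until a later commit covers them.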

bq. I'd propose a different solution: let's just remove the method

Uhhh, please do not. Then the {{SourceTask}}-implementation will have 
absolutely no clue when offsets have been written and flushed. {{commitRecord}} 
will not help, as when Kafka acknowledges outgoing records (triggering 
{{commitRecord}}) is completely independent of when record-offsets are written to 
offset-storage (triggering {{commit}}).

bq. Given its current state, it seems unlikely anyone is actually using this 
functionality anyway

I am! And I couldn't live without it :-) I cannot afford to acknowledge data 
going into my source-connector before the corresponding outgoing records AND 
their offset-changes have been written, flushed and acknowledged. Today I 
pretend that everything polled has been written to offset-storage when 
{{task.commit()}} is called; that is not always entirely true, but 
it is close. Not knowing anything about when offsets have been written to 
offset-storage would definitely leave me in the dark.

bq. If someone did want this functionality, we likely should just add a new 
{{commit(Map)}} variant

That is a good alternative to the {{commit(SourceRecord 
lastPolledRecordWithOffsetsWritten)}} variant I suggested.

bq. It'd be interesting to know if the method is overridden in any connectors 
that we know about in the wild.

I hope it is often overridden, because that is the only clue you have about 
when offsets have been written to offset-storage, so if you are actually using 
connect-offsets (and you probably are, since you are using Connect), your system 
would probably be incorrect if you do not take advantage of {{commit()}}. 
You cannot use {{commitRecord}} for that.

bq. If we just wanted a short term fix, we could definitely update the javadoc 
to make it clear what's actually happening and that this probably isn't what 
you want.

That would be easy :-) But it would not leave the {{SourceTask}}-implementor in 
any better position. It would still be undefined which polled records had their 
offsets written and which had not. This small change would help:
{code}
public void commit(SourceRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
    commit();
}

public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}
Or the {{commit(Map)}} variant you suggest.


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-18 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16170270#comment-16170270
 ] 

Ewen Cheslack-Postava commented on KAFKA-5716:
--

The basic analysis sounds right. That javadoc dates way back to the initial 
PR. I can't remember if we even had the separate thread for offset commits at 
that point, so it may simply have grown outdated (or it may have been inaccurate 
from the start of my development of the initial version and just been 
missed in the initial review; given it was an [11k line 
patch|https://github.com/apache/kafka/pull/99], I'm not surprised some things 
were missed in the review...).

Reviewing this actually revealed another problem that is, at best, 
important but not clearly documented and, at worst, another bug: 
WorkerSourceTask doesn't synchronize at least one call to 
commitSourceTask (which invokes SourceTask.commit()), so commit() may be called 
while the main thread is invoking some other method on the connector. I 
think we document elsewhere that stop() has this behavior (to interrupt 
polling), so perhaps people would have handled synchronization properly, but 
more likely there would just be bugs.
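To illustrate the synchronization concern, here is a hypothetical sketch (not Kafka Connect code) of task state shared between the task thread, which records what it polled, and the offset-commit thread, which runs {{commit()}}. {{ThreadSafeTaskState}} and its methods are illustrative names only. It addresses thread safety alone: acknowledging everything polled whenever {{commit()}} runs is still subject to the problem described in this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical task state touched by two threads; not Kafka Connect code.
class ThreadSafeTaskState {
    private final Object lock = new Object();
    private final List<Long> polledNotAcked = new ArrayList<>();
    private final List<Long> acked = new ArrayList<>();

    // Called on the task thread for each record returned by poll().
    void recordPolled(long upstreamId) {
        synchronized (lock) {
            polledNotAcked.add(upstreamId);
        }
    }

    // Called on the offset-commit thread from commit(). Without the lock, copying
    // and clearing here could interleave with recordPolled() and lose ids.
    void ackAllPolled() {
        synchronized (lock) {
            acked.addAll(polledNotAcked);
            polledNotAcked.clear();
        }
    }

    // Copy of everything acknowledged so far, in order.
    List<Long> ackedSnapshot() {
        synchronized (lock) {
            return new ArrayList<>(acked);
        }
    }
}
```

A single lock keeps the sketch simple; a concurrent queue would also work, but then "move everything polled so far" is no longer one atomic step.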

Correcting the code would be tough. We could update the javadoc, but I'd 
propose a different solution: let's just remove the method. It's bad that it's 
not actually doing what it claims, and correcting it would require committing 
offsets synchronously, which we don't want to do either. Given its current 
state, it seems unlikely anyone is actually using this functionality anyway. 
The intent was to allow you to, e.g., delete or ack data once you know it has 
been committed, but it clearly isn't serving that purpose, and commitRecord() 
was added to give finer-grained feedback. If someone did want this 
functionality, we likely should just add a new commit(Map) variant that can actually express to the user what we actually 
want... It'd be interesting to know if the method is overridden in any 
connectors that we know about in the wild.

If we just wanted a short term fix, we could definitely update the javadoc to 
make it clear what's actually happening and that this probably isn't what you 
want.


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-15 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167956#comment-16167956
 ] 

Randall Hauch commented on KAFKA-5716:
--

[~ewencp], if you wouldn't mind, would you take a look at this? In the issue 
description above, [~steff1193] suggests two possible fixes:

1) Changing the JavaDoc to say what the code *actually* does. This _may not_ 
require a KIP if the consensus is that this *clarifies* existing behavior 
rather than changing the behavior.
2) Correcting the code to implement the specified behavior. This definitely 
requires a KIP as it would change the public API to add a method that does the 
correct thing (this could be done in a backward compatible manner).

I'm unable to think of another fix, but perhaps something is more obvious to 
you. Thoughts?

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Per Steffensen
>Priority: Minor
> Attachments: KAFKA-5716.patch
>
>
> I am not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. I have not had the time to write tests to confirm.
> According to the java-doc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system.
> {quote}
> As I read this, when the commit-method is called, the SourceTask-developer is 
> "told" that everything returned from poll up until "now" has been sent/stored, 
> both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation, it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread).
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and 
> commitOffsets respectively, preventing the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily with what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks 
> in and does its committing, while preventing the task-thread from adding the polled 
> records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask) without the 
> records just polled from task.poll having been sent or their corresponding 
> connector-offsets flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something like the following (which I 
> do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system.
> {quote}
> * or fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation of where my code-inspection is not correct, and why it 
> works even though I cannot see it. I do not expect such an explanation, 
> though.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-12 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16163338#comment-16163338
 ] 

Randall Hauch commented on KAFKA-5716:
--

{quote}
If you can point me in the direction of some existing tests testing stuff like 
this, I may get the time to help write such a test. I am not sure how you 
usually write tests - the style, mocking, unit vs integration etc., so it would 
be nice with a pointer.
{quote}

Check out the 
[WorkerSourceTaskTest.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java] 
class, which uses lots of mocks.

{quote}
Lets me know if I should try to find time to write a test and/or create a 
pull-request (not ready for pull, yet)
{quote}

Yes, I think this is complicated code, and we need one or more test cases that 
verify the problem and, more importantly, replicate the problem so that we know 
any changes/corrections to the code actually fix it without regressing any 
other behavior. 
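As a starting point for thinking about such a test, here is a heavily simplified, self-contained model of the interleaving described in the issue. It does not use {{WorkerSourceTask}} or any mocks: {{CommitRaceModel}} is hypothetical, a list stands in for the offsetWriter, and two {{CountDownLatch}}es force the ordering that the real locking merely permits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical model of the race; nothing here is Kafka Connect code.
class CommitRaceModel {
    private final List<Long> writerOffsets = new ArrayList<>(); // stands in for offsetWriter
    private final List<Long> polledSoFar = new ArrayList<>();
    volatile List<Long> offsetsSeenByCommit;
    volatile List<Long> polledWhenCommitCalled;

    // Task thread: poll() returns a record with offset 42, but the committer
    // gets in before the record is handed to the writer (sendRecords).
    void taskThread(CountDownLatch polled, CountDownLatch committed) throws InterruptedException {
        synchronized (this) { polledSoFar.add(42L); }
        polled.countDown();
        committed.await();                              // committer wins the race
        synchronized (this) { writerOffsets.add(42L); } // sendRecords finally runs
    }

    // Committer thread: snapshot what the writer holds; in the real runtime
    // task.commit() would be invoked after this flush completes.
    void committerThread(CountDownLatch polled, CountDownLatch committed) throws InterruptedException {
        polled.await();
        synchronized (this) {
            offsetsSeenByCommit = new ArrayList<>(writerOffsets);
            polledWhenCommitCalled = new ArrayList<>(polledSoFar);
        }
        committed.countDown();
    }
}
```

The model only shows that "commit called" and "everything polled is covered" can diverge; a real test in the style of {{WorkerSourceTaskTest}} would have to provoke the same ordering through the runtime's actual classes.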

Glad you found the contributors guide! Very much looking forward to more 
details!


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-12 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16162829#comment-16162829
 ] 

Per Steffensen commented on KAFKA-5716:
---

{quote}
* Do you also use pull-request for early-not-ready-to-be-pulled code-submission?
* Do you have a "how to contribute" guide somewhere, that I should read, in 
order to be able to "follow procedure" correctly?
{quote}

Never mind, found the answers on https://kafka.apache.org/contributing and 
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes



[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-12 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16162824#comment-16162824
 ] 

Per Steffensen commented on KAFKA-5716:
---

Sorry for the late response. I have been busy (writing source-connectors :-) )

{quote}
{code}
+ * will be written before after the return of from the call to this method.
{code}
Can you rephrase the above ?
{quote}
"No additional offsets will be written or flushed before returning from the 
call to {{offsetsCommitted}}."

Don't know if that's better :-)

bq. Consider putting this as pull request (off trunk).

OK, maybe I will later. I thought it would probably be a little too early. I 
guess that when submitting a pull-request, you actually have something you would 
like pulled. The patch I made is definitely not ready for inclusion in the 
actual code-base, at least not before it has been discussed thoroughly with you 
guys. And the final code to be pulled should include a test verifying that 
things work as expected, IMHO.
* Do you also use pull-request for early-not-ready-to-be-pulled code-submission?
* Do you have a "how to contribute" guide somewhere, that I should read, in 
order to be able to "follow procedure" correctly?

bq. Have you tried replicating this with a test case?

No, unfortunately not. As I stated, I put forward this issue based on 
code-inspection only.

bq. It might not be straightforward considering it is dependent upon the timing 
of the {{WorkerSourceTask}} implementation, but is seems like it should be 
possible to replicate with a mock offset writer and consumer. Not only will 
this help prove your conjecture, but it also will help verify that we can 
correctly fix it.

I believe it will be fairly easy to create a test that shows the problem (if I 
am right about its existence). But yes, you probably have to mock an 
{{OffsetStorageWriter}} with a slow {{doFlush}}-method or an 
{{OffsetBackingStore}} with a slow {{set}}-method.
If you can point me in the direction of some existing tests testing stuff like 
this, I may find the time to help write such a test. I am not sure how you 
usually write tests (the style, mocking, unit vs. integration, etc.), so a 
pointer would be nice.

bq. Also, please be aware that all changes to the Connect API must be binary 
compatible with previous releases so that existing {{SourceTask}} 
implementations need not be changed or even recompiled to run on newer versions 
of the framework

Yeah, I thought so. At the very least, such a change should go through a long deprecation 
period, with the incompatible change only actually made in a new major release. 
The renaming etc. in my patch was just a suggestion. Let's forget about it for 
now.

bq. FWIW, I personally think the names of {{commitRecord(...)}}, {{commit()}}, 
and even a new {{commitOffsets(SourceRecord)}} on the {{SourceTask}} actually 
do make sense since they reflect the action that the source task should take.

IMHO FWIW, I do not agree. At least, it depends on the source-system of the 
concrete {{SourceTask}}. THE NAMING IS NOT VERY IMPORTANT TO ME, so let's just 
forget about it. But because we are discussing it now, I will just elaborate on 
the "mental model" that leads to my opinion on naming.

{{SourceTask.commitRecord(R)}} is a notification from Kafka-connect that the 
outgoing message wrapped in a given {{SourceRecord}} R has been forwarded to 
and acknowledged by the receiving Kafka output-topic. The source-task typically 
produced this {{SourceRecord}} R from some data D received/pulled from the 
source-system. What the source-task should typically do on 
{{SourceTask.commitRecord(R)}} is ACKNOWLEDGE the reception, handling and 
forwarding of the input-data D. Such an "acknowledgement" may or may not 
include something that can be referred to as a "commit".

Traditionally "commit" is used against output-systems, while "acknowledge" is 
used against input-systems. For a source-connector, Kafka-connect is handling 
the output-side, and therefore all "committing" against the output-side, while 
the source-task is handling the input-side and will have to "acknowledge" 
against the input-system. I believe the most common implementation in 
{{SourceTask.commitRecord}} will be "acknowledging" input-data received from 
the source-system.
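A small sketch of the mental model above, in which {{commitRecord(R)}} acknowledges the input-data D that R was produced from. {{OutRecord}} and {{AcknowledgingTask}} are hypothetical stand-ins rather than Kafka Connect classes, and acknowledging upstream is modelled as appending to a list. As argued elsewhere in this thread, this by itself tells the task nothing about whether R's connect-offsets have been flushed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an outgoing SourceRecord (identity equality is enough here).
class OutRecord {
    final String key;
    OutRecord(String key) { this.key = key; }
}

class AcknowledgingTask {
    // Outgoing record R -> id of the input data D it was produced from.
    private final Map<OutRecord, String> inputFor = new HashMap<>();
    // Input ids acknowledged to the (hypothetical) source-system, in order.
    final List<String> acknowledgedInputs = new ArrayList<>();

    // Produce R from input D and remember where it came from.
    synchronized OutRecord produce(String inputId) {
        OutRecord r = new OutRecord(inputId);
        inputFor.put(r, inputId);
        return r;
    }

    // Kafka has acknowledged R: acknowledge (rather than "commit") D upstream.
    // A repeated call for the same R is ignored because the mapping is removed.
    synchronized void commitRecord(OutRecord r) {
        String inputId = inputFor.remove(r);
        if (inputId != null) {
            acknowledgedInputs.add(inputId);
        }
    }
}
```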

E.g., consider a source-connector with source-tasks consuming data from a 
Kafka-topic. Yes, a source-connector where the source-system is Kafka itself; 
after all, according to my "mental model", a source-connector is about making a 
non-Kafka data-source map into and "feel like" a Kafka data-source (streamed, 
partitioned, etc.). Imagine such a source-connector with a Kafka-topic as its 
"source". The source-connector would probably create a task for each partition 
in the source-topic. The task would probably do something like:
* Receive a message (IM) from the designated partition of the source-topic via 
Kafka-consumer C
* 

[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-10 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16160407#comment-16160407
 ] 

Randall Hauch commented on KAFKA-5716:
--

[~steff1193], thanks for logging the issue and submitting a proposal for the 
change. Your analysis does seem to be correct, though I need to spend a bit 
more time verifying it, and I'd love to hear [~hachikuji]'s thoughts. 

Have you tried replicating this with a test case? It might not be 
straightforward considering it is dependent upon the timing of the 
WorkerSourceTask implementation, but it seems like it should be possible to 
replicate with a mock offset writer and consumer. Not only will this help prove 
your conjecture, but it will also help verify that we can correctly fix it.

Also, please be aware that all changes to the Connect API *must* be binary 
compatible with previous releases so that existing SourceTask implementations 
need not be changed or even recompiled to run on newer versions of the 
framework. This has two implications for your proposed patch.

First, we cannot change the signature of public API methods, so we need to 
avoid changing the name of {{commitRecord(...)}}. Like it or not, we're 
stuck with that naming convention. FWIW, I personally think the names of 
{{commitRecord(...)}}, {{commit()}}, and even a new 
{{commitOffsets(SourceRecord)}} on the SourceTask actually _do_ make sense 
since they reflect the action that the source task should take. 

Second, we have to keep the existing {{commit()}} method, but we can deprecate it 
and add a new method such as {{commitOffsets(SourceRecord)}} that has the 
behavior you state and that by default calls the existing {{commit()}}. 
Doing this would allow existing SourceTask implementations to continue to work, 
albeit potentially with the incorrect assumptions/behavior you describe above. 
However, any developer who wants to take advantage of the new/correct behavior 
can change their {{SourceTask}} implementations to override 
{{commitOffsets(SourceRecord)}} rather than {{commit()}} to get the 
correct behavior. Note that we should definitely have thorough JavaDoc for both 
{{commit()}} and {{commitOffsets(SourceRecord)}} that says what they do 
by default and how implementations can take advantage of them.
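A minimal sketch of the backward-compatible pattern described above. {{Rec}} and {{CompatSourceTask}} are hypothetical stand-ins for {{SourceRecord}} and {{SourceTask}}; only the method names {{commit()}} and {{commitOffsets(...)}} come from this discussion. Because the new method is non-abstract and delegates to {{commit()}} by default, a task written against the old API keeps working unchanged.

```java
// Hypothetical stand-in for SourceRecord.
class Rec {
    final String id;
    Rec(String id) { this.id = id; }
}

// Hypothetical stand-in for SourceTask showing the deprecation pattern.
abstract class CompatSourceTask {
    /** @deprecated ambiguous about which polled records were covered; override commitOffsets(Rec). */
    @Deprecated
    public void commit() throws InterruptedException {
        // This space intentionally left blank.
    }

    // New hook that the framework would call; by default it delegates to the
    // old method, so existing implementations behave exactly as before.
    public void commitOffsets(Rec lastRecordWithOffsetsWritten) throws InterruptedException {
        commit();
    }
}

// A task written before the change: only knows about commit().
class LegacyTask extends CompatSourceTask {
    int commitCalls = 0;

    @Override
    public void commit() { commitCalls++; }
}

// A task written after the change: opts into the more precise hook.
class NewTask extends CompatSourceTask {
    String lastCommittedId;

    @Override
    public void commitOffsets(Rec last) { lastCommittedId = (last == null) ? null : last.id; }
}
```

If the framework calls only {{commitOffsets(...)}}, legacy tasks still end up in their old {{commit()}}, while new tasks receive the record.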

Again, I *think* your analysis is correct, but I need to spend time looking 
into this and hopefully get others to confirm it as well. In the meantime, it 
might be worth updating your patch and, as [~yuzhih...@gmail.com] mentioned, 
creating a pull request with your changes based on the {{trunk}} branch. One 
advantage of maintaining binary compatibility is that we can more easily 
backport the changes to older branches, though just because we can doesn't mean 
we will. If the committers do backport and have any non-trivial merge 
conflicts, they'll ask for a separate dedicated PR for the older branch(es).

Thanks again for all your work on this!


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-09-09 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16160002#comment-16160002
 ] 

Ted Yu commented on KAFKA-5716:
---

{code}
+ * will be written before after the return of from the call to this method.
{code}
Can you rephrase the above ?
{code}
+ * @param record Last polled {@link SourceRecord} that had its offsets successfully written and flushed.
{code}
Please note that record may be null.

Consider submitting this as a pull request (based off trunk).

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Per Steffensen
>Priority: Minor
> Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to the java-doc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will
> record offsets automatically. This hook is provided for systems that also need to
> store offsets internally in their own system.
> {quote}
> As I read this, when the commit-method is called, the SourceTask-developer is 
> "told" that everything returned from poll up until "now" has been sent/stored, 
> both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation, it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and 
> commitOffsets respectively, preventing the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily with what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks 
> in and does its committing, while preventing the task-thread from adding the 
> polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask) without the 
> records just polled from task.poll having been sent or the corresponding 
> connector-offsets having been flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something along the lines of 
> the following (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will
> record offsets automatically. This hook is provided for systems that also need to
> store offsets internally in their own system.
> {quote}
> * or fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation of where my code-inspection is incorrect, and why it 
> works even though I cannot see it. I do not expect such an explanation, 
> though.
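To make the suspected ordering concrete, here is a minimal, hypothetical Java sketch. It does not use the real Kafka Connect classes: `SimplifiedWorkerSourceTask`, `replay` and the record names `r1`/`r2` are made up for illustration. It assumes only what the description states, namely that `sendRecords` and `commitOffsets` synchronize on the same object while `poll` does not, and it replays both orderings on a single thread so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, heavily simplified model of the interleaving described above.
 * This is NOT the real Kafka Connect WorkerSourceTask: the class, field and
 * method names are illustrative only. The steps are replayed deterministically
 * on a single thread so that the problematic ordering happens every time.
 */
class CommitRaceSketch {

    static class SimplifiedWorkerSourceTask {
        // Records returned by task.poll().
        private final List<String> polled = new ArrayList<>();
        // Records registered in outstandingMessages/offsetWriter by sendRecords().
        private final List<String> registered = new ArrayList<>();

        // Task-thread: poll() runs outside the lock.
        List<String> poll(List<String> recordsFromTask) {
            polled.addAll(recordsFromTask);
            return recordsFromTask;
        }

        // Task-thread: sendRecords() holds the lock while registering records.
        synchronized void sendRecords(List<String> batch) {
            registered.addAll(batch);
        }

        // Committer-thread: commitOffsets() holds the same lock, so it can only
        // flush what sendRecords() has already registered. task.commit() would be
        // called at the end; this returns the polled records it does NOT cover.
        synchronized List<String> commitOffsets() {
            List<String> notCovered = new ArrayList<>(polled);
            notCovered.removeAll(registered);
            return notCovered;
        }
    }

    /**
     * Polls one batch, then runs the committer either before or after the
     * task-thread's sendRecords() call. Returns the records that had been
     * polled but were not covered when task.commit() would have been called.
     */
    static List<String> replay(boolean committerRunsBeforeSend) {
        SimplifiedWorkerSourceTask worker = new SimplifiedWorkerSourceTask();
        List<String> batch = worker.poll(List.of("r1", "r2"));
        List<String> notCovered;
        if (committerRunsBeforeSend) {
            notCovered = worker.commitOffsets(); // committer gets the lock first
            worker.sendRecords(batch);
        } else {
            worker.sendRecords(batch);
            notCovered = worker.commitOffsets();
        }
        return notCovered;
    }

    public static void main(String[] args) {
        System.out.println("committer first: " + replay(true));  // [r1, r2]
        System.out.println("send first:      " + replay(false)); // []
    }
}
```

When the committer wins the lock first, both polled records are outside what `task.commit()` would cover, even though the committed offsets stay consistent with what was actually sent. That is the gap between the behaviour and the current java-doc wording that the description points out.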



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

2017-08-09 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120577#comment-16120577
 ] 

Per Steffensen commented on KAFKA-5716:
---

{quote}
Commit the offsets, up to the offsets that have been returned by \{@link 
#poll()}
*and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
{quote}

Actually, I am pretty sure that is not true either, so I guess that when 
task.commit() is called, you really do not know exactly for which of the 
SourceRecords you returned from poll
* a) the derived outgoing ProducerRecord has been sent to Kafka
* b) the derived connector-offsets have been written/flushed

I guess the only option I can think of that does not require introducing 
significantly more synchronization (we do not want that), but where you as the 
SourceTask-developer know "enough" for most cases, is to change 
SourceTask.commit() to take a parameter of type SourceRecord. This record will 
be the last record given by WorkerSourceTask to its offsetWriter. Upon a call to 
commit, the SourceTask-developer will know that
* a) Outgoing ProducerRecords derived from any SourceRecord returned from poll 
prior to and including the given SourceRecord have been sent to Kafka. Outgoing 
ProducerRecords derived from any SourceRecord returned from poll after the 
given SourceRecord may or may not have been sent to Kafka (conservative 
guarantee)
* b) Connector-offsets derived from any SourceRecord returned from poll prior 
to and including the given SourceRecord have been written and flushed to 
offset-storage. Connector-offsets derived from any SourceRecord 
returned from poll after the given SourceRecord have NOT been written to 
offset-storage (strict guarantee)

Sorry, just thinking out loud.
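The proposal above can be sketched roughly as follows. This is a hypothetical illustration, not the Kafka Connect API: `RecordStub`, `ProposedSourceTask`, `MirroringTask` and `ProposedCommitDemo` are made-up names. Keeping the old no-arg `commit()` and letting the new method delegate to it is an assumption (motivated by backward compatibility), not part of this comment. The `null` check reflects the review remark that the record may be null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.connect.source.SourceRecord;
// only the source offset is modelled.
final class RecordStub {
    final Map<String, Object> sourceOffset;

    RecordStub(Map<String, Object> sourceOffset) {
        this.sourceOffset = sourceOffset;
    }
}

// Hypothetical shape of the proposed hook; NOT the actual Kafka Connect API.
abstract class ProposedSourceTask {
    // Existing no-arg hook, kept here (assumption) so existing tasks still compile.
    public void commit() throws InterruptedException {
    }

    // Proposed hook. lastFlushed is the last record given to the offsetWriter:
    // (a) records up to and including it have been sent to Kafka;
    // (b) offsets up to and including it have been flushed, offsets of later
    //     records have not. Per review it may be null, so callers must check.
    public void commit(RecordStub lastFlushed) throws InterruptedException {
        commit(); // assumed default: delegate to the old hook
    }
}

// Example task that mirrors the flushed offset into its own system.
class MirroringTask extends ProposedSourceTask {
    Map<String, Object> lastKnownFlushedOffset; // null until the first non-null commit

    @Override
    public void commit(RecordStub lastFlushed) {
        if (lastFlushed == null) {
            return; // nothing to mirror in this sketch
        }
        lastKnownFlushedOffset = new HashMap<>(lastFlushed.sourceOffset);
    }
}

class ProposedCommitDemo {
    public static void main(String[] args) {
        MirroringTask task = new MirroringTask();
        task.commit(null); // ignored
        task.commit(new RecordStub(Map.of("position", 42L)));
        System.out.println(task.lastKnownFlushedOffset); // {position=42}
    }
}
```

Because only the last flushed record is passed, the worker does not need extra locking around `poll`; the task simply treats that record as a watermark for guarantees (a) and (b).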

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
>  Issue Type: Bug
>Reporter: Per Steffensen
>Priority: Minor
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to the java-doc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will
> record offsets automatically. This hook is provided for systems that also need to
> store offsets internally in their own system.
> {quote}
> As I read this, when the commit-method is called, the SourceTask-developer is 
> "told" that everything returned from poll up until "now" has been sent/stored, 
> both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation, it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and 
> commitOffsets respectively, preventing the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily with what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks 
> in and does its committing, while preventing the task-thread from adding the 
> polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask) without the 
> records just polled from task.poll having been sent or the corresponding 
> connector-offsets having been flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something along the lines of 
> the following (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method