[VOTE] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2019-01-18 Thread Per Steffensen

Please vote!



[VOTE] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-12-12 Thread Per Steffensen

Hope for your votes



Permissions to edit KIP page and ...

2018-12-12 Thread Per Steffensen

Hi

I used to have permissions to edit the KIP page
(https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
Now suddenly I do not. I created KIP-381 and now want to put it into vote
mode, but I can edit neither the KIP page nor the KIP-381 page.
Please give me permissions again.


Username: steff1193

Kind regards, Per Steffensen



Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-21 Thread Per Steffensen



On 18/10/2018 16.46, Ryanne Dolan wrote:

Per Steffensen, getting sequence numbers correct is definitely difficult,
but this is not Connect's fault. I'd like to see Connect implement
exactly-once from end-to-end, but that requires coordination between
sources and sinks along the lines that you allude to, using sequence
numbers and transactions and whatnot.

The problem with commit() is knowing when it's okay to delete the files in
your example. I don't believe that issue has anything to do with avoiding
dupes or assigning unique sequence numbers. I believe it is safe to delete
a file if you know it has been delivered successfully, which the present
API exposes.
OK, I believe you read too much into my example. It was just the simplest
scenario I could come up with that could be explained fairly easily, and
where it matters to know for which records offsets have been flushed.
The example may not have been good enough to fulfill its purpose of
showing that knowing exactly which records have had their offsets
flushed is important.


You might argue that you, as a source-connector developer, do not need
to know when offsets are flushed at all. I will argue that in many
cases you do, and that you may need to know exactly which records have
had their offsets flushed. For that, the current "commit" method is
useless. You do not know which records had their offsets flushed:
* It is not necessarily all the records returned from "poll" at the
point where "commit" is called, even though the current JavaDoc claims so
* It is not necessarily all the records for which "commitRecord" has
been called. It may be more records, or it may be fewer
Bottom line: the current "commit" cannot be used for much. You may
argue that it should just be removed, but I definitely would not like to
see that. I use "commit" in several of my source-connectors (pretending
that it works), and could not live without it.
As I see it, the offsets are essentially your accounting information about
how to proceed from where you left off. Kafka Connect offers to keep that
accounting aligned with the outgoing data it relates to. Of course I could
just deal with all of that myself, but then a lot of the niceness of Kafka
Connect would be gone, and I might as well just do everything myself.
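
For reference, here is a bare skeleton of the two existing hooks being
discussed, with the guarantees as described in this thread written out as
comments. The class name is made up and the other task methods are omitted;
treat it purely as an illustration.

{code}
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Illustrative skeleton only; the comments state the guarantees as discussed in this thread.
public abstract class MySourceTask extends SourceTask {

    @Override
    public void commitRecord(SourceRecord record) {
        // Guarantee: the data of this record has been sent to and acknowledged by Kafka.
        // No guarantee: the offsets of this record have been flushed to the offset store.
    }

    @Override
    public void commit() {
        // No usable statement about *which* records have had their offsets flushed:
        // not necessarily everything returned by poll(), and not necessarily everything
        // passed to commitRecord() - it may be more or fewer records.
    }
}
{code}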


That said, I'm not opposed to your proposed callbacks, and I agree that
commit() and commitRecord() are poorly named. I just don't believe the
present API is incorrect.
I definitely do. Currently there is a callback "commit" that lies in its 
JavaDoc, and that essentially cannot be used for anything, except for 
making you confused. You know nothing about the state when it is called.


But as long as you do not oppose the proposed solution, we probably 
should not spend too much time arguing back and forth about opinions.


Ryanne

Regards, and thanks for participating in the discussion
Per Steffensen


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-18 Thread Per Steffensen

On 17/10/2018 18.17, Ryanne Dolan wrote:

> this does not guarantee that the
> offsets of R have been written/flushed at the next commit() call

True, but does it matter? So long as you can guarantee the records are 
delivered to the downstream Kafka cluster, it shouldn't matter if they 
have been committed or not.


The worst that can happen is that the worker gets bounced and asks for 
the same records a second time. Even if those records have since been 
dropped from the upstream data source, it doesn't matter cuz you know 
they were previously delivered successfully.


You are kinda arguing that offsets are not usable at all. I think they
are. Below I will explain a fairly simple source-connector, how it
would be misled by the way the source-connector framework currently works,
and how my fix would help it not be. The source-connector is picked out
of thin air, but not too far from what I have had to deal with in real life.


Let's assume I write a fairly simple source-connector that picks up data
from files in a given folder. For simplicity let's just assume that each
file fits in a Kafka-message. My source-connector just sorts the files
by timestamp and sends out the data in the files, oldest file first. It
is possible that the receiving side of the data my source-connector
sends out will get the same data twice, for one of the following reasons:
* There were actually two input-files that contained exactly the same
data (in that case the receiving side should handle it twice)
* The data from the same file may be sent twice in two Kafka-messages,
due to global atomicity being impossible (in that case the receiving side
should only handle the data once)
In order to allow the receiving side to know when two consecutive
messages are essentially the same, so that it will know to handle only
one of them, I introduce a simple sequence-numbering system in my
source-connector. I simply write a sequence-number in the
Kafka-messages, and I use Kafka-Connect offsets to keep track of the
next sequence-number to be used, so that I can pick up with the correct
sequence-number in case of a crash/restart. If there are no offsets when
the source-connector starts (first start) it will just start with
sequence-number 1.
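
Purely to make the setup concrete, a rough sketch of such a task follows.
Topic name, partition key and class name are made up, and the listing/sorting
of the input folder as well as error handling are omitted.

{code}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not a complete connector.
public class FolderSourceTask extends SourceTask {
    private static final Map<String, String> PARTITION = Collections.singletonMap("input", "my-folder");

    private final List<Path> pendingFiles = new ArrayList<>(); // oldest first, filled elsewhere
    private long nextSeq = 1L; // first start: no offsets, so start at 1

    @Override
    public void start(Map<String, String> props) {
        // Recover the resume point from the offset store, if there is one.
        Map<String, Object> offset = context.offsetStorageReader().offset(PARTITION);
        if (offset != null && offset.get("nextSeq") != null) {
            nextSeq = ((Number) offset.get("nextSeq")).longValue();
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        for (Path file : pendingFiles) {
            try {
                byte[] data = Files.readAllBytes(file);
                // The sequence number travels in the record (as the key here, for simplicity);
                // the resume point travels in the source offsets.
                Map<String, Long> sourceOffset = Collections.singletonMap("nextSeq", nextSeq + 1);
                records.add(new SourceRecord(PARTITION, sourceOffset, "my-topic",
                        Schema.INT64_SCHEMA, nextSeq, Schema.BYTES_SCHEMA, data));
                nextSeq++;
            } catch (IOException e) {
                // ignored for the sake of the sketch
            }
        }
        pendingFiles.clear();
        return records;
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "sketch"; }
}
{code}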


*Assume the following files are in the input-folder:*
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_01-.data
* 2018-01-01_10_00_02-.data
…

*Now this sequence of events are possible*
* mySourceConnector.poll() -> [
  R1 = record({seq: 1, data: 2018-01-01_10_00_00-.data}, {nextSeq: 2}),
  R2 = record({seq: 2, data: 2018-01-01_10_00_00-.data}, {nextSeq: 3})
]
* data of R1 was sent and acknowledged
* mySourceConnector.commitRecord(R1)
* data of R2 was sent and acknowledged
* mySourceConnector.commitRecord(R2)
* offsets-committer kicks in around here and picks up the offsets from
R1 and R2, resulting in the merged offsets to be written and flushed
being { nextSeq=3 }

* mySourceConnector.poll() -> [
  R3 = record({seq: 3, data: 2018-01-01_10_00_01-.data}, {nextSeq: 4})
]
* data of R3 was sent and acknowledged
* mySourceConnector.commitRecord(R3)
* offsets-committer finishes writing and flushing offsets { nextSeq=3 }
* mySourceConnector.commit()

In my mySourceConnector.commit() implementation I believe that the data and
offsets for R1, R2 and R3 have been sent/written/flushed/acknowledged,
and I therefore delete the following files:

* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_01-.data
But the truth is that the data for R1, R2 and R3 has been sent with
sequence-numbers 1, 2 and 3 respectively, while the flushed offsets say
{ nextSeq=3 }, not { nextSeq=4 } as I would indirectly expect.
If the system crashes here, upon restart I will get { nextSeq=3 }, but
the file containing the data that was supposed to get sequence-number 3
has already been deleted. Therefore I will end up with this next poll:

* poll() -> [
  R4 = record({seq: 3, data: 2018-01-01_10_00_02-.data}, {nextSeq: 4})
]
If my system had worked, I would have ended up with this next poll:
* poll() -> [
  R4 = record({seq: 4, data: 2018-01-01_10_00_02-.data}, {nextSeq: 5})
]
The receiving side of my data will get two messages containing the same
sequence-number 3, and it will therefore incorrectly ignore the second
message. Even if it double-checks by looking at the actual data of the
two messages, and the content of 2018-01-01_10_00_01-.data and 2018-01-01_10_00_02-.data happened to be identical, it has no way
of figuring out that the right thing to do is to actually handle both messages.


*With my fix to the problem*, the call to commit() would have been
mySourceConnector.commit([R1, R2])
and I would know to delete only the following files:
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
After a crash/restart I would end up sending the correct next message:
mySourceConnector.poll() -> [
  R3 = record({seq: 3, data: 2018-01-01_10_00_01-.data}, {nextSeq: 4})
]
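
To make that concrete, here is a rough sketch of the bookkeeping a task could
do if the callback is handed exactly the records whose offsets have been
flushed and acked, along the lines of the fix. Class and method names are
illustrative, not the final API.

{code}
import org.apache.kafka.connect.source.SourceRecord;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: assumes the framework hands us the flushed-and-acked records.
public class FlushedRecordCleanup {
    // Filled in poll(): which input file each emitted record came from.
    private final Map<SourceRecord, Path> fileForRecord = new ConcurrentHashMap<>();

    public void rememberSource(SourceRecord record, Path file) {
        fileForRecord.put(record, file);
    }

    // Invoked from the proposed callback, e.g. commit(List<SourceRecord>) as discussed above.
    public void offsetsFlushedAndAcked(List<SourceRecord> records) {
        for (SourceRecord record : records) {
            Path file = fileForRecord.remove(record);
            if (file == null) {
                continue;
            }
            try {
                // Safe to delete: both the data and the offsets for this record are durable.
                Files.deleteIfExists(file);
            } catch (IOException e) {
                // Keep the file and retry on a later callback.
            }
        }
    }
}
{code}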



Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Per Steffensen

On 17/10/2018 16.43, Ryanne Dolan wrote:

I see, thanks.
On the other hand, the commitRecord() callback provides the functionality
you require in this case. In commitRecord() your SourceTask can track the
offsets of records that have been ack'd by the producer client, and then in
commit() you can be sure that those offsets have been flushed.

That is the trick I am currently using - more or less.
But unfortunately it does not work 100% either. It is possible that
commitRecord() is called with a record R, and then commit() is called
after that, without the offsets of R having been written/flushed. The
call to commitRecord() means that the "actual data" of R has been
sent/acknowledged, but unfortunately this does not guarantee that the
offsets of R have been written/flushed at the next commit() call.
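
For reference, a sketch of that trick and of the gap being described. Class
and field names are made up; the point is only to show where the remaining
uncertainty sits.

{code}
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative workaround: remember which records the producer has acked and treat
// only those as candidates in commit(). The gap remains: commit() being called after
// commitRecord(R) does not guarantee that R's offsets have been flushed.
public abstract class AckTrackingSourceTask extends SourceTask {
    private final Queue<SourceRecord> ackedSinceLastCommit = new ConcurrentLinkedQueue<>();

    @Override
    public void commitRecord(SourceRecord record) {
        // The "actual data" of the record has been sent and acknowledged...
        ackedSinceLastCommit.add(record);
    }

    @Override
    public void commit() {
        // ...but the offsets of some of these records may not have been flushed yet,
        // so cleaning up source data here can still be premature.
        SourceRecord record;
        while ((record = ackedSinceLastCommit.poll()) != null) {
            // e.g. clean up the source data behind 'record' (potentially too early)
        }
    }
}
{code}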




Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Per Steffensen
The fix will definitely "facilitate" the source-connectors I have
written. It will make them work 100% correctly. Today they don't.


Fine for me to change "Acknowledged" to "Acked" in the method-naming.

Not sure I would like to take a Collection instead of a List as the
argument to offsetsFlushedAndAck(nowledg)ed. poll() returns a List
(ordered records), the records are handled in that order, and I would
like to hand the records back in that order as well. Handing back a
Collection may indicate that order does not matter. Besides that, it is
likely to help the implementation of offsetsFlushedAndAck(nowledg)ed
that the records come back in order.
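
For clarity, the method shapes being discussed would look roughly like this.
The exact names and whether the argument is a List or a Collection are still
under discussion, so treat this purely as an illustration, not the actual API.

{code}
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

// Not the actual Kafka API; just the shapes under discussion.
public interface ProposedSourceTaskCallbacks {

    // Called when a record's data has been sent to and acknowledged by Kafka.
    void recordSentAndAcked(SourceRecord record);

    // Called with exactly the records whose offsets have been flushed to the offset store,
    // in the order they were returned from poll().
    void offsetsFlushedAndAcked(List<SourceRecord> records);
}
{code}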


Regarding adding stuff to the "rejected approaches" and "motivation"
sections of the KIP, I am not sure I will get the time anytime soon.
Please feel free to help add this to the KIP. This way we also have
at least two persons who really understand what this is about. Sometimes
you only really understand what something is about when you are
forced to write about it (at least that is my excuse).


Regards, Per Steffensen

On 16/10/2018 05.57, Konstantine Karantasis wrote:

This is a significant improvement to the semantics of source connectors.
I'm expecting that it will facilitate source connector implementations and
even enrich the application use cases that we see. I only have a few minor
suggestions at the moment.

I believe that Acked is a common abbreviation for Acknowledged and that we
could use it in this context. And this suggestion is coming from a big
proponent of complete words in variables and method names. Thus, feel free
to consider 'offsetsFlushedAndAcked' as well as 'recordSentAndAcked'. Since
this is a public interface, I'd also make the implementation specific
comment that a Collection might be more versatile than
List as argument in offsetsFlushedAndAcknowledged.

The rejected approaches section could use some of the material in the
original jira ticket, which is pretty insightful in order to understand how
we arrived at this KIP. For example, I think it'd be useful to state
briefly why the 'commit' method is not getting removed completely but is
instead substituted with 'offsetsFlushedAndAcked'. I also believe it would be
useful to include in the motivation section some info related to why and how a
source system could use these method calls to safely recycle data that has
surely been imported to Kafka. I see this type of use case having an
increased importance as Kafka is used more and more as the source of truth
and persistence layer for an increasing number of applications.

These suggestions are not strictly required in order to move forward with
this improvement, but I believe they can help a lot in understanding the
context of this proposed change without having to read the complete
history in the jira ticket.

Thanks for the KIP Per!

-Konstantine


On Wed, Oct 10, 2018 at 6:50 AM Per Steffensen  wrote:


Please help make the proposed changes in KIP-381 become reality. Please
comment.

KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback

JIRA: https://issues.apache.org/jira/browse/KAFKA-5716

PR: https://github.com/apache/kafka/pull/3872

Thanks!







Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Per Steffensen
Let's use X for the point in time where commit() is called. Let's use
Rs(X) for the records returned by poll()s at time X.
At time X, it is not necessarily true that all records in Rs(X) have
been sent to Kafka (and acknowledged) and have had their offsets flushed to
the offset-store.


Example
* Time X-1: poll() is called and one record R is returned
* Time X: commit() is called. There is no guarantee that the data in R
has been sent to/acknowledged by Kafka, nor that the offsets of R have
been flushed to the offset-store (it is likely, though).


Due to the synchronization necessary, it is probably hard to make that
guarantee without reducing throughput significantly. But it is feasible
to make the change that commit() is given (via an argument) a
list/collection of the records for which the guarantee does hold. That is
what my current fix does (see PR).


On 16/10/2018 19.33, Ryanne Dolan wrote:

Steff,

> Guess people have used it, assuming that all records that have been polled
> at the time of callback to "commit", have also had their offsets committed.
> But that is not true.


(excerpt from KIP)

The documentation for SourceTask.commit() reads:

> Commit the offsets, up to the offsets that have been returned by {@link #poll()}.
> This method should block until the commit is complete.


I'm confused by these seemingly contradictory statements. My 
assumption (as you say) is that all records returned by poll() will 
have been committed before commit() is invoked by the framework. Is 
that not the case?


Ryanne

On Wed, Oct 10, 2018 at 8:50 AM Per Steffensen <perst...@gmail.com> wrote:


Please help make the proposed changes in KIP-381 become reality.
Please
comment.

KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback

JIRA: https://issues.apache.org/jira/browse/KAFKA-5716

PR: https://github.com/apache/kafka/pull/3872

Thanks!






[DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-10 Thread Per Steffensen
Please help make the proposed changes in KIP-381 become reality. Please 
comment.


KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback


JIRA: https://issues.apache.org/jira/browse/KAFKA-5716

PR: https://github.com/apache/kafka/pull/3872

Thanks!




[jira] [Created] (KAFKA-6505) Add simple raw "offset-commit-failures", "offset-commits" and "offset-commit-successes" count metric

2018-01-30 Thread Per Steffensen (JIRA)
Per Steffensen created KAFKA-6505:
-

 Summary: Add simple raw "offset-commit-failures", "offset-commits" 
and "offset-commit-successes" count metric
 Key: KAFKA-6505
 URL: https://issues.apache.org/jira/browse/KAFKA-6505
 Project: Kafka
  Issue Type: Improvement
    Reporter: Per Steffensen


MBean 
"kafka.connect:type=connector-task-metrics,connector=,task=x" 
has several attributes. Most of them seem to be avg/max/pct over the entire
lifetime of the process. They are not very useful when monitoring a system,
where you typically want to see when there have been problems and whether there
are problems right now.

E.g. I would like to expose to an administrator when offset-commits have been
failing (e.g. timing out), including whether they are failing right now. It is really
hard to do that properly using only the attribute
"offset-commit-failure-percentage". You can expose a number telling how much
the percentage has changed between two consecutive polls of the metric - if it
changed to the positive side, we saw offset-commit failures, and if it changed
to the negative side (or is stable at 0) we saw offset-commit successes - at
least as long as the system has not been running for so long that a single
failing offset-commit does not even change the percentage. But it is really
odd to do it this way.

*I would like to see an attribute "offset-commit-failures" simply counting
how many offset-commits have failed, as an ever-increasing number. Maybe also
attributes "offset-commits" and "offset-commit-successes". Then I can compute the
delta between the last two metric-polls to show how many offset-commit attempts
have failed "very recently". Let this ticket be about that particular added
attribute (or the three added attributes).*
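
To illustrate the monitoring pattern: poll the ever-increasing counter and look
at the delta between two consecutive polls. The attribute name
"offset-commit-failures" is the one proposed in this ticket, not an existing
metric, and the class name is made up.

{code}
import javax.management.MBeanServer;
import javax.management.ObjectName;

import java.lang.management.ManagementFactory;

// Sketch: compute "failures since the last poll" from a cumulative counter attribute.
public class OffsetCommitFailureWatcher {
    private long previous = -1;

    public long failuresSinceLastPoll(String connector, int task) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(
                "kafka.connect:type=connector-task-metrics,connector=" + connector + ",task=" + task);
        long current = ((Number) server.getAttribute(name, "offset-commit-failures")).longValue();
        long delta = (previous < 0) ? 0 : current - previous;
        previous = current;
        return delta; // offset-commit failures seen since the last poll
    }
}
{code}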



Just a note on metrics IMHO (should probably be posted somewhere else):

In general consider getting rid of stuff like avg, max, pct over the entire 
lifetime of the process - current state is what interests people, especially 
when it comes to failure-related metrics (failure-pct over the lifetime of the 
process is not very useful). And people will continuously be polling and 
storing the metrics, so we will have a history of "current state" somewhere 
else (e.g. in Prometheus). Just give us the raw counts. Modern monitoring tools 
can do all the avg, max, pct for you based on a time-series of 
metrics-poll-results - and they can do it for periods of your choice (e.g. 
average over the last minute or 5 minutes) - have a look at Prometheus PromQL 
(e.g. used through Grafana). Just expose the raw numbers and let the
average/max/min/pct calculation be done on the collection/presentation side. Only
do "advanced" stuff for cases that are very interesting and where it cannot be
done based on simple raw numbers (e.g. percentiles), and consider whether doing
it for fairly short intervals is better than doing it for the entire lifetime of
the process.





[jira] [Created] (KAFKA-6504) Connect: Some per-task-metrics not working

2018-01-30 Thread Per Steffensen (JIRA)
Per Steffensen created KAFKA-6504:
-

 Summary: Connect: Some per-task-metrics not working
 Key: KAFKA-6504
 URL: https://issues.apache.org/jira/browse/KAFKA-6504
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Per Steffensen


Some Kafka-Connect metrics seem to be wrong with respect to per-task reporting - at
least it seems like MBean
"kafka.connect:type=source-task-metrics,connector=,task=x"
attribute "source-record-active-count" reports the same number for all x tasks
running in the same Kafka-Connect instance/JVM. E.g. I have a
source-connector "my-connector" with 2 tasks that both run in the same
Kafka-Connect instance, and I know that only one of them actually produces
anything (and therefore can have "active source-records"), yet both
"kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and
"kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go up
(following each other). It should only go up for the one task that actually
produces something.





[jira] [Created] (KAFKA-6503) Connect: Plugin scan is very slow

2018-01-30 Thread Per Steffensen (JIRA)
Per Steffensen created KAFKA-6503:
-

 Summary: Connect: Plugin scan is very slow
 Key: KAFKA-6503
 URL: https://issues.apache.org/jira/browse/KAFKA-6503
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Per Steffensen


Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is
very slow - see the logs from starting my Kafka-Connect instance at the bottom. It
takes almost 4 minutes of scanning. I am running Kafka-Connect in docker based on
confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to /usr/share/java. The
only thing I have added is a 13MB jar in
/usr/share/java/kafka-connect-file-streamer-client containing two connectors
and a converter. That one alone seems to take 20 secs.

If it were just scanning in the background and everything was working, it
probably would not be a big issue. But it is not. Right after starting the
Kafka-Connect instance I try to create a connector via the /connectors
endpoint, but it will not succeed until the plugin scanning has finished (4
minutes).

I am not even sure why scanning is necessary. Is it not always true that
connectors, converters etc. are referenced by name? To see if one exists, just
try to load the class - the classloader will tell you whether it is available.
Hmmm, there is probably a reason.

Anyway, either it should be made much faster, or at least Kafka-Connect should 
be fully functional (or as functional as possible) while scanning is going on.

{code}
[2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a 
moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
[2018-01-30 13:52:27,218] INFO Loading plugin from: 
/usr/share/java/kafka-connect-file-streamer-client 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:43,037] INFO Registered loader: 
PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/}
 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:43,038] INFO Added plugin 
'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager'
 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:43,039] INFO Added plugin 
'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager'
 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:43,040] INFO Added plugin 
'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter'
 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:43,049] INFO Loading plugin from: 
/usr/share/java/kafka-connect-elasticsearch 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:47,595] INFO Registered loader: 
PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:47,611] INFO Added plugin 
'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:47,651] INFO Loading plugin from: 
/usr/share/java/kafka-connect-jdbc 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:49,491] INFO Registered loader: 
PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:49,491] INFO Added plugin 
'io.confluent.connect.jdbc.JdbcSinkConnector' 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:49,492] INFO Added plugin 
'io.confluent.connect.jdbc.JdbcSourceConnector' 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:52:49,663] INFO Loading plugin from: 
/usr/share/java/kafka-connect-s3 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:53:51,055] INFO Registered loader: 
PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-s3/} 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:53:51,055] INFO Added plugin 
'io.confluent.connect.s3.S3SinkConnector' 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:53:51,061] INFO Added plugin 
'io.confluent.connect.storage.tools.SchemaSourceConnector' 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:53:51,064] INFO Added plugin 
'io.confluent.connect.avro.AvroConverter' 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13:53:52,745] INFO Loading plugin from: 
/usr/share/java/kafka-connect-storage-common 
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-01-30 13

[jira] [Created] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everything from SourceTask.poll has been sent

2017-08-09 Thread Per Steffensen (JIRA)
Per Steffensen created KAFKA-5716:
-

 Summary: Connect: When SourceTask.commit it is possible not
everything from SourceTask.poll has been sent
 Key: KAFKA-5716
 URL: https://issues.apache.org/jira/browse/KAFKA-5716
 Project: Kafka
  Issue Type: Bug
Reporter: Per Steffensen
Priority: Minor


Not looking at the very latest code, so the "problem" may have been corrected 
recently. If so, I apologize. I found the "problem" by code-inspection alone, 
so I may be wrong. Have not had the time to write tests to confirm.

According to java-doc on SourceTask.commit
{quote}
Commit the offsets, up to the offsets that have been returned by \{@link 
#poll()}. This
method should block until the commit is complete.

SourceTasks are not required to implement this functionality; Kafka Connect 
will record offsets
automatically. This hook is provided for systems that also need to store 
offsets internally
in their own system.
{quote}

As I read this, when the commit-method is called, the SourceTask-developer is
"told" that everything returned from poll up until "now" has been sent/stored -
both the outgoing messages and the associated connect-offsets. Looking at the
implementation, it also seems that this is what it tries to guarantee/achieve.

But as I read the code, it is not necessarily true.
The following threads are involved:
* Task-thread: WorkerSourceTask has its own thread running
WorkerSourceTask.execute.
* Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled to
call WorkerSourceTask.commitOffsets (from a different thread)

The two threads synchronize (on the WorkerSourceTask object) in sendRecords and
commitOffsets respectively, preventing the task-thread from adding to
outstandingMessages and offsetWriter while the committer-thread is marking what has
to be flushed in the offsetWriter and waiting for outstandingMessages to be
empty. This means that the offsets committed will be consistent with what has
been sent out, but not necessarily with what has been polled. At least I do not see
why the following is not possible:
* Task-thread polls something from task.poll
* Before the task-thread gets to add (all) the polled records to
outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in
and does its committing, while preventing the task-thread from adding the polled
records to outstandingMessages and offsetWriter
* Consistency will not have been compromised, but the committer-thread will end up
calling task.commit (via WorkerSourceTask.commitSourceTask) without the
records just polled from task.poll having been sent or the corresponding
connector-offsets flushed.

If I am right, I guess there are two ways to fix it:
* Either change the java-doc of SourceTask.commit to something like the following
(which I do believe is true)
{quote}
Commit the offsets, up to the offsets that have been returned by \{@link 
#poll()}
*and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
This method should block until the commit is complete.

SourceTasks are not required to implement this functionality; Kafka Connect 
will record offsets
automatically. This hook is provided for systems that also need to store 
offsets internally
in their own system.
{quote}
* or, fix the "problem" so that it actually does what the java-doc says :-)

If I am not right, of course I apologize for the inconvenience. I would
appreciate an explanation of where my code-inspection is not correct, and why it
works even though I cannot see it. I will not expect such an explanation,
though.





Re: kafka connect questions

2017-07-05 Thread Per Steffensen

Well I guess there is one: https://github.com/dhanuka84/kafka-connect-tcp
Maybe you can use it or build on top of it.

On 05/07/17 05:45, Gwen Shapira wrote:

I don't remember seeing one. There is no reason not to write one (let us
know if you do, so we can put it on the connector hub!).

Few things:
1. Connectors that listen on sockets typically run in stand-alone mode, so
they can be tied to a specific machine (in distributed mode, connectors can
move around).
2. Why do you need a connector? Why not just use Kafka producer to send
protobuf directly to Kafka?

Gwen
On Tue, Jul 4, 2017 at 9:02 AM Clay Teahouse  wrote:


Hello All,

I'd appreciate your help with the following questions.

1) Is there a Kafka Connect connector for listening to TCP sockets?

2) If so, can the messages be in protobuf, with each message prefixed with
the length of the message?

thanks
Clay





[jira] [Created] (KAFKA-5505) Connect: Do not restart connector and existing tasks on task-set change

2017-06-23 Thread Per Steffensen (JIRA)
Per Steffensen created KAFKA-5505:
-

 Summary: Connect: Do not restart connector and existing tasks on 
task-set change
 Key: KAFKA-5505
 URL: https://issues.apache.org/jira/browse/KAFKA-5505
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.10.2.1
Reporter: Per Steffensen


I am writing a connector with a frequently changing task-set. It is really not 
working very well, because the connector and all existing tasks are restarted 
when the set of tasks changes. E.g. if the connector is running with 10 tasks, 
and an additional task is needed, the connector itself and all 10 existing 
tasks are restarted, just to make the 11th task run also. My tasks have a 
fairly heavy initialization, making it extra annoying. I would like to see a 
change, introducing a "mode", where only new/deleted tasks are started/stopped 
when notifying the system that the set of tasks changed (calling 
context.requestTaskReconfiguration() - or something similar).

Discussed this issue a little on dev@kafka.apache.org in the thread "Kafka 
Connect: Too much restarting with a SourceConnector with dynamic set of tasks"





Re: Kafka Connect: Too much restarting with a SourceConnector with dynamic set of tasks

2017-05-24 Thread Per Steffensen

Thanks a lot for responding, Randall! See my comments below.

Regards, Per Steffensen

On 22/05/17 22:36, Randall Hauch wrote:
You're not doing anything wrong, but I suspect you're requesting task 
reconfiguration more frequently than was originally envisioned, which 
means that the current implementation is not as optimal for your case.

OK thanks for confirming


I'm not sure how much effort is required to implement this new 
behavior. The logic for the standalone worker is pretty 
straightforward, but the logic for the distributed worker is going to 
be much more involved.
Yeah, when I realized the "problem" I had a short look at the code to 
see if it was easily fixable. I never went deep into it, but it seemed 
like more than just an hour of work.
But we also need to be careful about changing existing behavior, since 
it's not hard to imagine connectors that might expect that all tasks 
be restarted when there are any changes to the task configurations.

FWIW, I think it is a little hard to imagine :-)
If there's any potential that this is the case, we'd have to be sure 
to keep the existing behavior as the default but to somehow enable the 
new behavior if desired.

I definitely agree!


One possibility is to add an overloaded 
requestTaskReconfiguration(boolean changedOnly) that specifies whether 
only changed tasks should be reconfigured. This way the existing 
requestTaskReconfiguration() method could be changed to call 
requestTaskReconfiguration(false), and then the implementation has to 
deal with this.
Yep, or make it an optional standard-configuration that you can always 
give a connector. Potato, potato.
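
Sketched as an interface, the idea would look roughly like this. The boolean
overload is hypothetical and does not exist in ConnectorContext today; this is
just an illustration of the proposal.

{code}
// Not the actual ConnectorContext; just a sketch of the proposed overload.
public interface ReconfigurationAwareContext {

    // Existing behavior: the connector and all of its tasks are restarted.
    void requestTaskReconfiguration();

    // Proposed: when changedOnly is true, only tasks whose configuration actually changed
    // are stopped/started; unchanged tasks keep running. The no-arg method would delegate
    // to requestTaskReconfiguration(false) to keep the current behavior as the default.
    void requestTaskReconfiguration(boolean changedOnly);
}
{code}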


But again, the bigger challenge is to implement this new behavior in 
the DistributedHerder. OTOH, perhaps it's not as complicated as I 
might guess.
Well I would really like to see it happen. Anyone up for it? Am I 
allowed to create a ticket on this?
What if I would like to give it a shot myself? Is there a committer who 
would help review and eventually commit?

Which branch should I make a PR to?