[VOTE] KIP-381 Connect: Tell about records that had their offsets flushed in callback
Please vote!
[VOTE] KIP-381 Connect: Tell about records that had their offsets flushed in callback
Hoping for your votes.
Permissions to edit KIP page and ...
Hi, I used to have permissions to edit the KIP page (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals). Now I suddenly do not. I created KIP-381 and now want to put it into vote mode, but I can edit neither the KIP page nor the KIP-381 page. Please give me permissions again. Username: steff1193 Kind regards, Per Steffensen
Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback
On 18/10/2018 16.46, Ryanne Dolan wrote: Per Steffenson, getting sequence numbers correct is definitely difficult, but this is not Connect's fault. I'd like to see Connect implement exactly-once from end-to-end, but that requires coordination between sources and sinks along the lines that you allude to, using sequence numbers and transactions and whatnot. The problem with commit() is knowing when it's okay to delete the files in your example. I don't believe that issue has anything to do with avoiding dupes or assigning unique sequence numbers. I believe it is safe to delete a file if you know it has been delivered successfully, which the present API exposes.

Ok, I believe you read too much into my example. It was just what I was able to come up with that was simple enough to explain fairly easily, and where it would matter which records have had their offsets flushed. The example may not have been good enough to fulfill its purpose of showing that knowing exactly which records have had their offsets flushed is important. You might argue that you, as a source-connector developer, do not need to know when offsets are flushed at all. I will argue that in many cases you do, and that you may need to know exactly which records have had their offsets flushed. In that case the current "commit" method is useless. You do not know which records had their offsets flushed:
* It is not necessarily all the records returned from "poll" at the point where "commit" is called, even though the current JavaDoc claims so
* It is not necessarily all the records for which "commitRecord" has been called. It may be more records, or it may be fewer
Bottom line: the current "commit" cannot be used for much. You may argue that it should just be removed, but I would definitely not like to see that. I use "commit" in several of my source-connectors (pretending that it works), and could not live without it.
As I see it, the offsets are kind of your accounting information about how to proceed from where you left off. Kafka Connect offers to keep track of that accounting in alignment with the outgoing data it relates to. Of course I could just deal with all of that myself, but then a lot of the niceness of Kafka Connect would be gone, and I might as well do everything myself. That said, I'm not opposed to your proposed callbacks, and I agree that commit() and commitRecord() are poorly named. I just don't believe the present API is incorrect. I definitely do. Currently there is a callback "commit" whose JavaDoc lies, and that essentially cannot be used for anything except making you confused. You know nothing about the state when it is called. But as long as you do not oppose the proposed solution, we probably should not spend too much time arguing back and forth about opinions. Ryanne Regards, and thanks for participating in the discussion Per Steffensen
Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback
On 17/10/2018 18.17, Ryanne Dolan wrote: > this does not guarantee that the > offsets of R have been written/flushed at the next commit() call True, but does it matter? So long as you can guarantee the records are delivered to the downstream Kafka cluster, it shouldn't matter whether they have been committed or not. The worst that can happen is that the worker gets bounced and asks for the same records a second time. Even if those records have since been dropped from the upstream data source, it doesn't matter because you know they were previously delivered successfully. You are kind of arguing that offsets are not usable at all. I think they are. Below I will describe a fairly simple source-connector, how it would be misled by the way the source-connector framework currently works, and how my fix would prevent that. The source-connector is picked out of thin air, but not too far from what I have had to deal with in real life. Let's assume I write a fairly simple source-connector that picks up data from files in a given folder. For simplicity, assume each file fits in one Kafka message. My source-connector sorts the files by timestamp and sends out the data in the files, oldest file first. It is possible that the receiving side gets the same data twice, for one of the following reasons:
* There were actually two input-files that contained exactly the same data (in that case the receiving side should handle it twice)
* The data from the same file may be sent twice in two Kafka messages, due to global atomicity being impossible (in that case the receiving side should only handle the data once)
In order to allow the receiving side to know when two consecutive messages are essentially the same, so that it will know to handle only one of them, I introduce a simple sequence-numbering system in my source-connector.
I simply write a sequence-number in the Kafka messages, and I use Kafka Connect offsets to keep track of the next sequence-number to be used, so that I can pick up with the correct sequence-number after a crash/restart. If there are no offsets when the source-connector starts (first start) it will just start with sequence-number 1. *Assume the following files are in the input-folder:*
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_01-.data
* 2018-01-01_10_00_02-.data
…
*Now this sequence of events is possible:*
* mySourceConnector.poll() —> [ R1 = record({seq=1, data=<2018-01-01_10_00_00-.data>}, {nextSeq=2}), R2 = record({seq=2, data=<2018-01-01_10_00_00-.data>}, {nextSeq=3}) ]
* data of R1 was sent and acknowledged
* mySourceConnector.commitRecord(R1)
* data of R2 was sent and acknowledged
* mySourceConnector.commitRecord(R2)
* offsets-committer kicks in around here and picks up the offsets from R1 and R2, so the merged offsets to be written and flushed are {nextSeq=3}
* mySourceConnector.poll() —> [ R3 = record({seq=3, data=<2018-01-01_10_00_01-.data>}, {nextSeq=4}) ]
* data of R3 was sent and acknowledged
* mySourceConnector.commitRecord(R3)
* offsets-committer finishes writing and flushing offsets {nextSeq=3}
* mySourceConnector.commit()
In my commit() implementation I believe that the data and offsets for R1, R2 and R3 have been sent/written/flushed/acknowledged, and therefore I delete the following files:
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_01-.data
But the truth is that the data for R1, R2 and R3 has been sent with sequence-numbers 1, 2 and 3 respectively, while the flushed offsets say {nextSeq=3}, not {nextSeq=4} as I would indirectly expect. If the system crashes here, upon restart I will get {nextSeq=3}, but the file containing the data supposed to get sequence-number 3 has already been deleted.
Therefore I will end up with this next poll:
* poll() —> [ R4 = record({seq=3, data=<2018-01-01_10_00_02-.data>}, {nextSeq=4}) ]
If my system had worked, I should have ended up with this next poll:
* poll() —> [ R4 = record({seq=4, data=<2018-01-01_10_00_02-.data>}, {nextSeq=5}) ]
The receiving side of my data will get two messages containing the same sequence-number 3. It will therefore incorrectly ignore the second message. Even if it double-checks by looking at the actual data of the two messages, and even if the content of 2018-01-01_10_00_01-.data and 2018-01-01_10_00_02-.data was actually identical, it has no way of figuring out to do the right thing (actually handle both messages). *With my fix to the problem*, the call to commit() would have been mySourceConnector.commit([R1, R2]), and I would know only to delete the following files:
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
And after crash/restart I would end up sending the correct next message: mySourceConnector.poll() —> [ R3 = record({seq=3, data=<2018-01-01_10_00_01-.data>}, {nextSeq=4}) ]
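The file-deletion bookkeeping this example relies on can be sketched in plain Java. This is not the real Connect API — the class, method and file names below are hypothetical stand-ins that simulate the proposed commit(records) callback, where the worker hands back exactly the records whose offsets were flushed:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Minimal stand-in for a polled record: the input file it came from and
// the offsets ({nextSeq}) that poll() attached to it.
class PolledRecord {
    final String file;
    final long nextSeq;
    PolledRecord(String file, long nextSeq) { this.file = file; this.nextSeq = nextSeq; }
}

// Tracks which input files are safe to delete. Only files whose records
// appeared in a flushed-offsets callback are ever marked deletable.
class SafeDeleteTracker {
    private final Set<String> deletable = new LinkedHashSet<>();

    // The proposed callback shape: the worker passes exactly the records
    // whose offsets made it into the last flush.
    void offsetsFlushed(List<PolledRecord> flushedRecords) {
        for (PolledRecord r : flushedRecords) deletable.add(r.file);
    }

    List<String> filesSafeToDelete() { return new ArrayList<>(deletable); }
}
```

With this shape, the scenario above would invoke offsetsFlushed([R1, R2]) only, so the file behind R3 survives the crash and can be re-sent with the correct sequence-number.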
Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback
On 17/10/2018 16.43, Ryanne Dolan wrote: I see, thanks. On the other hand, the commitRecord() callback provides the functionality you require in this case. In commitRecord() your SourceTask can track the offsets of records that have been ack'd by the producer client, and then in commit() you can be sure that those offsets have been flushed. That is the trick I am currently using - more or less. But unfortunately it does not work 100% either. It is possible that commitRecord() is called with a record R, and that commit() is called after that, without the offsets of R having been written/flushed. The call to commitRecord() means that the "actual data" of R has been sent/acknowledged, but unfortunately this does not guarantee that the offsets of R have been written/flushed at the next commit() call
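The workaround being described — remembering acked offsets in commitRecord() and treating them as flushed at the next commit() — might look roughly like this sketch (class and field names are hypothetical, not from the Connect codebase):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the workaround: remember the highest offset the producer has
// acked per source partition. The gap described in the thread: commit()
// firing does NOT prove these offsets were actually written to the
// offset store yet, so assumedFlushed() can overestimate.
class AckedOffsetTracker {
    private final Map<String, Long> ackedOffsets = new HashMap<>();

    // Called from commitRecord(record): the producer acked this record.
    void recordAcked(String sourcePartition, long offset) {
        ackedOffsets.merge(sourcePartition, offset, Math::max);
    }

    // Called from commit(): the best guess of what has been flushed.
    Map<String, Long> assumedFlushed() { return new HashMap<>(ackedOffsets); }
}
```

The tracker itself is trivial; the point of the thread is that no amount of task-side bookkeeping can close the gap, because only the framework knows which offsets the last flush actually contained.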
Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback
The fix will definitely "facilitate" the source-connectors I have written. It will make them work 100% correctly. Today they don't. Fine by me to change "Acknowledged" to "Acked" in the method naming. I am not sure I would like to take a Collection instead of a List as the argument to offsetsFlushedAndAck(nowledg)ed. poll() returns a List (ordered records), the records are handled in that order, and I would like to hand the records back in that order as well. Handing back a Collection may indicate that order does not matter. Besides that, getting the records back in order is likely to help implementations of offsetsFlushedAndAck(nowledg)ed. Regarding adding stuff to the "rejected approaches" and "motivation" sections of the KIP, I am not sure I will get the time anytime soon. Please feel free to help add this to the KIP. This way we also have at least two people who really understand what this is about. Sometimes you only really understand what something is about when you are forced to write about it (at least that is my excuse). Regards, Per Steffensen On 16/10/2018 05.57, Konstantine Karantasis wrote: This is a significant improvement to the semantics of source connectors. I'm expecting that it will facilitate source connector implementations and even enrich the application use cases that we see. I only have a few minor suggestions at the moment. I believe that Acked is a common abbreviation for Acknowledged and that we could use it in this context. And this suggestion is coming from a big proponent of complete words in variables and method names. Thus, feel free to consider 'offsetsFlushedAndAcked' as well as 'recordSentAndAcked'. Since this is a public interface, I'd also make the implementation-specific comment that a Collection might be more versatile than a List as the argument of offsetsFlushedAndAcknowledged.
The rejected approaches section could use some of the material in the original jira ticket, which is pretty insightful in order to understand how we arrived at this KIP. For example, I think it'd be useful to state briefly why the 'commit' method is not getting removed completely but is substituted with 'offsetsFlushedAndAcked'. It would also be useful, I believe, to include in the motivation section some info on why and how a source system could use these method calls to safely recycle data that has definitely been imported to Kafka. I see this type of use case having increased importance as Kafka is used more and more as the source of truth and persistence layer for an increasing number of applications. These suggestions, although not strictly required in order to move forward with this improvement, I believe can help a lot in understanding the context of this proposed change without having to read the complete history in the jira ticket. Thanks for the KIP Per! -Konstantine On Wed, Oct 10, 2018 at 6:50 AM Per Steffensen wrote: Please help make the proposed changes in KIP-381 become reality. Please comment. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback JIRA: https://issues.apache.org/jira/browse/KAFKA-5716 PR: https://github.com/apache/kafka/pull/3872 Thanks!
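For reference, the shape of the callbacks being discussed in this thread might be sketched as below. This is a sketch only — SourceRecord is a stand-in class, and the method names follow Konstantine's "Acked" suggestion; the authoritative signatures live in the KIP page and the PR:

```java
import java.util.List;

// Stand-in for org.apache.kafka.connect.source.SourceRecord.
class SourceRecord { }

// Sketch of the two callbacks under discussion. A List (rather than a
// Collection) keeps the records in the order poll() returned them,
// which is Per's preference.
interface SourceTaskCallbacks {
    // Invoked once per record, after the producer acks it.
    void recordSentAndAcked(SourceRecord record);

    // Invoked after an offset flush, with exactly the records whose
    // offsets made it into that flush.
    void offsetsFlushedAndAcked(List<SourceRecord> records);
}
```

A task implementing these callbacks would no longer need to guess which records' offsets a flush covered — the framework states it explicitly.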
Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback
Let's use X for the point in time where commit() is called, and Rs(X) for the records returned by poll() up to time X. At time X, it is not necessarily true that all records in Rs(X) have been sent to Kafka (and acknowledged) and had their offsets flushed to the offset-store. Example:
* Time X-1: poll() is called and one record R is returned
* Time X: commit() is called. There is no guarantee that the data in R has been sent to/acknowledged by Kafka, nor that the offsets in R have been flushed to the offset-store (it is likely, though)
Due to the synchronization necessary, it is probably hard to make that guarantee without reducing throughput significantly. But it is feasible to make the change that commit() is given (via an argument) a list/collection of the records for which the guarantee does hold. That is what my current fix does (see PR). On 16/10/2018 19.33, Ryanne Dolan wrote: Steff, > Guess people have used it, assuming that all records that have been polled > at the time of callback to "commit", have also had their offsets committed. > But that is not true. (excerpt from KIP) The documentation for SourceTask.commit() reads: > Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This > method should block until the commit is complete. I'm confused by these seemingly contradictory statements. My assumption (as you say) is that all records returned by poll() will have been committed before commit() is invoked by the framework. Is that not the case? Ryanne On Wed, Oct 10, 2018 at 8:50 AM Per Steffensen <mailto:perst...@gmail.com> wrote: Please help make the proposed changes in KIP-381 become reality. Please comment. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback JIRA: https://issues.apache.org/jira/browse/KAFKA-5716 PR: https://github.com/apache/kafka/pull/3872 Thanks!
[DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback
Please help make the proposed changes in KIP-381 become reality. Please comment. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback JIRA: https://issues.apache.org/jira/browse/KAFKA-5716 PR: https://github.com/apache/kafka/pull/3872 Thanks!
[jira] [Created] (KAFKA-6505) Add simple raw "offset-commit-failures", "offset-commits" and "offset-commit-successes" count metric
Per Steffensen created KAFKA-6505: - Summary: Add simple raw "offset-commit-failures", "offset-commits" and "offset-commit-successes" count metric Key: KAFKA-6505 URL: https://issues.apache.org/jira/browse/KAFKA-6505 Project: Kafka Issue Type: Improvement Reporter: Per Steffensen MBean "kafka.connect:type=connector-task-metrics,connector=,task=x" has several attributes. Most of them seem to be avg/max/pct over the entire lifetime of the process. They are not very useful when monitoring a system, where you typically want to see when there have been problems and whether there are problems right now. E.g. I would like to expose to an administrator when offset-commits have been failing (e.g. timing out), including whether they are failing right now. It is really hard to do that properly using only the attribute "offset-commit-failure-percentage". You can expose a number telling how much the percentage has changed between two consecutive polls of the metric - if it changed to the positive side, we saw offset-commit failures, and if it changed to the negative side (or is stable at 0) we saw offset-commit successes - at least as long as the system has not been running for so long that a single failing offset-commit does not even change the percentage. But it is really odd to do it this way. *I would like to see an attribute "offset-commit-failures" that simply counts how many offset-commits have failed, as an ever-increasing number. Maybe also attributes "offset-commits" and "offset-commit-successes". Then I can do a delta between the two last metric-polls to show how many offset-commit attempts have failed "very recently".
Let this ticket be about that particular added attribute (or the three added attributes).* Just a note on metrics IMHO (should probably be posted somewhere else): in general, consider getting rid of stuff like avg, max, pct over the entire lifetime of the process - current state is what interests people, especially when it comes to failure-related metrics (failure-pct over the lifetime of the process is not very useful). And people will continuously be polling and storing the metrics, so we will have a history of "current state" somewhere else (e.g. in Prometheus). Just give us the raw counts. Modern monitoring tools can do all the avg, max, pct for you based on a time-series of metrics-poll results - and they can do it for periods of your choice (e.g. average over the last minute or 5 minutes) - have a look at Prometheus PromQL (e.g. used through Grafana). Just expose the raw numbers and let the average/max/min/pct calculation be done on the collect/presentation side. Only do "advanced" stuff for cases that are very interesting and where it cannot be done based on simple raw numbers (e.g. percentiles), and consider whether doing it for fairly short intervals is better than for the entire lifetime of the process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
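The delta the ticket asks for is trivial once a raw, ever-increasing counter exists. A minimal sketch (the metric name is the one proposed above; the class is hypothetical, and this is essentially what Prometheus' rate()/increase() functions do on the collection side):

```java
// Given consecutive polls of a raw, ever-increasing counter such as the
// proposed "offset-commit-failures", the number of recent failures is a
// simple difference between the last two observed values.
class CounterDelta {
    private long previous = 0;

    // Returns how many failures occurred since the last poll.
    long poll(long currentCount) {
        long delta = currentCount - previous;
        previous = currentCount;
        return delta;
    }
}
```

This is the ticket's point: the hard-to-interpret lifetime percentage becomes unnecessary once the raw count is exposed, because any monitoring stack can derive windows and rates from it.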
[jira] [Created] (KAFKA-6504) Connect: Some per-task-metrics not working
Per Steffensen created KAFKA-6504: - Summary: Connect: Some per-task-metrics not working Key: KAFKA-6504 URL: https://issues.apache.org/jira/browse/KAFKA-6504 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 1.0.0 Reporter: Per Steffensen Some Kafka Connect metrics seem to be wrong with respect to per-task reporting - at least it seems that MBean "kafka.connect:type=source-task-metrics,connector=,task=x" attribute "source-record-active-count" reports the same number for all x tasks running in the same Kafka Connect instance/JVM. E.g. if I have a source-connector "my-connector" with 2 tasks that both run in the same Kafka Connect instance, and I know that only one of them actually produces anything (and therefore can have "active source-records"), both "kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and "kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go up (following each other). The count should only go up for the one task that actually produces something. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6503) Connect: Plugin scan is very slow
Per Steffensen created KAFKA-6503: - Summary: Connect: Plugin scan is very slow Key: KAFKA-6503 URL: https://issues.apache.org/jira/browse/KAFKA-6503 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 1.0.0 Reporter: Per Steffensen Just upgraded to 1.0.0. It seems some plugin scanning has been introduced. It is very slow - see logs from starting my Kafka Connect instance at the bottom. It takes almost 4 minutes scanning. I am running Kafka Connect in docker based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to /usr/share/java. The only thing I have added is a 13MB jar in /usr/share/java/kafka-connect-file-streamer-client containing two connectors and a converter. That one alone seems to take 20 secs. If it were just scanning in the background, and everything was working, it probably would not be a big issue. But it is not. Right after starting the Kafka Connect instance I try to create a connector via the /connectors endpoint, but it will not succeed before the plugin scanning has finished (4 minutes). I am not even sure why scanning is necessary. Is it not always true that connectors, converters etc. are mentioned by name, so to see whether one exists, just try to load the class - the classloader will tell if it is available? Hmmm, there is probably a reason. Anyway, either it should be made much faster, or at least Kafka Connect should be fully functional (or as functional as possible) while scanning is going on. {code} [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a moment ...
(org.apache.kafka.connect.cli.ConnectDistributed) [2018-01-30 13:52:27,218] INFO Loading plugin from: /usr/share/java/kafka-connect-file-streamer-client (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:43,037] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:43,038] INFO Added plugin 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:43,039] INFO Added plugin 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:43,040] INFO Added plugin 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:43,049] INFO Loading plugin from: /usr/share/java/kafka-connect-elasticsearch (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:47,595] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:47,611] INFO Added plugin 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:47,651] INFO Loading plugin from: /usr/share/java/kafka-connect-jdbc (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:49,491] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:49,491] INFO Added plugin 
'io.confluent.connect.jdbc.JdbcSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:49,492] INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:52:49,663] INFO Loading plugin from: /usr/share/java/kafka-connect-s3 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:53:51,055] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-s3/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:53:51,055] INFO Added plugin 'io.confluent.connect.s3.S3SinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:53:51,061] INFO Added plugin 'io.confluent.connect.storage.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:53:51,064] INFO Added plugin 'io.confluent.connect.avro.AvroConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13:53:52,745] INFO Loading plugin from: /usr/share/java/kafka-connect-storage-common (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [2018-01-30 13
[jira] [Created] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
Per Steffensen created KAFKA-5716: - Summary: Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent Key: KAFKA-5716 URL: https://issues.apache.org/jira/browse/KAFKA-5716 Project: Kafka Issue Type: Bug Reporter: Per Steffensen Priority: Minor I am not looking at the very latest code, so the "problem" may have been corrected recently. If so, I apologize. I found the "problem" by code inspection alone, so I may be wrong. I have not had the time to write tests to confirm. According to the java-doc on SourceTask.commit: {quote} Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This method should block until the commit is complete. SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system. {quote} As I read this, when the commit method is called, the SourceTask developer is "told" that everything returned from poll up until "now" has been sent/stored - both the outgoing messages and the associated connect-offsets. Looking at the implementation, it also seems that this is what it tries to guarantee/achieve. But as I read the code, it is not necessarily true. The following threads are involved:
* Task-thread: WorkerSourceTask has its own thread running WorkerSourceTask.execute
* Committer-thread: from time to time SourceTaskOffsetCommitter is scheduled to call WorkerSourceTask.commitOffsets (from a different thread)
The two threads synchronize (on the WorkerSourceTask object) in sendRecords and commitOffsets respectively, hindering the task-thread from adding to outstandingMessages and offsetWriter while the committer-thread is marking what has to be flushed in the offsetWriter and waiting for outstandingMessages to become empty. This means that the offsets committed will be consistent with what has been sent out, but not necessarily with what has been polled.
At least I do not see why the following is not possible:
* The task-thread polls something from task.poll
* Before the task-thread gets to add (all) the polled records to outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in and does its committing, while hindering the task-thread from adding the polled records to outstandingMessages and offsetWriter
* Consistency will not have been compromised, but the committer-thread will end up calling task.commit (via WorkerSourceTask.commitSourceTask) without the records just polled from task.poll having been sent or the corresponding connector-offsets flushed
If I am right, I guess there are two ways to fix it:
* Either change the java-doc of SourceTask.commit to something like this (which I do believe is true): {quote} Commit the offsets, up to the offsets that have been returned by \{@link #poll()} *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*. This method should block until the commit is complete. SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system. {quote}
* or fix the "problem" so that it actually does what the java-doc says :-)
If I am not right, of course I apologize for the inconvenience. I would appreciate an explanation of where my code inspection is not correct, and why it works even though I cannot see it. I will not expect such an explanation, though. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
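The suspected interleaving can be replayed sequentially in a toy model. This sketch is hypothetical — the real WorkerSourceTask runs the two threads concurrently, synchronized on the task object; here the committer's step is simply executed before sendRecords() to show the consequence:

```java
import java.util.ArrayList;
import java.util.List;

// Toy replay of the interleaving described in the ticket.
class WorkerSourceTaskSim {
    final List<String> polledNotYetSent = new ArrayList<>(); // returned by poll(), not yet in sendRecords()
    final List<String> offsetWriter = new ArrayList<>();     // offsets queued for the next flush
    final List<String> flushedOffsets = new ArrayList<>();   // offsets actually flushed
    boolean taskCommitCalled = false;

    void poll(String record) { polledNotYetSent.add(record); }      // task-thread

    void sendRecords() {                                            // task-thread
        offsetWriter.addAll(polledNotYetSent);
        polledNotYetSent.clear();
    }

    void commitOffsets() {                                          // committer-thread
        flushedOffsets.addAll(offsetWriter);  // flush only what sendRecords() queued so far
        offsetWriter.clear();
        taskCommitCalled = true;              // ...and then task.commit() fires
    }
}
```

If commitOffsets() wins the race between poll() and sendRecords(), task.commit() fires even though the just-polled record's offsets were never flushed — which is exactly the gap between the JavaDoc and the implementation described above.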
Re: kafka connect questions
Well, I guess there is one: https://github.com/dhanuka84/kafka-connect-tcp Maybe you can use it or build on top of it. On 05/07/17 05:45, Gwen Shapira wrote: I don't remember seeing one. There is no reason not to write one (let us know if you do, so we can put it on the connector hub!). A few things: 1. Connectors that listen on sockets typically run in stand-alone mode, so they can be tied to a specific machine (in distributed mode, connectors can move around). 2. Why do you need a connector? Why not just use a Kafka producer to send protobuf directly to Kafka? Gwen On Tue, Jul 4, 2017 at 9:02 AM Clay Teahouse wrote: Hello All, I'd appreciate your help with the following questions. 1) Is there a kafka connect connector for listening to tcp sockets? 2) If so, can the messages be in protobuf, with each message prefixed with the length of the message? thanks Clay
[jira] [Created] (KAFKA-5505) Connect: Do not restart connector and existing tasks on task-set change
Per Steffensen created KAFKA-5505: - Summary: Connect: Do not restart connector and existing tasks on task-set change Key: KAFKA-5505 URL: https://issues.apache.org/jira/browse/KAFKA-5505 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 0.10.2.1 Reporter: Per Steffensen I am writing a connector with a frequently changing task-set. It is really not working very well, because the connector and all existing tasks are restarted when the set of tasks changes. E.g. if the connector is running with 10 tasks and an additional task is needed, the connector itself and all 10 existing tasks are restarted, just to make the 11th task run as well. My tasks have a fairly heavy initialization, making it extra annoying. I would like to see a change introducing a "mode" where only new/deleted tasks are started/stopped when notifying the system that the set of tasks changed (calling context.requestTaskReconfiguration() - or something similar). Discussed this issue a little on dev@kafka.apache.org in the thread "Kafka Connect: To much restarting with a SourceConnector with dynamic set of tasks" -- This message was sent by Atlassian JIRA (v6.4.14#64029)
Re: Kafka Connect: To much restarting with a SourceConnector with dynamic set of tasks
Thanks a lot for responding, Randall! See my comments below. Regards, Per Steffensen On 22/05/17 22:36, Randall Hauch wrote: You're not doing anything wrong, but I suspect you're requesting task reconfiguration more frequently than was originally envisioned, which means that the current implementation is not as optimal for your case. OK, thanks for confirming. I'm not sure how much effort is required to implement this new behavior. The logic for the standalone worker is pretty straightforward, but the logic for the distributed worker is going to be much more involved. Yeah, when I realized the "problem" I had a short look at the code to see if it was easily fixable. I never went deep into it, but it seemed like more than just an hour of work. But we also need to be careful about changing existing behavior, since it's not hard to imagine connectors that might expect all tasks to be restarted when there are any changes to the task configurations. FWIW, I think it is a little hard to imagine :-) If there's any potential that this is the case, we'd have to be sure to keep the existing behavior as the default but somehow enable the new behavior if desired. I definitely agree! One possibility is to add an overloaded requestTaskReconfiguration(boolean changedOnly) that specifies whether only changed tasks should be reconfigured. This way the existing requestTaskReconfiguration() method could be changed to call requestTaskReconfiguration(false), and then the implementation has to deal with this. Yep, or make it an optional standard-configuration that you can always give a connector. Potato, potato. But again, the bigger challenge is to implement this new behavior in the DistributedHerder. OTOH, perhaps it's not as complicated as I might guess. Well, I would really like to see it happen. Anyone up for it? Am I allowed to create a ticket on this? What if I would like to give it a shot myself - is there a committer who would help review and eventually commit?
Which branch should I make a PR to?