Re: Kafka Connect connector monitoring tasks via offset storage reader

2016-05-24 Thread Randall Hauch



On May 24, 2016 at 4:28:41 PM, Liquan Pei (liquan...@gmail.com) wrote:

> Hi Randall,
>
> This is interesting. Essentially we can track the progress by getting data
> from offset storage. What are the use cases you have in mind that use the
> offsets of source partitions? I can imagine comparing the source offsets
> for new data and already-delivered data and making decisions such as task
> reconfiguration.

Yes, exactly. I can think of two use cases:

1. Upon startup, using previously committed offsets to identify new vs.
   existing source partitions, and using this to more intelligently
   distribute the source partitions across tasks (see the sketch below).
2. Tracking the progress of tasks by reading the committed offsets, and
   signaling for task reconfiguration when the task(s) have reached some
   predetermined point.
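
To make the first use case concrete, here is a rough sketch in Java. It
assumes a hypothetical offsetStorageReader() accessor on the connector’s
context (no such accessor exists today); a null committed offset marks a
partition that no task has processed yet:

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.storage.OffsetStorageReader;

    public class PartitionTriage {
        // Split source partitions into "new" and "already started", based
        // on whether a committed offset exists for each partition.
        public static void triage(OffsetStorageReader reader,
                                  List<Map<String, String>> partitions,
                                  List<Map<String, String>> fresh,
                                  List<Map<String, String>> inProgress) {
            for (Map<String, String> partition : partitions) {
                if (reader.offset(partition) == null) {
                    fresh.add(partition);      // no committed offset yet
                } else {
                    inProgress.add(partition); // some work already recorded
                }
            }
        }
    }

The connector’s taskConfigs(int) could then assign the two groups to tasks
differently.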

> One concern is that offsets read from OffsetStorageReader may be stale, and
> we may end up making decisions that are not based on the latest data. In
> general, I think the question is what we want to do once we know that the
> data up to a given source offset has been delivered to Kafka.

Would they really be stale during connector startup? Aren’t they accurate in
this case, enabling the connector to make intelligent decisions about task
configurations?

However, if the connector and tasks are running, then yes, the only guarantee
about offsets read from storage is that those offsets were committed; the
tasks have _at least_ recorded those offsets but may have progressed further.

> From the API perspective, do we want to expose the OffsetStorageReader or
> just add a method to return the source offsets? Note that this is only
> relevant to source connectors; I’m not sure whether it makes sense to
> create SourceConnectorContext and SinkConnectorContext.

Yes, this probably only applies to source connectors, and defining
SourceConnectorContext and SinkConnectorContext may be the appropriate way to
go forward. Changing the type of the ‘context’ field in the Connector abstract
class, or moving that field to the appropriate subclasses, would break binary
classfile compatibility with earlier versions (meaning a newer version of
Kafka Connect could not use a connector compiled with an older Kafka Connect
library). But that may not matter, since the `Connector` class is still
annotated with “@InterfaceStability.Unstable” in the 0.10.0.0 tag [1] and in
the trunk branch [2]. In fact, it may be useful to define those
ConnectorContext subtypes sooner rather than later, so that binary classfile
backward compatibility can be preserved later on.
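
For concreteness, a minimal sketch of what those subtypes might look like
(purely illustrative; none of this exists in the API today):

    import org.apache.kafka.connect.connector.ConnectorContext;
    import org.apache.kafka.connect.storage.OffsetStorageReader;

    // Proposed subtype for source connectors: adds read access to the
    // committed source offsets.
    public interface SourceConnectorContext extends ConnectorContext {
        OffsetStorageReader offsetStorageReader();
    }

    // Proposed subtype for sink connectors: nothing extra yet, but defining
    // it now lets the two context types evolve independently later without
    // breaking binary compatibility.
    public interface SinkConnectorContext extends ConnectorContext {
    }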

Best regards,

Randall

[1] 
https://github.com/apache/kafka/blob/0.10.0/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java

[2] 
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java





Re: Kafka Connect connector monitoring tasks via offset storage reader

2016-05-24 Thread Liquan Pei
Hi Randall,

This is interesting. Essentially we can track the progress by getting data
from offset storage. What are the use cases you have in mind that use the
offsets of source partitions? I can imagine comparing the source offsets
for new data and already-delivered data and making decisions such as task
reconfiguration.

One concern is that offsets read from OffsetStorageReader may be stale, and
we may end up making decisions that are not based on the latest data. In
general, I think the question is what we want to do once we know that the
data up to a given source offset has been delivered to Kafka.

From the API perspective, do we want to expose the OffsetStorageReader or
just add a method to return the source offsets? Note that this is only
relevant to source connectors; I'm not sure whether it makes sense to
create SourceConnectorContext and SinkConnectorContext.
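
Concretely, the two options might look something like the following against
the current ConnectorContext (committedOffsets is a hypothetical name and
signature, mirroring OffsetStorageReader.offsets):

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.connect.storage.OffsetStorageReader;

    public interface ConnectorContext {
        // Existing methods:
        void requestTaskReconfiguration();
        void raiseError(Exception e);

        // Option A: expose the reader itself.
        OffsetStorageReader offsetStorageReader();

        // Option B: expose only the committed offsets for some partitions
        // (hypothetical method, shown here only for comparison).
        <T> Map<Map<String, T>, Map<String, Object>> committedOffsets(
                Collection<Map<String, T>> partitions);
    }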

Thanks,
Liquan





-- 
Liquan Pei
Software Engineer, Confluent Inc


Kafka Connect connector monitoring tasks via offset storage reader

2016-05-24 Thread Randall Hauch
I have a need for one of my SourceConnector implementations to configure a
bunch of tasks and, when those are all “done”, request a task reconfiguration
so that it can run a single task. Think: many tasks to snapshot database
tables, then, when those are complete, the connector reconfigures itself to
run _one_ task that reads the transaction log.
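
In code, the desired behavior would be roughly the following sketch
(snapshotCompleted and the two helper methods are illustrative, not a real
API):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    // Inside the SourceConnector subclass. 'snapshotCompleted' would be set
    // once the connector learns that all snapshot tasks have finished.
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        if (!snapshotCompleted) {
            // Phase 1: up to maxTasks parallel tasks, each snapshotting a
            // subset of the tables.
            return snapshotTaskConfigs(maxTasks);
        }
        // Phase 2: exactly one task that reads the transaction log.
        return Collections.singletonList(logReaderTaskConfig());
    }

The catch, described below, is that the connector has no way to learn when
the snapshot tasks are done.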

Unfortunately, I can’t figure out a way for the connector to “monitor” the 
progress of its tasks, especially when those tasks are distributed across the 
cluster. The only way I can think of to get around this is to have my connector 
start *one* task that performs the snapshot and then starts reading the 
transaction log. Unfortunately, that means to parallelize the snapshotting 
work, the task would need to manage its own threads. That’s possible, but 
undesirable for many reasons, not the least of which is that the work can’t be 
distributed as multiple tasks amongst the cluster of Kafka Connect workers.

On the other hand, a simple enhancement to Kafka Connect would make this very
easy: add to the ConnectorContext a method that returns the
OffsetStorageReader. The connector could start a thread to periodically poll
the offsets for various partitions and effectively watch the progress of the
tasks. Not only that, the connector’s taskConfigs(int) method could use the
OffsetStorageReader to read previously recorded offsets and configure its
tasks more intelligently. This seems very straightforward, backward
compatible, and non-intrusive.
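
A rough sketch of that monitoring thread, assuming the proposed accessor on
ConnectorContext existed (snapshotPartitions() and isSnapshotComplete() are
illustrative helpers the connector would supply):

    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.connect.storage.OffsetStorageReader;

    // Started from the connector's start() method. Polls the committed
    // offsets and asks the framework to reconfigure tasks once all of the
    // snapshot partitions are complete.
    ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
    monitor.scheduleAtFixedRate(() -> {
        OffsetStorageReader reader = context.offsetStorageReader(); // proposed
        Map<Map<String, String>, Map<String, Object>> committed =
                reader.offsets(snapshotPartitions());
        if (isSnapshotComplete(committed)) {
            context.requestTaskReconfiguration(); // existing method
            monitor.shutdown();
        }
    }, 10, 10, TimeUnit.SECONDS);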

Is there any interest in this? If so, I can create an issue and work on a pull 
request.

Best regards,

Randall Hauch