Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-09-10 Thread Guozhang Wang
Folks,

I would like to revive this thread on KIP-28: I have just updated the patch,
rebased on the latest trunk, incorporating the feedback collected so far:

https://github.com/apache/kafka/pull/130

And the wiki page for this KIP has also been updated with the API and
architectural designs:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client

Would love to hear your thoughts or questions.

Guozhang


On Tue, Aug 11, 2015 at 10:50 AM, Guozhang Wang  wrote:

> Jiangjie,
>
> Thanks for the explanation, now I understand the scenario. It is one of
> the CEP (complex event processing) cases in stream processing, in which I
> think the local state should be used for some sort of pattern matching.
> More concretely, let's say in this case we have a local state storing what
> has been observed. Then the sequence would be:
>
> T0: local state {}
> T1: message 0,  local state {0}
> T2: message 1,  local state {0, 1}
> T3: message 2,  local state {1}, matching 0 and 2, output some result
> and remove 0/2 from local state.
> T4: message 3,  local state {}, matching 1 and 3, output some result
> and remove 1/3 from local state.
>
> Let's say the user calls commit at T2: it will commit the offset at message
> 2 (the next message to consume) as well as the local state {0, 1}; then
> upon failure recovery, it can restore the state along with the committed
> offsets and continue.
>
> More generally, the current design of the processor lets users specify
> their subscribed topics before starting the process; users will not change
> topic subscriptions on the fly, and they will not be committing at
> arbitrary offsets. The rationale behind this is to abstract the producer /
> consumer details away from processor developers as much as possible, i.e.
> if users do not want to, they should not be exposed to message offsets /
> partition ids / topic names etc. For most cases, the subscribed topics can
> be specified before starting the processing job, so we let users specify
> them once and then focus on the computational logic when implementing the
> process function.
>
> Guozhang
>
>
> On Tue, Aug 11, 2015 at 10:26 AM, Jiangjie Qin 
> wrote:
>
>> Guozhang,
>>
>> By interleaved groups of messages, I meant something like this: Say we have
>> messages 0, 1, 2, 3, where messages 0 and 2 together complete a piece of
>> business logic, and messages 1 and 3 together complete another. In that
>> case, after users have processed message 2, they cannot commit offsets,
>> because if they crash before processing message 3, message 1 will not be
>> reconsumed. That means it is possible that users are not able to find a
>> point where the current state is safe to commit.
>>
>> This is one example from the use-case space table. It is still not clear to
>> me which use cases in that table KIP-28 wants to cover. Are we only
>> covering the case of a static topic stream with semi-auto commit, i.e.
>> users cannot change topic subscriptions on the fly and can only commit the
>> current offset?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Mon, Aug 10, 2015 at 6:57 PM, Guozhang Wang 
>> wrote:
>>
>> > Hello folks,
>> >
>> > I have updated the KIP page with some detailed API / architecture /
>> > packaging proposals, along with the long promised first patch in PR:
>> >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client
>> >
>> > https://github.com/apache/kafka/pull/130
>> >
>> >
>> > Any feedback / comments are more than welcome.
>> >
>> > Guozhang
>> >
>> >
>> > On Mon, Aug 10, 2015 at 6:55 PM, Guozhang Wang 
>> wrote:
>> >
>> > > Hi Jun,
>> > >
>> > > 1. I have removed the streamTime in punctuate() since it is not only
>> > > triggered by clock time, detailed explanation can be found here:
>> > >
>> > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamTime
>> > >
>> > > 2. Yes, if users do not schedule a task, then punctuate will never
>> fire.
>> > >
>> > > 3. Yes, I agree. The reason it was implemented in this way is that the
>> > > state store registration call is triggered by the users. However I
>> think
>> > it
>> > > is doable to change that API so that it will be more natural to have
>> sth.
>> > > like:
>> > >
>> > > context.createStore(store-name, store-type).
>> > >
>> > > Guozhang
>> > >
>> > > On Tue, Aug 4, 2015 at 9:17 AM, Jun Rao  wrote:
>> > >
>> > >> A few questions/comments.
>> > >>
>> > >> 1. What's streamTime passed to punctuate()? Is that just the current
>> > time?
>> > >> 2. Is punctuate() only called if schedule() is called?
>> > >> 3. The way the KeyValueStore is created seems a bit weird. Since
>> this is
>> > >> part of the internal state managed by KafkaProcessorContext, it seems
>> > >> there
>> > >> should be an api to create the KeyValueStore from
>> KafkaProcessorContext,

[DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-19 Thread Yan Fang
Hi Guozhang,

Thank you for writing up KIP-28. Hope this is the right thread for me to
post some comments. :)

I still have some confusion about the implementation of the Processor:

1. Why do we maintain a separate consumer and producer for each worker thread?
— From my understanding, the new consumer API will be able to fetch specific
topic-partitions. Is one consumer, shared among the worker threads, enough for
one Kafka process? The same question for the producer: is one producer enough
for sending messages out to the brokers? Would that give better performance?

2. How is the “Stream Synchronization” achieved?
— You talked about “pause” and “notify” on the consumer, but this is still not
very clear to me. If a worker thread has group_1 {topicA-0, topicB-0} and
group_2 {topicA-1, topicB-1}, and topicB is much slower, how can we pause the
consumer to sync topicA and topicB if there is only one consumer? (A rough
sketch of one possibility is included after these questions.)

3. How does the partition timestamp monotonically increase?
— “When the lowest timestamp corresponding record gets processed by the
thread, the partition time possibly gets advanced.” How does the “gets
advanced” part work? Do we pick up another “lowest message timestamp value”?
Doing that may not actually yield an “advanced” timestamp.

4. Thoughts about the local state management.
— From the description, I think there is one kv store per partition group.
That means if one worker thread is assigned more than one partition group, it
will have more than one kv-store connection. How can we avoid mis-operation?
One partition group could easily write to another partition group’s kv store,
since they live in the same thread.

5. Do we plan to implement throttling?
— Since we are “forwarding” messages, it is very possible that an upstream
processor is much faster than the downstream processor. How do we plan to
deal with this?

6. How does the parallelism work?
— Do we achieve this simply by adding more threads, or do we plan to have a
mechanism that can deploy different threads to different machines? It is easy
to imagine deploying different processors to different machines, but what
about the worker threads? And how does fault tolerance work then? Maybe this
is out of scope for the KIP?
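
A rough sketch of one way the single-consumer synchronization asked about in
question 2 could work, using the new consumer's pause()/resume() calls. The
client calls are real (signatures as in later releases of the new consumer);
the loop structure and time tracking are assumptions of mine, not the KIP's
actual design:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SyncSketch {
        // Pause the faster partition once it runs too far ahead of the
        // slower one, so one consumer can keep topicA and topicB in sync.
        static void pollOnce(KafkaConsumer<byte[], byte[]> consumer,
                             TopicPartition fast,
                             long fastTime, long slowTime, long maxSkewMs) {
            if (fastTime - slowTime > maxSkewMs) {
                consumer.pause(Collections.singleton(fast));
            } else {
                consumer.resume(Collections.singleton(fast));
            }
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            // ... dispatch 'records' to the worker threads and update
            // fastTime / slowTime accordingly ...
        }
    }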

Two nits in the KIP-28 doc:

1. The “close” method is missing from interface Processor<K1,V1,K2,V2>, while
we have the “override close()” in KafkaProcessor.

2. “punctuate” does not accept a parameter, while StatefulProcessJob has a
punctuate method that does.

Thanks,
Yan


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-11 Thread Guozhang Wang
Jiangjie,

Thanks for the explanation, now I understand the scenario. It is one of
the CEP (complex event processing) cases in stream processing, in which I
think the local state should be used for some sort of pattern matching. More
concretely, let's say in this case we have a local state storing what has
been observed. Then the sequence would be:

T0: local state {}
T1: message 0,  local state {0}
T2: message 1,  local state {0, 1}
T3: message 2,  local state {1}, matching 0 and 2, output some result
and remove 0/2 from local state.
T4: message 3,  local state {}, matching 1 and 3, output some result
and remove 1/3 from local state.

Let's say the user calls commit at T2: it will commit the offset at message 2
(the next message to consume) as well as the local state {0, 1}; then upon
failure recovery, it can restore the state along with the committed offsets
and continue.
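
To make the sequence concrete, here is a minimal, hypothetical sketch of such
a pattern-matching processor. The interface is a stand-in (the KIP-28 API is
still under discussion), and the matching rule (0 pairs with 2, 1 with 3) is
supplied by the caller:

    import java.util.HashSet;
    import java.util.Set;

    public class PairMatchingSketch {
        // Local state: ids observed but not yet matched -- the {0}, {0, 1},
        // {1}, {} sets in the timeline above.
        private final Set<Integer> pending = new HashSet<>();

        // Called once per consumed message; 'partnerId' encodes the rule
        // that, e.g., message 2 completes the logic started by message 0.
        public void process(int id, int partnerId) {
            if (pending.remove(partnerId)) {
                System.out.printf("matched %d and %d, emitting result%n",
                                  partnerId, id);
            } else {
                pending.add(id);
            }
        }

        // commit() would persist 'pending' together with the consumed
        // offsets (e.g. via a changelog topic), so that recovery restores
        // both consistently -- the T2 commit described above.
        public void commit() { }
    }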

More generally, the current design of the processor lets users specify
their subscribed topics before starting the process; users will not change
topic subscriptions on the fly, and they will not be committing at arbitrary
offsets. The rationale behind this is to abstract the producer / consumer
details away from processor developers as much as possible, i.e. if users do
not want to, they should not be exposed to message offsets / partition ids /
topic names etc. For most cases, the subscribed topics can be specified
before starting the processing job, so we let users specify them once and
then focus on the computational logic when implementing the process function.

Guozhang


On Tue, Aug 11, 2015 at 10:26 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Guozhang,

 By interleaved groups of messages, I meant something like this: Say we have
 messages 0, 1, 2, 3, where messages 0 and 2 together complete a piece of
 business logic, and messages 1 and 3 together complete another. In that
 case, after users have processed message 2, they cannot commit offsets,
 because if they crash before processing message 3, message 1 will not be
 reconsumed. That means it is possible that users are not able to find a
 point where the current state is safe to commit.

 This is one example from the use-case space table. It is still not clear to
 me which use cases in that table KIP-28 wants to cover. Are we only covering
 the case of a static topic stream with semi-auto commit, i.e. users cannot
 change topic subscriptions on the fly and can only commit the current
 offset?

 Thanks,

 Jiangjie (Becket) Qin

 On Mon, Aug 10, 2015 at 6:57 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hello folks,
 
  I have updated the KIP page with some detailed API / architecture /
  packaging proposals, along with the long promised first patch in PR:
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client
 
  https://github.com/apache/kafka/pull/130
 
 
   Any feedback / comments are more than welcome.
 
  Guozhang
 
 
  On Mon, Aug 10, 2015 at 6:55 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi Jun,
  
   1. I have removed the streamTime in punctuate() since it is not only
   triggered by clock time, detailed explanation can be found here:
  
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamTime
  
   2. Yes, if users do not schedule a task, then punctuate will never
 fire.
  
   3. Yes, I agree. The reason it was implemented in this way is that the
   state store registration call is triggered by the users. However I
 think
  it
   is doable to change that API so that it will be more natural to have
 sth.
   like:
  
   context.createStore(store-name, store-type).
  
   Guozhang
  
   On Tue, Aug 4, 2015 at 9:17 AM, Jun Rao j...@confluent.io wrote:
  
   A few questions/comments.
  
   1. What's streamTime passed to punctuate()? Is that just the current
  time?
   2. Is punctuate() only called if schedule() is called?
   3. The way the KeyValueStore is created seems a bit weird. Since this
 is
   part of the internal state managed by KafkaProcessorContext, it seems
   there
   should be an api to create the KeyValueStore from
 KafkaProcessorContext,
   instead of passing context to the constructor of KeyValueStore?
  
   Thanks,
  
   Jun
  
   On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
   wrote:
  
Hi all,
   
I just posted KIP-28: Add a transform client for data processing

   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing

.
   
The wiki page does not yet have the full design / implementation
   details,
and this email is to kick-off the conversation on whether we should
  add
this new client with the described motivations, and if yes what
   features /
functionalities should be included.
   
Looking forward to your feedback!
   
-- Guozhang
   
  
  
  
  
   --
   -- Guozhang
  
 
 
 
  --
  -- Guozhang
 




-- 
-- Guozhang


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-11 Thread Jiangjie Qin
Guozhang,

By interleaved groups of messages, I meant something like this: Say we have
messages 0, 1, 2, 3, where messages 0 and 2 together complete a piece of
business logic, and messages 1 and 3 together complete another. In that case,
after users have processed message 2, they cannot commit offsets, because if
they crash before processing message 3, message 1 will not be reconsumed.
That means it is possible that users are not able to find a point where the
current state is safe to commit.

This is one example from the use-case space table. It is still not clear to
me which use cases in that table KIP-28 wants to cover. Are we only covering
the case of a static topic stream with semi-auto commit, i.e. users cannot
change topic subscriptions on the fly and can only commit the current offset?

Thanks,

Jiangjie (Becket) Qin

On Mon, Aug 10, 2015 at 6:57 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello folks,

 I have updated the KIP page with some detailed API / architecture /
 packaging proposals, along with the long promised first patch in PR:


 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client

 https://github.com/apache/kafka/pull/130


 Any feedback / comments are more than welcome.

 Guozhang


 On Mon, Aug 10, 2015 at 6:55 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi Jun,
 
  1. I have removed the streamTime in punctuate() since it is not only
  triggered by clock time, detailed explanation can be found here:
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamTime
 
  2. Yes, if users do not schedule a task, then punctuate will never fire.
 
  3. Yes, I agree. The reason it was implemented in this way is that the
  state store registration call is triggered by the users. However I think
 it
  is doable to change that API so that it will be more natural to have sth.
  like:
 
  context.createStore(store-name, store-type).
 
  Guozhang
 
  On Tue, Aug 4, 2015 at 9:17 AM, Jun Rao j...@confluent.io wrote:
 
  A few questions/comments.
 
  1. What's streamTime passed to punctuate()? Is that just the current
 time?
  2. Is punctuate() only called if schedule() is called?
  3. The way the KeyValueStore is created seems a bit weird. Since this is
  part of the internal state managed by KafkaProcessorContext, it seems
  there
  should be an api to create the KeyValueStore from KafkaProcessorContext,
  instead of passing context to the constructor of KeyValueStore?
 
  Thanks,
 
  Jun
 
  On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
  wrote:
 
   Hi all,
  
   I just posted KIP-28: Add a transform client for data processing
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
   
   .
  
   The wiki page does not yet have the full design / implementation
  details,
   and this email is to kick-off the conversation on whether we should
 add
   this new client with the described motivations, and if yes what
  features /
   functionalities should be included.
  
   Looking forward to your feedback!
  
   -- Guozhang
  
 
 
 
 
  --
  -- Guozhang
 



 --
 -- Guozhang



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-10 Thread Guozhang Wang
Hello folks,

I have updated the KIP page with some detailed API / architecture /
packaging proposals, along with the long promised first patch in PR:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client

https://github.com/apache/kafka/pull/130


Any feedback / comments are more than welcome.

Guozhang


On Mon, Aug 10, 2015 at 6:55 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Jun,

 1. I have removed the streamTime in punctuate() since it is not only
 triggered by clock time, detailed explanation can be found here:


 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamTime

 2. Yes, if users do not schedule a task, then punctuate will never fire.

 3. Yes, I agree. The reason it was implemented in this way is that the
 state store registration call is triggered by the users. However I think it
 is doable to change that API so that it will be more natural to have sth.
 like:

 context.createStore(store-name, store-type).

 Guozhang

 On Tue, Aug 4, 2015 at 9:17 AM, Jun Rao j...@confluent.io wrote:

 A few questions/comments.

 1. What's streamTime passed to punctuate()? Is that just the current time?
 2. Is punctuate() only called if schedule() is called?
 3. The way the KeyValueStore is created seems a bit weird. Since this is
 part of the internal state managed by KafkaProcessorContext, it seems
 there
 should be an api to create the KeyValueStore from KafkaProcessorContext,
 instead of passing context to the constructor of KeyValueStore?

 Thanks,

 Jun

 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:

  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  
  .
 
  The wiki page does not yet have the full design / implementation
 details,
  and this email is to kick-off the conversation on whether we should add
  this new client with the described motivations, and if yes what
 features /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 




 --
 -- Guozhang




-- 
-- Guozhang


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-10 Thread Guozhang Wang
Hi Jiangjie,

I am not sure I understand the “what if users have interleaved groups of
messages, where each group completes a piece of logic” scenario. Could you
elaborate a bit?

About the committing functionality: it currently will only commit up to the
processed message's offset. The commit() call itself actually does more than
committing the consumer's offsets; it also flushes the local state and the
producer.
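
A sketch of that bundling, with 'stateStore' as a hypothetical stand-in for
the local state store. producer.flush() and consumer.commitSync() are the
standard client calls; the ordering here is my reading of the description
above, not a confirmed design:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;

    class CommitSketch {
        interface LocalStore { void flush(); }  // hypothetical store handle

        KafkaProducer<byte[], byte[]> producer;
        KafkaConsumer<byte[], byte[]> consumer;
        LocalStore stateStore;

        void commit() {
            stateStore.flush();    // 1. flush local state (e.g. changelog)
            producer.flush();      // 2. flush produced output downstream
            consumer.commitSync(); // 3. only then commit consumed offsets
        }
    }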

Guozhang

On Fri, Jul 31, 2015 at 9:20 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I think the abstraction of a processor would be useful. It is not quite
 clear to me yet, though, which cell of the grid in the following API
 analysis chart this processor is trying to satisfy.


 https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal

 For example, in the current proposal it looks like users will only be able
 to commit offsets for the last seen message. What if users have interleaved
 groups of messages, where each group completes a piece of logic? In that
 case, users will not have a safe boundary at which to commit offsets.


 Is the processor client only intended to address the static topic data
 stream with semi-auto offset commit (meaning users can only commit the last
 seen message)?

 Jiangjie (Becket) Qin

 On Thu, Jul 30, 2015 at 2:32 PM, James Cheng jch...@tivo.com wrote:

  I agree with Sriram and Martin. Kafka is already about providing streams
  of data, and so Kafka Streams or anything like that is confusing to me.
 
  This new library is about making it easier to process the data.
 
  -James
 
  On Jul 30, 2015, at 9:38 AM, Aditya Auradkar
  aaurad...@linkedin.com.INVALID wrote:
 
   Personally, I prefer KafkaStreams just because it sounds nicer. For the
   reasons identified above, KafkaProcessor or KProcessor is more apt but
   sounds less catchy (IMO). I also think we should prefix with Kafka
  (rather
   than K) because we will then have 3 clients: KafkaProducer,
 KafkaConsumer
   and KafkaProcessor which is very nice and consistent.
  
   Aditya
  
   On Thu, Jul 30, 2015 at 9:17 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
    I think it's also a matter of intent. If we see it as yet another
    client library, then Processor (to match Producer and Consumer) will
    work great.
    If we see it as a stream processing framework, the name has to start
    with S to follow existing convention.
  
   Speaking of naming conventions:
   You know how people have stack names for technologies that are usually
   used in tandem? ELK, LAMP, etc.
    The pattern of Kafka -> Stream Processor -> NoSQL Store is super
   common. KSN stack doesn't sound right, though. Maybe while we are
   bikeshedding, someone has ideas in that direction :)
  
   On Thu, Jul 30, 2015 at 2:01 AM, Sriram Subramanian
   srsubraman...@linkedin.com.invalid wrote:
   I had the same thought. Kafka processor, KProcessor or even Kafka
   stream processor is more relevant.
  
  
  
   On Jul 30, 2015, at 2:09 PM, Martin Kleppmann mar...@kleppmann.com
 
   wrote:
  
   I'm with Sriram -- Kafka is all about streams already (or topics, to
  be
   precise, but we're calling it stream processing not topic
  processing),
   so I find Kafka Streams, KStream and Kafka Streaming all
  confusing,
   since they seem to imply that other bits of Kafka are not about
 streams.
  
   I would prefer The Processor API or Kafka Processors or Kafka
   Processing Client or KProcessor, or something along those lines.
  
   On 30 Jul 2015, at 15:07, Guozhang Wang wangg...@gmail.com
 wrote:
  
   I would vote for KStream as it sounds sexier (is it only me??),
  second
   to
   that would be Kafka Streaming.
  
   On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io
  wrote:
  
   Also, the most important part of any prototype, we should have a
  name
   for
   this producing-consumer-thingamgigy:
  
   Various ideas:
   - Kafka Streams
   - KStream
   - Kafka Streaming
   - The Processor API
   - Metamorphosis
   - Transformer API
   - Verwandlung
  
   For my part I think what people are trying to do is stream
  processing
   with
   Kafka so I think something that evokes Kafka and stream processing
  is
   preferable. I like Kafka Streams or Kafka Streaming followed by
   KStream.
  
   Transformer kind of makes me think of the shape-shifting cars.
  
   Metamorphosis is cool and hilarious but since we are kind of
   envisioning
   this as more limited scope thing rather than a massive framework
 in
   its own
   right I actually think it should have a descriptive name rather
  than a
    personality of its own.
  
   Anyhow let the bikeshedding commence.
  
   -Jay
  
  
   On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang 
 wangg...@gmail.com
  
   wrote:
  
   Hi all,
  
   I just posted KIP-28: Add a transform client for data processing
   
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
   .
  
   The wiki page does not yet have the full design / implementation
   details,
   and this email is 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-10 Thread Guozhang Wang
Hi Jun,

1. I have removed the streamTime in punctuate() since it is not only
triggered by clock time, detailed explanation can be found here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamTime

2. Yes, if users do not schedule a task, then punctuate will never fire.

3. Yes, I agree. The reason it was implemented in this way is that the
state store registration call is triggered by the users. However I think it
is doable to change that API so that it will be more natural to have sth.
like:

context.createStore(store-name, store-type).
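
A hypothetical sketch of the API shape being discussed. These stand-in
interfaces mimic the proposal (context-created stores in point 3,
schedule-driven punctuate in point 2); they are illustrative, not the actual
KIP-28 API:

    // Stand-in for the store handle the context would hand back.
    interface KeyValueStore<K, V> { V get(K key); void put(K key, V value); }

    // Stand-in for KafkaProcessorContext with the suggested factory method,
    // replacing 'new KeyValueStore(name, context)'.
    interface ProcessorContext {
        <K, V> KeyValueStore<K, V> createStore(String name, String type);
        void schedule(long intervalMs); // without this, punctuate() never fires
    }

    class WordCountSketch {
        private KeyValueStore<String, Long> counts;

        void init(ProcessorContext context) {
            counts = context.createStore("counts", "key-value");
            context.schedule(1000); // request punctuate() roughly every second
        }

        void process(String key, String value) {
            Long old = counts.get(key);
            counts.put(key, old == null ? 1L : old + 1L);
        }

        void punctuate(long time) {
            // periodic work, e.g. forwarding aggregates downstream
        }
    }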

Guozhang

On Tue, Aug 4, 2015 at 9:17 AM, Jun Rao j...@confluent.io wrote:

 A few questions/comments.

 1. What's streamTime passed to punctuate()? Is that just the current time?
 2. Is punctuate() only called if schedule() is called?
 3. The way the KeyValueStore is created seems a bit weird. Since this is
 part of the internal state managed by KafkaProcessorContext, it seems there
 should be an api to create the KeyValueStore from KafkaProcessorContext,
 instead of passing context to the constructor of KeyValueStore?

 Thanks,

 Jun

 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  
  .
 
  The wiki page does not yet have the full design / implementation details,
  and this email is to kick-off the conversation on whether we should add
  this new client with the described motivations, and if yes what features
 /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 




-- 
-- Guozhang


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-04 Thread Jun Rao
A few questions/comments.

1. What's streamTime passed to punctuate()? Is that just the current time?
2. Is punctuate() only called if schedule() is called?
3. The way the KeyValueStore is created seems a bit weird. Since this is
part of the internal state managed by KafkaProcessorContext, it seems there
should be an api to create the KeyValueStore from KafkaProcessorContext,
instead of passing context to the constructor of KeyValueStore?

Thanks,

Jun

On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi all,

 I just posted KIP-28: Add a transform client for data processing
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
 
 .

 The wiki page does not yet have the full design / implementation details,
 and this email is to kick-off the conversation on whether we should add
 this new client with the described motivations, and if yes what features /
 functionalities should be included.

 Looking forward to your feedback!

 -- Guozhang



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-31 Thread Jiangjie Qin
I think the abstraction of a processor would be useful. It is not quite clear
to me yet, though, which cell of the grid in the following API analysis chart
this processor is trying to satisfy.

https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal

For example, in the current proposal it looks like users will only be able to
commit offsets for the last seen message. What if users have interleaved
groups of messages, where each group completes a piece of logic? In that case,
users will not have a safe boundary at which to commit offsets.


Is the processor client only intended to address the static topic data stream
with semi-auto offset commit (meaning users can only commit the last seen
message)?

Jiangjie (Becket) Qin

On Thu, Jul 30, 2015 at 2:32 PM, James Cheng jch...@tivo.com wrote:

 I agree with Sriram and Martin. Kafka is already about providing streams
 of data, and so Kafka Streams or anything like that is confusing to me.

 This new library is about making it easier to process the data.

 -James

 On Jul 30, 2015, at 9:38 AM, Aditya Auradkar
 aaurad...@linkedin.com.INVALID wrote:

  Personally, I prefer KafkaStreams just because it sounds nicer. For the
  reasons identified above, KafkaProcessor or KProcessor is more apt but
  sounds less catchy (IMO). I also think we should prefix with Kafka
 (rather
  than K) because we will then have 3 clients: KafkaProducer, KafkaConsumer
  and KafkaProcessor which is very nice and consistent.
 
  Aditya
 
  On Thu, Jul 30, 2015 at 9:17 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  I think it's also a matter of intent. If we see it as yet another
  client library, then Processor (to match Producer and Consumer) will
  work great.
  If we see it as a stream processing framework, the name has to start
  with S to follow existing convention.
 
  Speaking of naming conventions:
  You know how people have stack names for technologies that are usually
  used in tandem? ELK, LAMP, etc.
  The pattern of Kafka -> Stream Processor -> NoSQL Store is super
  common. KSN stack doesn't sound right, though. Maybe while we are
  bikeshedding, someone has ideas in that direction :)
 
  On Thu, Jul 30, 2015 at 2:01 AM, Sriram Subramanian
  srsubraman...@linkedin.com.invalid wrote:
  I had the same thought. Kafka processor, KProcessor or even Kafka
  stream processor is more relevant.
 
 
 
  On Jul 30, 2015, at 2:09 PM, Martin Kleppmann mar...@kleppmann.com
  wrote:
 
  I'm with Sriram -- Kafka is all about streams already (or topics, to
 be
  precise, but we're calling it stream processing not topic
 processing),
  so I find Kafka Streams, KStream and Kafka Streaming all
 confusing,
  since they seem to imply that other bits of Kafka are not about streams.
 
  I would prefer The Processor API or Kafka Processors or Kafka
  Processing Client or KProcessor, or something along those lines.
 
  On 30 Jul 2015, at 15:07, Guozhang Wang wangg...@gmail.com wrote:
 
  I would vote for KStream as it sounds sexier (is it only me??),
 second
  to
  that would be Kafka Streaming.
 
  On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io
 wrote:
 
  Also, the most important part of any prototype, we should have a
 name
  for
  this producing-consumer-thingamgigy:
 
  Various ideas:
  - Kafka Streams
  - KStream
  - Kafka Streaming
  - The Processor API
  - Metamorphosis
  - Transformer API
  - Verwandlung
 
  For my part I think what people are trying to do is stream
 processing
  with
  Kafka so I think something that evokes Kafka and stream processing
 is
  preferable. I like Kafka Streams or Kafka Streaming followed by
  KStream.
 
  Transformer kind of makes me think of the shape-shifting cars.
 
  Metamorphosis is cool and hilarious but since we are kind of
  envisioning
  this as more limited scope thing rather than a massive framework in
  its own
  right I actually think it should have a descriptive name rather
 than a
   personality of its own.
 
  Anyhow let the bikeshedding commence.
 
  -Jay
 
 
  On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 
  wrote:
 
  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  .
 
  The wiki page does not yet have the full design / implementation
  details,
  and this email is to kick-off the conversation on whether we should
  add
  this new client with the described motivations, and if yes what
  features
  /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 
 
 
  --
  -- Guozhang
 
 




Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-31 Thread Gwen Shapira
Just a quick ping, that regardless of the name of the thing, I'm still
interested in answers to my questions :)



On Tue, Jul 28, 2015 at 3:07 PM, Gwen Shapira gshap...@cloudera.com wrote:
 Thanks Guozhang! Much clearer now, at least for me.

 Few comments / questions:

 1. Perhaps punctuate(int numRecords) would be a nice API addition; some
 use cases have record-count-based windows rather than time-based ones. (A
 rough sketch of a count-based workaround follows after these questions.)
 2. The diagram for Flexible partition distribution shows two joins.
 Is the idea to implement two Processors and string them together?
 3.  Is the local state persistent? Can you talk a bit about how local
 state works with high availability?
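
A rough sketch of the record-count idea from question 1. This is hypothetical:
the current draft only schedules punctuate by time, so the sketch simply
counts records inside process() as a workaround:

    class CountPunctuateSketch {
        private final int everyN;
        private int seen = 0;

        CountPunctuateSketch(int everyN) { this.everyN = everyN; }

        void process(String key, String value) {
            // ... normal per-record work ...
            if (++seen >= everyN) {  // close a record-count-based window
                punctuate(seen);
                seen = 0;
            }
        }

        void punctuate(int numRecords) {
            // window-closing work, e.g. emit an aggregate downstream
        }
    }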

 Gwen

 On Tue, Jul 28, 2015 at 12:57 AM, Guozhang Wang wangg...@gmail.com wrote:
 I have updated the wiki page incorporating people's comments, please feel
 free to take another look before today's meeting.

 On Mon, Jul 27, 2015 at 11:19 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Jay,

 {quote}
 1. Yeah we are going to try to generalize the partition management stuff.
 We'll get a wiki/JIRA up for that. I think that gives what you want in
 terms of moving partitioning to the client side.
 {quote}
 Great! I am looking forward to that.

 {quote}
 I think the key observation is that the whole reason
 LinkedIn split data over clusters to begin with was because of the lack of
 quotas, which are in any case getting implemented.
 {quote}
 I am not sure that I followed this point. Is your point that with quota, it
 is possible to host all data in a single cluster?

 -Yi

 On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps j...@confluent.io wrote:

  Hey Yi,
 
  Great points. I think for some of this the most useful thing would be to
  get a wip prototype out that we could discuss concretely. I think
 Yasuhiro
  and Guozhang took that prototype I had done, and had some improvements.
  Give us a bit to get that into understandable shape so we can discuss.
 
  To address a few of your other points:
  1. Yeah we are going to try to generalize the partition management stuff.
  We'll get a wiki/JIRA up for that. I think that gives what you want in
  terms of moving partitioning to the client side.
  2. I think consuming from a different cluster than the one you produce to
 will be easy.
  More than that is more complex, though I agree the pluggable partitioning
  makes it theoretically possible. Let's try to get something that works
 for
  the first case, it sounds like that solves the use case you describe of
  wanting to directly transform from a given cluster but produce back to a
  different cluster. I think the key observation is that the whole reason
  LinkedIn split data over clusters to begin with was because of the lack
 of
  quotas, which are in any case getting implemented.
 
  -Jay
 
  On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan nickpa...@gmail.com wrote:
 
   Hi, Jay and all,
  
   Thanks for all your quick responses. I tried to summarize my thoughts
  here:
  
   - ConsumerRecord as stream processor API:
  
  * This KafkaProcessor API is targeted to receive the message from
  Kafka.
   So, to Yasuhiro's join/transformation example, any join/transformation
   results that are materialized in Kafka should have ConsumerRecord
 format
   (i.e. w/ topic and offsets). Any non-materialized join/transformation
   results should not be processed by this KafkaProcessor API. One example
  is
   the in-memory operators API in Samza, which is designed to handle the
   non-materialzied join/transformation results. And yes, in this case, a
  more
   abstract data model is needed.
  
  * Just to support Jay's point of a general
   ConsumerRecord/ProducerRecord, a general stream processing on more than
  one
   data sources would need at least the following info: data source
   description (i.e. which topic/table), and actual data (i.e. key-value
   pairs). It would make sense to have the data source name as part of the
   general metadata in stream processing (think about it as the table name
  for
   records in standard SQL).
  
   - SQL/DSL
  
  * I think that this topic itself is worthy of another KIP
 discussion.
  I
   would prefer to leave it out of scope in KIP-28.
  
   - Client-side pluggable partition manager
  
  * Given the use cases we have seen with large-scale deployment of
   Samza/Kafka in LinkedIn, I would argue that we should make it as the
   first-class citizen in this KIP. The use cases include:
  
 * multi-cluster Kafka
  
 * host-affinity (i.e. local-state associated w/ certain
 partitions
  on
   client)
  
   - Multi-cluster scenario
  
  * Although I originally just brought it up as a use case that
 requires
   client-side partition manager, reading Jay’s comments, I realized that
 I
   have one fundamental issue w/ the current copycat + transformation
 model.
   If I interpret Jay’s comment correctly, the proposed
  copycat+transformation
   plays out in the following way: i) copycat takes all data from sources
  (no
   matter it is 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-30 Thread James Cheng
I agree with Sriram and Martin. Kafka is already about providing streams of 
data, and so Kafka Streams or anything like that is confusing to me.

This new library is about making it easier to process the data.

-James

On Jul 30, 2015, at 9:38 AM, Aditya Auradkar aaurad...@linkedin.com.INVALID 
wrote:

 Personally, I prefer KafkaStreams just because it sounds nicer. For the
 reasons identified above, KafkaProcessor or KProcessor is more apt but
 sounds less catchy (IMO). I also think we should prefix with Kafka (rather
 than K) because we will then have 3 clients: KafkaProducer, KafkaConsumer
 and KafkaProcessor which is very nice and consistent.
 
 Aditya
 
 On Thu, Jul 30, 2015 at 9:17 AM, Gwen Shapira gshap...@cloudera.com wrote:
 
 I think it's also a matter of intent. If we see it as yet another
 client library, then Processor (to match Producer and Consumer) will
 work great.
 If we see it as a stream processing framework, the name has to start
 with S to follow existing convention.
 
 Speaking of naming conventions:
 You know how people have stack names for technologies that are usually
 used in tandem? ELK, LAMP, etc.
 The pattern of Kafka -> Stream Processor -> NoSQL Store is super
 common. KSN stack doesn't sound right, though. Maybe while we are
 bikeshedding, someone has ideas in that direction :)
 
 On Thu, Jul 30, 2015 at 2:01 AM, Sriram Subramanian
 srsubraman...@linkedin.com.invalid wrote:
 I had the same thought. Kafka processor, KProcessor or even Kafka
 stream processor is more relevant.
 
 
 
 On Jul 30, 2015, at 2:09 PM, Martin Kleppmann mar...@kleppmann.com
 wrote:
 
 I'm with Sriram -- Kafka is all about streams already (or topics, to be
 precise, but we're calling it stream processing not topic processing),
 so I find Kafka Streams, KStream and Kafka Streaming all confusing,
 since they seem to imply that other bits of Kafka are not about streams.
 
 I would prefer The Processor API or Kafka Processors or Kafka
 Processing Client or KProcessor, or something along those lines.
 
 On 30 Jul 2015, at 15:07, Guozhang Wang wangg...@gmail.com wrote:
 
 I would vote for KStream as it sounds sexier (is it only me??), second
 to
 that would be Kafka Streaming.
 
 On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io wrote:
 
 Also, the most important part of any prototype, we should have a name
 for
 this producing-consumer-thingamgigy:
 
 Various ideas:
 - Kafka Streams
 - KStream
 - Kafka Streaming
 - The Processor API
 - Metamorphosis
 - Transformer API
 - Verwandlung
 
 For my part I think what people are trying to do is stream processing
 with
 Kafka so I think something that evokes Kafka and stream processing is
 preferable. I like Kafka Streams or Kafka Streaming followed by
 KStream.
 
 Transformer kind of makes me think of the shape-shifting cars.
 
 Metamorphosis is cool and hilarious but since we are kind of
 envisioning
 this as more limited scope thing rather than a massive framework in
 its own
 right I actually think it should have a descriptive name rather than a
 personality of its own.
 
 Anyhow let the bikeshedding commence.
 
 -Jay
 
 
 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
 Hi all,
 
 I just posted KIP-28: Add a transform client for data processing
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
 .
 
 The wiki page does not yet have the full design / implementation
 details,
 and this email is to kick-off the conversation on whether we should
 add
 this new client with the described motivations, and if yes what
 features
 /
 functionalities should be included.
 
 Looking forward to your feedback!
 
 -- Guozhang
 
 
 
 --
 -- Guozhang
 
 



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-30 Thread Martin Kleppmann
I'm with Sriram -- Kafka is all about streams already (or topics, to be 
precise, but we're calling it stream processing not topic processing), so I 
find Kafka Streams, KStream and Kafka Streaming all confusing, since they 
seem to imply that other bits of Kafka are not about streams.

I would prefer The Processor API or Kafka Processors or Kafka Processing 
Client or KProcessor, or something along those lines.

On 30 Jul 2015, at 15:07, Guozhang Wang wangg...@gmail.com wrote:

 I would vote for KStream as it sounds sexier (is it only me??), second to
 that would be Kafka Streaming.
 
 On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io wrote:
 
 Also, the most important part of any prototype, we should have a name for
 this producing-consumer-thingamgigy:
 
 Various ideas:
 - Kafka Streams
 - KStream
 - Kafka Streaming
 - The Processor API
 - Metamorphosis
 - Transformer API
 - Verwandlung
 
 For my part I think what people are trying to do is stream processing with
 Kafka so I think something that evokes Kafka and stream processing is
 preferable. I like Kafka Streams or Kafka Streaming followed by KStream.
 
 Transformer kind of makes me think of the shape-shifting cars.
 
 Metamorphosis is cool and hilarious but since we are kind of envisioning
 this as more limited scope thing rather than a massive framework in its own
 right I actually think it should have a descriptive name rather than a
 personality of its own.
 
 Anyhow let the bikeshedding commence.
 
 -Jay
 
 
 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:
 
 Hi all,
 
 I just posted KIP-28: Add a transform client for data processing
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
 
 .
 
 The wiki page does not yet have the full design / implementation details,
 and this email is to kick-off the conversation on whether we should add
 this new client with the described motivations, and if yes what features
 /
 functionalities should be included.
 
 Looking forward to your feedback!
 
 -- Guozhang
 
 
 
 
 
 -- 
 -- Guozhang



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-30 Thread Aditya Auradkar
Personally, I prefer KafkaStreams just because it sounds nicer. For the
reasons identified above, KafkaProcessor or KProcessor is more apt but
sounds less catchy (IMO). I also think we should prefix with Kafka (rather
than K) because we will then have 3 clients: KafkaProducer, KafkaConsumer
and KafkaProcessor which is very nice and consistent.

Aditya

On Thu, Jul 30, 2015 at 9:17 AM, Gwen Shapira gshap...@cloudera.com wrote:

 I think it's also a matter of intent. If we see it as yet another
 client library, then Processor (to match Producer and Consumer) will
 work great.
 If we see it as a stream processing framework, the name has to start
 with S to follow existing convention.

 Speaking of naming conventions:
 You know how people have stack names for technologies that are usually
 used in tandem? ELK, LAMP, etc.
 The pattern of Kafka -> Stream Processor -> NoSQL Store is super
 common. KSN stack doesn't sound right, though. Maybe while we are
 bikeshedding, someone has ideas in that direction :)

 On Thu, Jul 30, 2015 at 2:01 AM, Sriram Subramanian
 srsubraman...@linkedin.com.invalid wrote:
  I had the same thought. Kafka processor, KProcessor or even Kafka
  stream processor is more relevant.
 
 
 
  On Jul 30, 2015, at 2:09 PM, Martin Kleppmann mar...@kleppmann.com
 wrote:
 
  I'm with Sriram -- Kafka is all about streams already (or topics, to be
 precise, but we're calling it stream processing not topic processing),
 so I find Kafka Streams, KStream and Kafka Streaming all confusing,
 since they seem to imply that other bits of Kafka are not about streams.
 
  I would prefer The Processor API or Kafka Processors or Kafka
 Processing Client or KProcessor, or something along those lines.
 
  On 30 Jul 2015, at 15:07, Guozhang Wang wangg...@gmail.com wrote:
 
  I would vote for KStream as it sounds sexier (is it only me??), second
 to
  that would be Kafka Streaming.
 
  On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io wrote:
 
  Also, the most important part of any prototype, we should have a name
 for
  this producing-consumer-thingamgigy:
 
  Various ideas:
  - Kafka Streams
  - KStream
  - Kafka Streaming
  - The Processor API
  - Metamorphosis
  - Transformer API
  - Verwandlung
 
  For my part I think what people are trying to do is stream processing
 with
  Kafka so I think something that evokes Kafka and stream processing is
  preferable. I like Kafka Streams or Kafka Streaming followed by
 KStream.
 
  Transformer kind of makes me think of the shape-shifting cars.
 
  Metamorphosis is cool and hilarious but since we are kind of
 envisioning
  this as more limited scope thing rather than a massive framework in
 its own
  right I actually think it should have a descriptive name rather than a
  personality of its own.
 
  Anyhow let the bikeshedding commence.
 
  -Jay
 
 
  On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  .
 
  The wiki page does not yet have the full design / implementation
 details,
  and this email is to kick-off the conversation on whether we should
 add
  this new client with the described motivations, and if yes what
 features
  /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 
 
 
  --
  -- Guozhang
 



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-30 Thread Guozhang Wang
I would vote for KStream as it sounds sexier (is it only me??), second to
that would be Kafka Streaming.

On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io wrote:

 Also, the most important part of any prototype, we should have a name for
 this producing-consumer-thingamgigy:

 Various ideas:
 - Kafka Streams
 - KStream
 - Kafka Streaming
 - The Processor API
 - Metamorphosis
 - Transformer API
 - Verwandlung

 For my part I think what people are trying to do is stream processing with
 Kafka so I think something that evokes Kafka and stream processing is
 preferable. I like Kafka Streams or Kafka Streaming followed by KStream.

 Transformer kind of makes me think of the shape-shifting cars.

 Metamorphosis is cool and hilarious but since we are kind of envisioning
 this as more limited scope thing rather than a massive framework in its own
 right I actually think it should have a descriptive name rather than a
 personality of its own.

 Anyhow let the bikeshedding commence.

 -Jay


 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  
  .
 
  The wiki page does not yet have the full design / implementation details,
  and this email is to kick-off the conversation on whether we should add
  this new client with the described motivations, and if yes what features
 /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 




-- 
-- Guozhang


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-30 Thread Gwen Shapira
I think it's also a matter of intent. If we see it as yet another
client library, then Processor (to match Producer and Consumer) will
work great.
If we see it as a stream processing framework, the name has to start
with S to follow existing convention.

Speaking of naming conventions:
You know how people have stack names for technologies that are usually
used in tandem? ELK, LAMP, etc.
The pattern of Kafka -> Stream Processor -> NoSQL Store is super
common. KSN stack doesn't sound right, though. Maybe while we are
bikeshedding, someone has ideas in that direction :)

On Thu, Jul 30, 2015 at 2:01 AM, Sriram Subramanian
srsubraman...@linkedin.com.invalid wrote:
 I had the same thought. Kafka processor, KProcessor or even Kafka
 stream processor is more relevant.



 On Jul 30, 2015, at 2:09 PM, Martin Kleppmann mar...@kleppmann.com wrote:

 I'm with Sriram -- Kafka is all about streams already (or topics, to be 
 precise, but we're calling it stream processing not topic processing), 
 so I find Kafka Streams, KStream and Kafka Streaming all confusing, 
 since they seem to imply that other bits of Kafka are not about streams.

 I would prefer The Processor API or Kafka Processors or Kafka 
 Processing Client or KProcessor, or something along those lines.

 On 30 Jul 2015, at 15:07, Guozhang Wang wangg...@gmail.com wrote:

 I would vote for KStream as it sounds sexier (is it only me??), second to
 that would be Kafka Streaming.

 On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io wrote:

 Also, the most important part of any prototype, we should have a name for
 this producing-consumer-thingamgigy:

 Various ideas:
 - Kafka Streams
 - KStream
 - Kafka Streaming
 - The Processor API
 - Metamorphosis
 - Transformer API
 - Verwandlung

 For my part I think what people are trying to do is stream processing with
 Kafka so I think something that evokes Kafka and stream processing is
 preferable. I like Kafka Streams or Kafka Streaming followed by KStream.

 Transformer kind of makes me think of the shape-shifting cars.

 Metamorphosis is cool and hilarious but since we are kind of envisioning
 this as more limited scope thing rather than a massive framework in its own
 right I actually think it should have a descriptive name rather than a
 personality of its own.

 Anyhow let the bikeshedding commence.

 -Jay


 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi all,

 I just posted KIP-28: Add a transform client for data processing
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
 .

 The wiki page does not yet have the full design / implementation details,
 and this email is to kick-off the conversation on whether we should add
 this new client with the described motivations, and if yes what features
 /
 functionalities should be included.

 Looking forward to your feedback!

 -- Guozhang



 --
 -- Guozhang



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-30 Thread Sriram Subramanian
I had the same thought. Kafka processor, KProcessor or even Kafka
stream processor is more relevant.



 On Jul 30, 2015, at 2:09 PM, Martin Kleppmann mar...@kleppmann.com wrote:

 I'm with Sriram -- Kafka is all about streams already (or topics, to be 
 precise, but we're calling it stream processing not topic processing), so 
 I find Kafka Streams, KStream and Kafka Streaming all confusing, since 
 they seem to imply that other bits of Kafka are not about streams.

 I would prefer The Processor API or Kafka Processors or Kafka Processing 
 Client or KProcessor, or something along those lines.

 On 30 Jul 2015, at 15:07, Guozhang Wang wangg...@gmail.com wrote:

 I would vote for KStream as it sounds sexier (is it only me??), second to
 that would be Kafka Streaming.

 On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io wrote:

 Also, the most important part of any prototype, we should have a name for
 this producing-consumer-thingamgigy:

 Various ideas:
 - Kafka Streams
 - KStream
 - Kafka Streaming
 - The Processor API
 - Metamorphosis
 - Transformer API
 - Verwandlung

 For my part I think what people are trying to do is stream processing with
 Kafka so I think something that evokes Kafka and stream processing is
 preferable. I like Kafka Streams or Kafka Streaming followed by KStream.

 Transformer kind of makes me think of the shape-shifting cars.

 Metamorphosis is cool and hilarious but since we are kind of envisioning
 this as more limited scope thing rather than a massive framework in its own
 right I actually think it should have a descriptive name rather than a
 personality of its own.

 Anyhow let the bikeshedding commence.

 -Jay


 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi all,

 I just posted KIP-28: Add a transform client for data processing
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
 .

 The wiki page does not yet have the full design / implementation details,
 and this email is to kick-off the conversation on whether we should add
 this new client with the described motivations, and if yes what features
 /
 functionalities should be included.

 Looking forward to your feedback!

 -- Guozhang



 --
 -- Guozhang



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-29 Thread Neha Narkhede
Prefer something that evokes stream processing on top of Kafka. And since
I've heard many people conflate streaming with streaming video (I know,
duh!), I'd vote for Kafka Streams or maybe KStream.

Thanks,
Neha

On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io wrote:

 Also, the most important part of any prototype, we should have a name for
 this producing-consumer-thingamgigy:

 Various ideas:
 - Kafka Streams
 - KStream
 - Kafka Streaming
 - The Processor API
 - Metamorphosis
 - Transformer API
 - Verwandlung

 For my part I think what people are trying to do is stream processing with
 Kafka so I think something that evokes Kafka and stream processing is
 preferable. I like Kafka Streams or Kafka Streaming followed by KStream.

 Transformer kind of makes me think of the shape-shifting cars.

 Metamorphosis is cool and hilarious but since we are kind of envisioning
 this as more limited scope thing rather than a massive framework in its own
 right I actually think it should have a descriptive name rather than a
  personality of its own.

 Anyhow let the bikeshedding commence.

 -Jay


 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  
  .
 
  The wiki page does not yet have the full design / implementation details,
  and this email is to kick off the conversation on whether we should add
  this new client with the described motivations, and if yes what features
 /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 




-- 
Thanks,
Neha


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-29 Thread Gwen Shapira
Since it sounds like it is not a separate framework (like CopyCat) but
rather a new client, it would be nice to follow the existing convention:
Producer, Consumer and Processor (or Transformer) make sense to me.

Note that the way the API is currently described, people may want to
use it inside Spark applications (because everyone wants to use
Spark), so putting Streaming in the name will cause some confusion.

Metamorphosis and Verwandlung are too perfect to argue against, but
I'd rather keep my slide decks insect-free :)
Since I like fluffy animals on my slides, how about Bunny? This way
chaining multiple processors into a pipeline can be described as Bunny
hops. Also, together with CopyCat, we have pipelines that look like
this: 
http://www.catster.com/wp-content/uploads/2015/06/bd58b829434657a44533a33c32772c36.jpg

Gwen


On Wed, Jul 29, 2015 at 6:46 PM, Sriram Subramanian
srsubraman...@linkedin.com.invalid wrote:
 I think Kafka and streaming are synonymous. Kafka streams or Kafka
 streaming really does not indicate stream processing.

 On Wed, Jul 29, 2015 at 6:20 PM, Neha Narkhede n...@confluent.io wrote:

 Prefer something that evokes stream processing on top of Kafka. And since
 I've heard many people conflate streaming with streaming video (I know,
 duh!), I'd vote for Kafka Streams or maybe KStream.

 Thanks,
 Neha

 On Wed, Jul 29, 2015 at 6:08 PM, Jay Kreps j...@confluent.io wrote:

  Also, the most important part of any prototype: we should have a name for
  this producing-consumer-thingamajig:
 
  Various ideas:
  - Kafka Streams
  - KStream
  - Kafka Streaming
  - The Processor API
  - Metamorphosis
  - Transformer API
  - Verwandlung
 
  For my part I think what people are trying to do is stream processing
 with
  Kafka so I think something that evokes Kafka and stream processing is
  preferable. I like Kafka Streams or Kafka Streaming followed by KStream.
 
  Transformer kind of makes me think of the shape-shifting cars.
 
  Metamorphosis is cool and hilarious, but since we are kind of envisioning
  this as a more limited-scope thing rather than a massive framework in its
  own right, I actually think it should have a descriptive name rather than
  a personality of its own.
 
  Anyhow let the bikeshedding commence.
 
  -Jay
 
 
  On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi all,
  
   I just posted KIP-28: Add a transform client for data processing
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
   
   .
  
   The wiki page does not yet have the full design / implementation
 details,
   and this email is to kick off the conversation on whether we should add
   this new client with the described motivations, and if yes what
 features
  /
   functionalities should be included.
  
   Looking forward to your feedback!
  
   -- Guozhang
  
 



 --
 Thanks,
 Neha



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-29 Thread Jay Kreps
Also, the most important part of any prototype: we should have a name for
this producing-consumer-thingamajig:

Various ideas:
- Kafka Streams
- KStream
- Kafka Streaming
- The Processor API
- Metamorphosis
- Transformer API
- Verwandlung

For my part I think what people are trying to do is stream processing with
Kafka so I think something that evokes Kafka and stream processing is
preferable. I like Kafka Streams or Kafka Streaming followed by KStream.

Transformer kind of makes me think of the shape-shifting cars.

Metamorphosis is cool and hilarious, but since we are kind of envisioning
this as a more limited-scope thing rather than a massive framework in its own
right, I actually think it should have a descriptive name rather than a
personality of its own.

Anyhow let the bikeshedding commence.

-Jay


On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi all,

 I just posted KIP-28: Add a transform client for data processing
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
 
 .

 The wiki page does not yet have the full design / implementation details,
 and this email is to kick off the conversation on whether we should add
 this new client with the described motivations, and if yes what features /
 functionalities should be included.

 Looking forward to your feedback!

 -- Guozhang



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Guozhang Wang
I have updated the wiki page incorporating people's comments, please feel
free to take another look before today's meeting.

On Mon, Jul 27, 2015 at 11:19 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Jay,

 {quote}
 1. Yeah we are going to try to generalize the partition management stuff.
 We'll get a wiki/JIRA up for that. I think that gives what you want in
 terms of moving partitioning to the client side.
 {quote}
 Great! I am looking forward to that.

 {quote}
 I think the key observation is that the whole reason
 LinkedIn split data over clusters to begin with was because of the lack of
 quotas, which are in any case getting implemented.
 {quote}
 I am not sure that I followed this point. Is your point that with quotas, it
 is possible to host all data in a single cluster?

 -Yi

 On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps j...@confluent.io wrote:

  Hey Yi,
 
  Great points. I think for some of this the most useful thing would be to
 get a WIP prototype out that we could discuss concretely. I think
 Yasuhiro
  and Guozhang took that prototype I had done, and had some improvements.
  Give us a bit to get that into understandable shape so we can discuss.
 
  To address a few of your other points:
  1. Yeah we are going to try to generalize the partition management stuff.
  We'll get a wiki/JIRA up for that. I think that gives what you want in
  terms of moving partitioning to the client side.
  2. I think consuming from a different cluster than the one you produce to
  will be easy.
  More than that is more complex, though I agree the pluggable partitioning
  makes it theoretically possible. Let's try to get something that works
 for
  the first case, it sounds like that solves the use case you describe of
  wanting to directly transform from a given cluster but produce back to a
  different cluster. I think the key observation is that the whole reason
  LinkedIn split data over clusters to begin with was because of the lack
 of
  quotas, which are in any case getting implemented.
 
  -Jay
 
  On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan nickpa...@gmail.com wrote:
 
   Hi, Jay and all,
  
   Thanks for all your quick responses. I tried to summarize my thoughts
  here:
  
   - ConsumerRecord as stream processor API:
  
   * This KafkaProcessor API is targeted to receive messages from
  Kafka.
   So, to Yasuhiro's join/transformation example, any join/transformation
   results that are materialized in Kafka should have ConsumerRecord
 format
   (i.e. w/ topic and offsets). Any non-materialized join/transformation
   results should not be processed by this KafkaProcessor API. One example
  is
   the in-memory operators API in Samza, which is designed to handle the
   non-materialized join/transformation results. And yes, in this case, a
  more
   abstract data model is needed.
  
  * Just to support Jay's point of a general
   ConsumerRecord/ProducerRecord, a general stream processing on more than
  one
   data sources would need at least the following info: data source
   description (i.e. which topic/table), and actual data (i.e. key-value
   pairs). It would make sense to have the data source name as part of the
   general metadata in stream processing (think about it as the table name
  for
   records in standard SQL).
  
   - SQL/DSL
  
  * I think that this topic itself is worthy of another KIP
 discussion.
  I
   would prefer to leave it out of scope in KIP-28.
  
   - Client-side pluggable partition manager
  
  * Given the use cases we have seen with large-scale deployment of
   Samza/Kafka in LinkedIn, I would argue that we should make it as the
   first-class citizen in this KIP. The use cases include:
  
 * multi-cluster Kafka
  
 * host-affinity (i.e. local-state associated w/ certain
 partitions
  on
   client)
  
   - Multi-cluster scenario
  
  * Although I originally just brought it up as a use case that
 requires
   client-side partition manager, reading Jay’s comments, I realized that
 I
   have one fundamental issue w/ the current copycat + transformation
 model.
   If I interpret Jay’s comment correctly, the proposed
  copycat+transformation
   plays out in the following way: i) copycat takes all data from sources
  (no
   matter it is Kafka or non-Kafka) into *one single Kafka cluster*; ii)
   transformation is only restricted to take data sources in *this single
   Kafka cluster* to perform aggregate/join etc. This is different from my
   original understanding of the copycat. The main issue I have with this
   model is: huge data-copy between Kafka clusters. In LinkedIn, we used
 to
   follow this model that uses MirrorMaker to map topics from tracking
   clusters to Samza-specific Kafka cluster and only do stream processing
 in
   the Samza-specific Kafka cluster. We moved away from this model and
  started
   allowing users to directly consume from tracking Kafka clusters due to
  the
   overhead of copying huge amount of traffic between Kafka clusters. I
  

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Jay Kreps
The second question that came up on the KIP was how joins and
aggregations work. A lot of implicit thinking went into Kafka's data model
to support stream processing so there is an idea of how this should work
but it isn't exactly obvious. Let me go through the idea of how a processor
is meant to do these kinds of operations using Kafka. This isn't the only
way to do these things but it's the one we thought about when making Kafka
originally.

The first primitive you need is co-partitioning. Both joins and
aggregations require getting data into the same place to do the join or do
the aggregating. This makes use of the partitioning model in Kafka. So
imagine you have two topics CLICKS and IMPRESSIONS, both partitioned by
user_id with the same number of partitions. Then a co-partitioning
partition assignment strategy will deliver messages for the same partitions
to the same consumer instance. The new consumer supports this explicitly now
with pluggable partition assignment strategies; the old high-level consumer
did this implicitly. One of the reasons we worked so hard on replication in
Kafka was so that partitions don't disappear when a server goes down, which
makes this kind of co-partitioning possible.
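
To make that concrete, here is a minimal sketch of one worker pinning the
same partition number of both topics with the new consumer's manual
assignment; the topic names, partition choice, and configuration are
illustrative assumptions, not part of the KIP:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class CoPartitionedInput {
        public static void main(String[] args) {
            Properties config = new Properties();
            config.put("bootstrap.servers", "localhost:9092");
            config.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            config.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(config);
            // this instance owns partition 3 of both topics, so clicks and
            // impressions for the same user_ids all arrive at this worker
            int myPartition = 3;
            consumer.assign(Arrays.asList(
                new TopicPartition("CLICKS", myPartition),
                new TopicPartition("IMPRESSIONS", myPartition)));
        }
    }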

If data isn't already co-partitioned on the same key, you can repartition
it by publishing out a new topic keyed and partitioned by the new partition
key (a kind of continuous reshuffle).
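
As a sketch, such a reshuffle is just a consume-then-produce loop. The
topic names, and the assumption that the user_id is the first
comma-separated field of the value, are hypothetical:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class Reshuffle {
        public static void main(String[] args) {
            // shared config; each client ignores the keys it does not use
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "reshuffler");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(props);
            KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(props);
            consumer.subscribe(Arrays.asList("CLICKS"));
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(100)) {
                    // assume the first CSV field of the value is the user_id
                    String userId = rec.value().split(",")[0];
                    // the default partitioner hashes the key, so all events
                    // for one user land in one partition of the new topic
                    producer.send(new ProducerRecord<String, String>(
                        "CLICKS_BY_USER", userId, rec.value()));
                }
            }
        }
    }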

Okay, great, so co-partitioning gets stuff to the same place, how do you
operate on it once it is there?

Joins can be either stream-to-stream joins where the streams are almost
aligned and you look for a matching click (say) for each impression. Or
they could be stream-to-table joins where you join user details to a click.
You can think of a stream join as a join done over a limited window (maybe
5 mins) and a table join as a join done over all time so far (an infinite
window).
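
For illustration, the stream-to-stream case over a 5-minute window might be
sketched as below, with pure in-memory buffering and no fault tolerance
yet; the record shape (user_id plus string payloads) is an assumption:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    public class ClickImpressionJoin {
        private static final long WINDOW_MS = 5 * 60 * 1000L;

        private static class Buffered {
            final long ts;
            final String impression;
            Buffered(long ts, String impression) {
                this.ts = ts;
                this.impression = impression;
            }
        }

        // user_id -> impressions seen within the current window
        private final Map<String, Deque<Buffered>> byUser =
            new HashMap<String, Deque<Buffered>>();

        public void onImpression(String userId, String impression, long now) {
            Deque<Buffered> buf = byUser.get(userId);
            if (buf == null) {
                buf = new ArrayDeque<Buffered>();
                byUser.put(userId, buf);
            }
            buf.addLast(new Buffered(now, impression));
        }

        public void onClick(String userId, String click, long now) {
            Deque<Buffered> buf = byUser.get(userId);
            if (buf == null) return;
            // evict impressions that fell out of the 5-minute window
            while (!buf.isEmpty() && now - buf.peekFirst().ts > WINDOW_MS) {
                buf.removeFirst();
            }
            for (Buffered b : buf) {
                System.out.println("matched " + b.impression + " -> " + click);
            }
        }
    }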

Aggregates like counts or other summary stats are going to be done over a
window too.

Both aggregates and intermediate join results can be thought of as state
that is accumulated within a window. Either the state of the aggregate so
far (say counts by key), or the state of the joined and unjoined records so
far in the window.

This state is local to the processing so it has to be made fault-tolerant
if the processor fails. There are two ways to do this: (a) recreate it, (b)
have a backup. To recreate it you just recompute the state by reprocessing
the input from the beginning of the window upon failure and recovery. In
Kafka you accomplish this by controlling the offset commit until the window
is processed. For a small window this works well. For a large window
(especially an infinite window) you need to be able to save out your
aggregate or partial join result so that you don't have to reprocess too
much input. To make the local state fault-tolerant, Kafka supports
log-compacted topics to allow journaling these local changes.
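
As a sketch of both recovery strategies for a running count by key: the
consumer and producer are assumed to be configured as in the reshuffle
sketch above, and counts-changelog is a hypothetical topic created with
cleanup.policy=compact:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CountingWorker {
        private final KafkaConsumer<String, String> consumer;
        private final KafkaProducer<String, String> producer;
        private final Map<String, Long> counts = new HashMap<String, Long>();

        CountingWorker(KafkaConsumer<String, String> consumer,
                       KafkaProducer<String, String> producer) {
            this.consumer = consumer;
            this.producer = producer;
        }

        void run() {
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(100)) {
                    Long prev = counts.get(rec.key());
                    long next = (prev == null) ? 1L : prev + 1L;
                    counts.put(rec.key(), next);
                    // (b) journal each change: a restarted instance rebuilds
                    // its counts by replaying the compacted changelog rather
                    // than reprocessing all of the input
                    producer.send(new ProducerRecord<String, String>(
                        "counts-changelog", rec.key(), Long.toString(next)));
                }
                // (a) commit input offsets only once the state they produced
                // is safely journaled; after a crash, uncommitted input is
                // simply re-read and the state recomputed
                producer.flush();
                consumer.commitSync();
            }
        }
    }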

So clearly a user can use the producer and consumer to manage all this
directly, but it is kind of low level. The goal of the
processor/transformer/streaming client is to provide a user-friendly facade
over these capabilities.

-Jay




On Tue, Jul 28, 2015 at 11:59 AM, Jay Kreps j...@confluent.io wrote:

 Here is the link to the original prototype we started with. I wouldn't
 focus too heavily on the details of this code or the API, but I think it
 gives an idea of the lowest-level API, amount of code, etc. It was
 basically a clone of Samza built on Kafka using the new consumer protocol
 just to explore the idea.

 https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming

 I don't think we should talk about it too much because I think Guozhang
 and Yasu rewrote that, improved the apis, etc. But I think this does give a
 kind of proof that it is possible to make a relatively complete stream
 processing system that is only a few thousand lines of code that heavily
 embraces Kafka.

 -Jay

 On Tue, Jul 28, 2015 at 12:57 AM, Guozhang Wang wangg...@gmail.com
 wrote:

 I have updated the wiki page incorporating people's comments, please feel
 free to take another look before today's meeting.

 On Mon, Jul 27, 2015 at 11:19 PM, Yi Pan nickpa...@gmail.com wrote:

  Hi, Jay,
 
  {quote}
  1. Yeah we are going to try to generalize the partition management
 stuff.
  We'll get a wiki/JIRA up for that. I think that gives what you want in
  terms of moving partitioning to the client side.
  {quote}
  Great! I am looking forward to that.
 
  {quote}
  I think the key observation is that the whole reason
  LinkedIn split data over clusters to begin with was because of the lack
 of
  quotas, which are in any case getting implemented.
  {quote}
  I am not sure that I followed this point. Is your point that with
 quotas, it
  is possible to host all data in a single cluster?
 
  -Yi
 
  On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps j...@confluent.io wrote:
 
   Hey Yi,
  
   Great points. I 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Jay Kreps
Here is the link to the original prototype we started with. I wouldn't
focus too heavily on the details of this code or the API, but I think it
gives an idea of the lowest-level API, amount of code, etc. It was
basically a clone of Samza built on Kafka using the new consumer protocol
just to explore the idea.
https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming

I don't think we should talk about it too much because I think Guozhang and
Yasu rewrote that, improved the apis, etc. But I think this does give a
kind of proof that it is possible to make a relatively complete stream
processing system that is only a few thousand lines of code that heavily
embraces Kafka.

-Jay

On Tue, Jul 28, 2015 at 12:57 AM, Guozhang Wang wangg...@gmail.com wrote:

 I have updated the wiki page incorporating people's comments, please feel
 free to take another look before today's meeting.

 On Mon, Jul 27, 2015 at 11:19 PM, Yi Pan nickpa...@gmail.com wrote:

  Hi, Jay,
 
  {quote}
  1. Yeah we are going to try to generalize the partition management stuff.
  We'll get a wiki/JIRA up for that. I think that gives what you want in
  terms of moving partitioning to the client side.
  {quote}
  Great! I am looking forward to that.
 
  {quote}
  I think the key observation is that the whole reason
  LinkedIn split data over clusters to begin with was because of the lack
 of
  quotas, which are in any case getting implemented.
  {quote}
  I am not sure that I followed this point. Is your point that with quotas,
 it
  is possible to host all data in a single cluster?
 
  -Yi
 
  On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps j...@confluent.io wrote:
 
   Hey Yi,
  
   Great points. I think for some of this the most useful thing would be
 to
   get a WIP prototype out that we could discuss concretely. I think
  Yasuhiro
   and Guozhang took that prototype I had done, and had some improvements.
   Give us a bit to get that into understandable shape so we can discuss.
  
   To address a few of your other points:
   1. Yeah we are going to try to generalize the partition management
 stuff.
   We'll get a wiki/JIRA up for that. I think that gives what you want in
   terms of moving partitioning to the client side.
   2. I think consuming from a different cluster than the one you produce to
   will be easy.
   More than that is more complex, though I agree the pluggable
 partitioning
   makes it theoretically possible. Let's try to get something that works
  for
   the first case, it sounds like that solves the use case you describe of
   wanting to directly transform from a given cluster but produce back to
 a
   different cluster. I think the key observation is that the whole reason
   LinkedIn split data over clusters to begin with was because of the lack
  of
   quotas, which are in any case getting implemented.
  
   -Jay
  
   On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan nickpa...@gmail.com wrote:
  
Hi, Jay and all,
   
Thanks for all your quick responses. I tried to summarize my thoughts
   here:
   
- ConsumerRecord as stream processor API:
   
    * This KafkaProcessor API is targeted to receive messages from
   Kafka.
So, to Yasuhiro's join/transformation example, any
 join/transformation
results that are materialized in Kafka should have ConsumerRecord
  format
(i.e. w/ topic and offsets). Any non-materialized join/transformation
results should not be processed by this KafkaProcessor API. One
 example
   is
the in-memory operators API in Samza, which is designed to handle the
 non-materialized join/transformation results. And yes, in this case,
 a
   more
abstract data model is needed.
   
   * Just to support Jay's point of a general
ConsumerRecord/ProducerRecord, a general stream processing on more
 than
   one
data sources would need at least the following info: data source
description (i.e. which topic/table), and actual data (i.e. key-value
pairs). It would make sense to have the data source name as part of
 the
general metadata in stream processing (think about it as the table
 name
   for
records in standard SQL).
   
- SQL/DSL
   
   * I think that this topic itself is worthy of another KIP
  discussion.
   I
would prefer to leave it out of scope in KIP-28.
   
- Client-side pluggable partition manager
   
   * Given the use cases we have seen with large-scale deployment of
Samza/Kafka in LinkedIn, I would argue that we should make it as the
first-class citizen in this KIP. The use cases include:
   
  * multi-cluster Kafka
   
  * host-affinity (i.e. local-state associated w/ certain
  partitions
   on
client)
   
- Multi-cluster scenario
   
   * Although I originally just brought it up as a use case that
  requires
client-side partition manager, reading Jay’s comments, I realized
 that
  I
have one fundamental issue w/ the current copycat + transformation

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Yi Pan
Hi, Aditya,

{quote}
- The KIP states that cmd line tools will be provided to deploy as a
separate service. Is the proposed scope limited to providing a library
which makes it possible to build stream-processing-as-a-service, or to
provide such a service within Kafka itself?
{quote}

There has already been a long discussion in the Samza mailing list
which partly resulted in this KIP proposal. The basic conclusion was that
this KIP is to build a stream processor library that could be used as a
library or as a standalone process. The standalone process may be used as a
deployment method for stream processing in a cluster environment, but that
would be outside the scope of this KIP.

-Yi

On Mon, Jul 27, 2015 at 10:46 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 +1 on comparison with existing solutions. On a high level, it seems nice to
 have a transform library inside Kafka; a lot of the building blocks are
 already there to build a stream processing framework. However, the details
 are tricky to get right. I think this discussion will get a lot more
 interesting when we have something concrete to look at. I'm +1 for the
 general idea.
 How far away are we from having a prototype patch to play with?

 Couple of observations:
 - Since the input source for each processor is always Kafka, you get basic
 client-side partition management out of the box if it uses the high-level
 consumer.
 - The KIP states that cmd line tools will be provided to deploy as a
 separate service. Is the proposed scope limited to providing a library
 which makes it possible to build stream-processing-as-a-service, or to
 provide such a service within Kafka itself?

 Aditya

 On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Hi,
 
  Since we will be discussing KIP-28 in the call tomorrow, can you
   update the KIP with the feature comparison with existing solutions?
   I admit that I do not see a need for a single-event producer-consumer
   pair (AKA Flume Interceptor). I've seen tons of people implement such
  apps in the last year, and it seemed easy. Now, perhaps we were doing
  it all wrong... but I'd like to know how :)
 
  If we are talking about a bigger story (i.e. DSL, real
  stream-processing, etc), thats a different discussion. I've seen a
  bunch of misconceptions about SparkStreaming in this discussion, and I
  have some thoughts in that regard, but I'd rather not go into that if
   that's outside the scope of this KIP.
 
  Gwen
 
 
  On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com
 wrote:
   Hi Ewen,
  
   Replies inlined.
  
   On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava 
  e...@confluent.io
   wrote:
  
   Just some notes on the KIP doc itself:
  
   * It'd be useful to clarify at what point the plain consumer + custom
  code
   + producer breaks down. I think trivial filtering and aggregation on a
   single stream usually work fine with this model. Anything where you
 need
   more complex joins, windowing, etc. are where it breaks down. I think
  most
   interesting applications require that functionality, but it's helpful
 to
   make this really clear in the motivation -- right now, Kafka only
  provides
   the lowest level plumbing for stream processing applications, so most
   interesting apps require very heavyweight frameworks.
  
  
    I think for users to efficiently express complex logic like joins,
    windowing, etc., a higher-level programming interface beyond the process()
    interface would definitely be better, but that does not necessarily require
    a heavyweight framework, which usually includes more than just the
    high-level functional programming model. I would argue that an alternative
    solution would be better for users who want a high-level programming
    interface but not a heavyweight stream processing framework: the processor
    library plus another DSL layer on top of it.
  
  
  
   * I think the feature comparison of plain producer/consumer, stream
   processing frameworks, and this new library is a good start, but we
  might
   want something more thorough and structured, like a feature matrix.
  Right
   now it's hard to figure out exactly how they relate to each other.
  
  
   Cool, I can do that.
  
  
   * I'd personally push the library vs. framework story very strongly --
  the
   total buy-in and weak integration story of stream processing
 frameworks
  is
   a big downside and makes a library a really compelling (and currently
   unavailable, as far as I am aware) alternative.
  
  
    Are you suggesting there is still some content missing about the
   motivations of adding the proposed library in the wiki page?
  
  
   * Comment about in-memory storage of other frameworks is interesting
 --
  it
   is specific to the framework, but is supposed to also give performance
   benefits. The high-level functional processing interface would allow
 for
   combining multiple operations when 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Neha Narkhede
Adi,

How far away are we from having a prototype patch to play with?


We are working to share a prototype next week. Though the code will evolve
to match the APIs and design as they shape up, it will be great if
people can take a look and provide feedback.

Couple of observations:
 - Since the input source for each processor is always Kafka, you get basic
 client-side partition management out of the box if it uses the high-level
 consumer.


That's right. The plan is to propose moving the partition assignment in the
consumer to the client-side (proposal coming up soon) and then use that in
Kafka streams and copycat.


 - The KIP states that cmd line tools will be provided to deploy as a
 separate service. Is the proposed scope limited to providing a library
 which makes it possible to build stream-processing-as-a-service, or to
 provide such a service within Kafka itself?


I think the KIP might've been a little misleading on this point. The scope
is to provide a library and have any integrations with other resource
management frameworks (Slider for YARN and Marathon for Mesos) live outside
Kafka. Having said that, in order to just get started with a simple stream
processing example, you still need basic scripts to get going. Those are
not anywhere similar in scope to what you'd expect in order to run this as
a service.

Thanks,
Neha

On Mon, Jul 27, 2015 at 10:57 PM, Neha Narkhede n...@confluent.io wrote:

 Gwen,

 We have a compilation of notes from comparison with other systems. They
 might be missing details that folks who worked on that system might be able
 to point out. We can share that and discuss further on the KIP call.

 We do hope to include a DSL since that is the most natural way of
 expressing stream processing operations on top of the processor client. The
 DSL layer should be equivalent to that provided by Spark streaming or Flink
 in terms of expressiveness though there will be differences in
 implementation. Our client is intended to be simpler, with minimum external
 dependencies since it integrates closely with Kafka. This is really what
 most application development is hoping to get - a lightweight library on
 top of Kafka that allows them to process streams of data.

 Thanks
 Neha

 On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

 Hi,

 Since we will be discussing KIP-28 in the call tomorrow, can you
  update the KIP with the feature comparison with existing solutions?
  I admit that I do not see a need for a single-event producer-consumer
  pair (AKA Flume Interceptor). I've seen tons of people implement such
 apps in the last year, and it seemed easy. Now, perhaps we were doing
 it all wrong... but I'd like to know how :)

 If we are talking about a bigger story (i.e. DSL, real
  stream-processing, etc), that's a different discussion. I've seen a
 bunch of misconceptions about SparkStreaming in this discussion, and I
 have some thoughts in that regard, but I'd rather not go into that if
  that's outside the scope of this KIP.

 Gwen


 On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com
 wrote:
  Hi Ewen,
 
  Replies inlined.
 
  On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava 
 e...@confluent.io

  wrote:
 
  Just some notes on the KIP doc itself:
 
  * It'd be useful to clarify at what point the plain consumer + custom
 code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you
 need
  more complex joins, windowing, etc. are where it breaks down. I think
 most
  interesting applications require that functionality, but it's helpful
 to
  make this really clear in the motivation -- right now, Kafka only
 provides
  the lowest level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
 
 
  I think for users to efficiently express complex logic like joins,
  windowing, etc., a higher-level programming interface beyond the process()
  interface would definitely be better, but that does not necessarily require
  a heavyweight framework, which usually includes more than just the
  high-level functional programming model. I would argue that an alternative
  solution would be better for users who want a high-level programming
  interface but not a heavyweight stream processing framework: the processor
  library plus another DSL layer on top of it.
 
 
 
  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we
 might
  want something more thorough and structured, like a feature matrix.
 Right
  now it's hard to figure out exactly how they relate to each other.
 
 
  Cool, I can do that.
 
 
  * I'd personally push the library vs. framework story very strongly --
 the
  total buy-in and weak integration story of stream processing
 frameworks is
  a big downside and makes a library a 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Yi Pan
Hi, Neha,

{quote}
We do hope to include a DSL since that is the most natural way of
expressing stream processing operations on top of the processor client. The
DSL layer should be equivalent to that provided by Spark streaming or Flink
in terms of expressiveness though there will be differences in
implementation. Our client is intended to be simpler, with minimum external
dependencies since it integrates closely with Kafka. This is really what
most application development is hoping to get - a lightweight library on
top of Kafka that allows them to process streams of data.
{quote}

I believe that the above itself is worth another KIP. I feel that there
are already a lot of system-level APIs (i.e. process callbacks,
KV-stores, producer/consumer integration, partition manager, multi-cluster
use cases, etc.) that need to be handled in this KIP. Adding a DSL/SQL
library here would bring in a whole set of problems/issues in very
different aspects and de-focus the scope of this KIP.

Just my one quick point.

On Mon, Jul 27, 2015 at 10:57 PM, Neha Narkhede n...@confluent.io wrote:

 Gwen,

 We have a compilation of notes from comparison with other systems. They
 might be missing details that folks who worked on that system might be able
 to point out. We can share that and discuss further on the KIP call.

 We do hope to include a DSL since that is the most natural way of
 expressing stream processing operations on top of the processor client. The
 DSL layer should be equivalent to that provided by Spark streaming or Flink
 in terms of expressiveness though there will be differences in
 implementation. Our client is intended to be simpler, with minimum external
 dependencies since it integrates closely with Kafka. This is really what
 most application development is hoping to get - a lightweight library on
 top of Kafka that allows them to process streams of data.

 Thanks
 Neha

 On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Hi,
 
  Since we will be discussing KIP-28 in the call tomorrow, can you
   update the KIP with the feature comparison with existing solutions?
   I admit that I do not see a need for a single-event producer-consumer
   pair (AKA Flume Interceptor). I've seen tons of people implement such
  apps in the last year, and it seemed easy. Now, perhaps we were doing
  it all wrong... but I'd like to know how :)
 
  If we are talking about a bigger story (i.e. DSL, real
   stream-processing, etc), that's a different discussion. I've seen a
  bunch of misconceptions about SparkStreaming in this discussion, and I
  have some thoughts in that regard, but I'd rather not go into that if
   that's outside the scope of this KIP.
 
  Gwen
 
 
  On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com
 wrote:
   Hi Ewen,
  
   Replies inlined.
  
   On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava 
  e...@confluent.io
   wrote:
  
   Just some notes on the KIP doc itself:
  
   * It'd be useful to clarify at what point the plain consumer + custom
  code
   + producer breaks down. I think trivial filtering and aggregation on a
   single stream usually work fine with this model. Anything where you
 need
   more complex joins, windowing, etc. are where it breaks down. I think
  most
   interesting applications require that functionality, but it's helpful
 to
   make this really clear in the motivation -- right now, Kafka only
  provides
   the lowest level plumbing for stream processing applications, so most
   interesting apps require very heavyweight frameworks.
  
  
    I think for users to efficiently express complex logic like joins,
    windowing, etc., a higher-level programming interface beyond the process()
    interface would definitely be better, but that does not necessarily require
    a heavyweight framework, which usually includes more than just the
    high-level functional programming model. I would argue that an alternative
    solution would be better for users who want a high-level programming
    interface but not a heavyweight stream processing framework: the processor
    library plus another DSL layer on top of it.
  
  
  
   * I think the feature comparison of plain producer/consumer, stream
   processing frameworks, and this new library is a good start, but we
  might
   want something more thorough and structured, like a feature matrix.
  Right
   now it's hard to figure out exactly how they relate to each other.
  
  
   Cool, I can do that.
  
  
   * I'd personally push the library vs. framework story very strongly --
  the
   total buy-in and weak integration story of stream processing
 frameworks
  is
   a big downside and makes a library a really compelling (and currently
   unavailable, as far as I am aware) alternative.
  
  
    Are you suggesting there is still some content missing about the
   motivations of adding the proposed library in the wiki page?
  
  
   * Comment about in-memory storage of other 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Guozhang Wang
Hi Adi,

Just to clarify, the cmdline tool would be used, as stated in the wiki
page, to run the client library as a process, which is still far away
from a service. It is just like what we have for kafka-console-producer,
kafka-console-consumer, kafka-mirror-maker, etc today.

Guozhang

On Mon, Jul 27, 2015 at 10:46 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 +1 on comparison with existing solutions. On a high level, it seems nice to
 have a transform library inside Kafka; a lot of the building blocks are
 already there to build a stream processing framework. However, the details
 are tricky to get right. I think this discussion will get a lot more
 interesting when we have something concrete to look at. I'm +1 for the
 general idea.
 How far away are we from having a prototype patch to play with?

 Couple of observations:
 - Since the input source for each processor is always Kafka, you get basic
 client-side partition management out of the box if it uses the high-level
 consumer.
 - The KIP states that cmd line tools will be provided to deploy as a
 separate service. Is the proposed scope limited to providing a library
 which makes it possible to build stream-processing-as-a-service, or to
 provide such a service within Kafka itself?

 Aditya

 On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Hi,
 
  Since we will be discussing KIP-28 in the call tomorrow, can you
   update the KIP with the feature comparison with existing solutions?
   I admit that I do not see a need for a single-event producer-consumer
   pair (AKA Flume Interceptor). I've seen tons of people implement such
  apps in the last year, and it seemed easy. Now, perhaps we were doing
  it all wrong... but I'd like to know how :)
 
  If we are talking about a bigger story (i.e. DSL, real
   stream-processing, etc), that's a different discussion. I've seen a
  bunch of misconceptions about SparkStreaming in this discussion, and I
  have some thoughts in that regard, but I'd rather not go into that if
   that's outside the scope of this KIP.
 
  Gwen
 
 
  On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com
 wrote:
   Hi Ewen,
  
   Replies inlined.
  
   On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava 
  e...@confluent.io
   wrote:
  
   Just some notes on the KIP doc itself:
  
   * It'd be useful to clarify at what point the plain consumer + custom
  code
   + producer breaks down. I think trivial filtering and aggregation on a
   single stream usually work fine with this model. Anything where you
 need
   more complex joins, windowing, etc. are where it breaks down. I think
  most
   interesting applications require that functionality, but it's helpful
 to
   make this really clear in the motivation -- right now, Kafka only
  provides
   the lowest level plumbing for stream processing applications, so most
   interesting apps require very heavyweight frameworks.
  
  
    I think for users to efficiently express complex logic like joins,
    windowing, etc., a higher-level programming interface beyond the process()
    interface would definitely be better, but that does not necessarily require
    a heavyweight framework, which usually includes more than just the
    high-level functional programming model. I would argue that an alternative
    solution would be better for users who want a high-level programming
    interface but not a heavyweight stream processing framework: the processor
    library plus another DSL layer on top of it.
  
  
  
   * I think the feature comparison of plain producer/consumer, stream
   processing frameworks, and this new library is a good start, but we
  might
   want something more thorough and structured, like a feature matrix.
  Right
   now it's hard to figure out exactly how they relate to each other.
  
  
   Cool, I can do that.
  
  
   * I'd personally push the library vs. framework story very strongly --
  the
   total buy-in and weak integration story of stream processing
 frameworks
  is
   a big downside and makes a library a really compelling (and currently
   unavailable, as far as I am aware) alternative.
  
  
    Are you suggesting there is still some content missing about the
   motivations of adding the proposed library in the wiki page?
  
  
   * Comment about in-memory storage of other frameworks is interesting
 --
  it
   is specific to the framework, but is supposed to also give performance
   benefits. The high-level functional processing interface would allow
 for
   combining multiple operations when there's no shuffle, but when there
  is a
   shuffle, we'll always be writing to Kafka, right? Spark (and
 presumably
   spark streaming) is supposed to get a big win by handling shuffles
 such
   that the data just stays in cache and never actually hits disk, or at
  least
   hits disk in the background. Will we take a hit because we always
 write
  to
   Kafka?
  
  
   I agree with Neha's 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Yi Pan
Hi, Jay,

{quote}
1. Yeah we are going to try to generalize the partition management stuff.
We'll get a wiki/JIRA up for that. I think that gives what you want in
terms of moving partitioning to the client side.
{quote}
Great! I am looking forward to that.

{quote}
I think the key observation is that the whole reason
LinkedIn split data over clusters to begin with was because of the lack of
quotas, which are in any case getting implemented.
{quote}
I am not sure that I followed this point. Is your point that with quotas, it
is possible to host all data in a single cluster?

-Yi

On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps j...@confluent.io wrote:

 Hey Yi,

 Great points. I think for some of this the most useful thing would be to
 get a WIP prototype out that we could discuss concretely. I think Yasuhiro
 and Guozhang took that prototype I had done, and had some improvements.
 Give us a bit to get that into understandable shape so we can discuss.

 To address a few of your other points:
 1. Yeah we are going to try to generalize the partition management stuff.
 We'll get a wiki/JIRA up for that. I think that gives what you want in
 terms of moving partitioning to the client side.
 2. I think consuming from a different cluster than the one you produce to will be easy.
 More than that is more complex, though I agree the pluggable partitioning
 makes it theoretically possible. Let's try to get something that works for
 the first case, it sounds like that solves the use case you describe of
 wanting to directly transform from a given cluster but produce back to a
 different cluster. I think the key observation is that the whole reason
 LinkedIn split data over clusters to begin with was because of the lack of
 quotas, which are in any case getting implemented.

 -Jay

 On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan nickpa...@gmail.com wrote:

  Hi, Jay and all,
 
  Thanks for all your quick responses. I tried to summarize my thoughts
 here:
 
  - ConsumerRecord as stream processor API:
 
  * This KafkaProcessor API is targeted to receive messages from
 Kafka.
  So, to Yasuhiro's join/transformation example, any join/transformation
  results that are materialized in Kafka should have ConsumerRecord format
  (i.e. w/ topic and offsets). Any non-materialized join/transformation
  results should not be processed by this KafkaProcessor API. One example
 is
  the in-memory operators API in Samza, which is designed to handle the
  non-materialized join/transformation results. And yes, in this case, a
 more
  abstract data model is needed.
 
 * Just to support Jay's point of a general
  ConsumerRecord/ProducerRecord, a general stream processing on more than
 one
  data sources would need at least the following info: data source
  description (i.e. which topic/table), and actual data (i.e. key-value
  pairs). It would make sense to have the data source name as part of the
  general metadata in stream processing (think about it as the table name
 for
  records in standard SQL).
 
  - SQL/DSL
 
 * I think that this topic itself is worthy of another KIP discussion.
 I
  would prefer to leave it out of scope in KIP-28.
 
  - Client-side pluggable partition manager
 
 * Given the use cases we have seen with large-scale deployment of
  Samza/Kafka in LinkedIn, I would argue that we should make it as the
  first-class citizen in this KIP. The use cases include:
 
* multi-cluster Kafka
 
* host-affinity (i.e. local-state associated w/ certain partitions
 on
  client)
 
  - Multi-cluster scenario
 
 * Although I originally just brought it up as a use case that requires
  client-side partition manager, reading Jay’s comments, I realized that I
  have one fundamental issue w/ the current copycat + transformation model.
  If I interpret Jay’s comment correctly, the proposed
 copycat+transformation
  plays out in the following way: i) copycat takes all data from sources
 (no
  matter it is Kafka or non-Kafka) into *one single Kafka cluster*; ii)
  transformation is only restricted to take data sources in *this single
  Kafka cluster* to perform aggregate/join etc. This is different from my
  original understanding of the copycat. The main issue I have with this
  model is: huge data-copy between Kafka clusters. In LinkedIn, we used to
  follow this model that uses MirrorMaker to map topics from tracking
  clusters to Samza-specific Kafka cluster and only do stream processing in
  the Samza-specific Kafka cluster. We moved away from this model and
 started
  allowing users to directly consume from tracking Kafka clusters due to
 the
  overhead of copying huge amount of traffic between Kafka clusters. I
 agree
  that the initial design of KIP-28 would probably need a smaller scope of
  problem to solve, hence, limiting to solving partition management in a
  single cluster. However, I would really hope the design won’t prevent the
  use case of processing data directly from multiple clusters. In my
 opinion,
  making 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-28 Thread Gwen Shapira
Thanks Guozhang! Much clearer now, at least for me.

Few comments / questions:

1. Perhaps punctuate(int numRecords) will be a nice API addition; some
use cases have record-count-based windows rather than time-based ones
(see the sketch below).
2. The diagram for Flexible partition distribution shows two joins.
Is the idea to implement two Processors and string them together?
3. Is the local state persistent? Can you talk a bit about how local
state works with high availability?

Gwen
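
A hypothetical sketch of the record-count punctuate from question 1,
written against a Processor-like shape from the KIP discussion; none of
these names are the KIP's actual API, and punctuate(int) in particular is
only the suggested addition:

    // Hypothetical: fire a count-based "punctuate" every N records
    // instead of every T milliseconds.
    public abstract class CountPunctuatingProcessor {
        private final int interval; // punctuate every `interval` records
        private int seen = 0;

        protected CountPunctuatingProcessor(int interval) {
            this.interval = interval;
        }

        public final void process(String key, String value) {
            handle(key, value);       // user-defined per-record logic
            if (++seen >= interval) {
                seen = 0;
                punctuate(interval);  // record-count window boundary
            }
        }

        protected abstract void handle(String key, String value);

        protected abstract void punctuate(int numRecords);
    }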

On Tue, Jul 28, 2015 at 12:57 AM, Guozhang Wang wangg...@gmail.com wrote:
 I have updated the wiki page incorporating people's comments, please feel
 free to take another look before today's meeting.

 On Mon, Jul 27, 2015 at 11:19 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Jay,

 {quote}
 1. Yeah we are going to try to generalize the partition management stuff.
 We'll get a wiki/JIRA up for that. I think that gives what you want in
 terms of moving partitioning to the client side.
 {quote}
 Great! I am looking forward to that.

 {quote}
 I think the key observation is that the whole reason
 LinkedIn split data over clusters to begin with was because of the lack of
 quotas, which are in any case getting implemented.
 {quote}
 I am not sure that I followed this point. Is your point that with quotas, it
 is possible to host all data in a single cluster?

 -Yi

 On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps j...@confluent.io wrote:

  Hey Yi,
 
  Great points. I think for some of this the most useful thing would be to
  get a WIP prototype out that we could discuss concretely. I think
 Yasuhiro
  and Guozhang took that prototype I had done, and had some improvements.
  Give us a bit to get that into understandable shape so we can discuss.
 
  To address a few of your other points:
  1. Yeah we are going to try to generalize the partition management stuff.
  We'll get a wiki/JIRA up for that. I think that gives what you want in
  terms of moving partitioning to the client side.
  2. I think consuming from a different cluster than the one you produce to
  will be easy.
  More than that is more complex, though I agree the pluggable partitioning
  makes it theoretically possible. Let's try to get something that works
 for
  the first case, it sounds like that solves the use case you describe of
  wanting to directly transform from a given cluster but produce back to a
  different cluster. I think the key observation is that the whole reason
  LinkedIn split data over clusters to begin with was because of the lack
 of
  quotas, which are in any case getting implemented.
 
  -Jay
 
  On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan nickpa...@gmail.com wrote:
 
   Hi, Jay and all,
  
   Thanks for all your quick responses. I tried to summarize my thoughts
  here:
  
   - ConsumerRecord as stream processor API:
  
   * This KafkaProcessor API is targeted to receive messages from
  Kafka.
   So, to Yasuhiro's join/transformation example, any join/transformation
   results that are materialized in Kafka should have ConsumerRecord
 format
   (i.e. w/ topic and offsets). Any non-materialized join/transformation
   results should not be processed by this KafkaProcessor API. One example
  is
   the in-memory operators API in Samza, which is designed to handle the
   non-materialized join/transformation results. And yes, in this case, a
  more
   abstract data model is needed.
  
  * Just to support Jay's point of a general
   ConsumerRecord/ProducerRecord, a general stream processing on more than
  one
   data sources would need at least the following info: data source
   description (i.e. which topic/table), and actual data (i.e. key-value
   pairs). It would make sense to have the data source name as part of the
   general metadata in stream processing (think about it as the table name
  for
   records in standard SQL).
  
   - SQL/DSL
  
  * I think that this topic itself is worthy of another KIP
 discussion.
  I
   would prefer to leave it out of scope in KIP-28.
  
   - Client-side pluggable partition manager
  
  * Given the use cases we have seen with large-scale deployment of
   Samza/Kafka in LinkedIn, I would argue that we should make it as the
   first-class citizen in this KIP. The use cases include:
  
 * multi-cluster Kafka
  
 * host-affinity (i.e. local-state associated w/ certain
 partitions
  on
   client)
  
   - Multi-cluster scenario
  
  * Although I originally just brought it up as a use case that
 requires
   client-side partition manager, reading Jay’s comments, I realized that
 I
   have one fundamental issue w/ the current copycat + transformation
 model.
   If I interpret Jay’s comment correctly, the proposed
  copycat+transformation
   plays out in the following way: i) copycat takes all data from sources
  (no
   matter it is Kafka or non-Kafka) into *one single Kafka cluster*; ii)
   transformation is only restricted to take data sources in *this single
   Kafka cluster* to perform aggregate/join etc. This is different 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-27 Thread Aditya Auradkar
+1 on comparison with existing solutions. On a high level, it seems nice to
have a transform library inside Kafka; a lot of the building blocks are
already there to build a stream processing framework. However, the details
are tricky to get right. I think this discussion will get a lot more
interesting when we have something concrete to look at. I'm +1 for the
general idea.
How far away are we from having a prototype patch to play with?

Couple of observations:
- Since the input source for each processor is always Kafka, you get basic
client-side partition management out of the box if it uses the high-level
consumer.
- The KIP states that cmd line tools will be provided to deploy as a
separate service. Is the proposed scope limited to providing a library
which makes it possible to build stream-processing-as-a-service, or to
provide such a service within Kafka itself?

Aditya

On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 Since we will be discussing KIP-28 in the call tomorrow, can you
  update the KIP with the feature comparison with existing solutions?
  I admit that I do not see a need for a single-event producer-consumer
  pair (AKA Flume Interceptor). I've seen tons of people implement such
 apps in the last year, and it seemed easy. Now, perhaps we were doing
 it all wrong... but I'd like to know how :)

 If we are talking about a bigger story (i.e. DSL, real
  stream-processing, etc), that's a different discussion. I've seen a
 bunch of misconceptions about SparkStreaming in this discussion, and I
 have some thoughts in that regard, but I'd rather not go into that if
  that's outside the scope of this KIP.

 Gwen


 On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com wrote:
  Hi Ewen,
 
  Replies inlined.
 
  On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava 
 e...@confluent.io
  wrote:
 
  Just some notes on the KIP doc itself:
 
  * It'd be useful to clarify at what point the plain consumer + custom
 code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you need
  more complex joins, windowing, etc. are where it breaks down. I think
 most
  interesting applications require that functionality, but it's helpful to
  make this really clear in the motivation -- right now, Kafka only
 provides
  the lowest level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
 
 
   I think for users to efficiently express complex logic like joins,
   windowing, etc., a higher-level programming interface beyond the process()
   interface would definitely be better, but that does not necessarily require
   a heavyweight framework, which usually includes more than just the
   high-level functional programming model. I would argue that an alternative
   solution would be better for users who want a high-level programming
   interface but not a heavyweight stream processing framework: the processor
   library plus another DSL layer on top of it.
 
 
 
  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we
 might
  want something more thorough and structured, like a feature matrix.
 Right
  now it's hard to figure out exactly how they relate to each other.
 
 
  Cool, I can do that.
 
 
  * I'd personally push the library vs. framework story very strongly --
 the
  total buy-in and weak integration story of stream processing frameworks
 is
  a big downside and makes a library a really compelling (and currently
  unavailable, as far as I am aware) alternative.
 
 
   Are you suggesting there is still some content missing about the
  motivations of adding the proposed library in the wiki page?
 
 
  * Comment about in-memory storage of other frameworks is interesting --
 it
  is specific to the framework, but is supposed to also give performance
  benefits. The high-level functional processing interface would allow for
  combining multiple operations when there's no shuffle, but when there
 is a
  shuffle, we'll always be writing to Kafka, right? Spark (and presumably
  spark streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
 least
  hits disk in the background. Will we take a hit because we always write
 to
  Kafka?
 
 
  I agree with Neha's comments here. One more point I want to make is
  materializing to Kafka is not necessarily much worse than keeping data in
  memory if the downstream consumption is caught up such that most of the
  reads will be hitting file cache. I remember Samza has illustrated that
  under such scenarios its throughput is actually quite comparable to Spark
  Streaming / Storm.
 
 
  * I really struggled with the structure of the KIP template with Copycat
  because the flow doesn't work well for proposals like 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-27 Thread Neha Narkhede
Gwen,

We have a compilation of notes from comparison with other systems. They
might be missing details that folks who worked on that system might be able
to point out. We can share that and discuss further on the KIP call.

We do hope to include a DSL since that is the most natural way of
expressing stream processing operations on top of the processor client. The
DSL layer should be equivalent to that provided by Spark streaming or Flink
in terms of expressiveness though there will be differences in
implementation. Our client is intended to be simpler, with minimum external
dependencies since it integrates closely with Kafka. This is really what
most application development is hoping to get - a lightweight library on
top of Kafka that allows them to process streams of data.

Thanks
Neha

On Mon, Jul 27, 2015 at 8:20 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 Since we will be discussing KIP-28 in the call tomorrow, can you
  update the KIP with the feature comparison with existing solutions?
  I admit that I do not see a need for a single-event producer-consumer
  pair (AKA Flume Interceptor). I've seen tons of people implement such
 apps in the last year, and it seemed easy. Now, perhaps we were doing
 it all wrong... but I'd like to know how :)

 If we are talking about a bigger story (i.e. DSL, real
 stream-processing, etc.), that's a different discussion. I've seen a
 bunch of misconceptions about SparkStreaming in this discussion, and I
 have some thoughts in that regard, but I'd rather not go into that if
 that's outside the scope of this KIP.

 Gwen


 On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com wrote:
  Hi Ewen,
 
  Replies inlined.
 
  On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava 
 e...@confluent.io
  wrote:
 
  Just some notes on the KIP doc itself:
 
  * It'd be useful to clarify at what point the plain consumer + custom
 code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you need
  more complex joins, windowing, etc. are where it breaks down. I think
 most
  interesting applications require that functionality, but it's helpful to
  make this really clear in the motivation -- right now, Kafka only
 provides
  the lowest level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
 
 
  I think for users to efficiently express complex logic like joins,
  windowing, etc., a higher-level programming interface beyond the process()
  interface would definitely be better, but that does not necessarily require
  a heavyweight framework, which usually includes more than just the
  high-level functional programming model. I would argue that an alternative
  solution would be better for users who want some high-level
  programming interface but not a heavyweight stream processing framework:
  the processor library plus another DSL layer on top of it.
 
 
 
  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we
 might
  want something more thorough and structured, like a feature matrix.
 Right
  now it's hard to figure out exactly how they relate to each other.
 
 
  Cool, I can do that.
 
 
  * I'd personally push the library vs. framework story very strongly --
 the
  total buy-in and weak integration story of stream processing frameworks
 is
  a big downside and makes a library a really compelling (and currently
  unavailable, as far as I am aware) alternative.
 
 
  Are you suggesting there is still some content missing about the
  motivations for adding the proposed library in the wiki page?
 
 
  * Comment about in-memory storage of other frameworks is interesting --
 it
  is specific to the framework, but is supposed to also give performance
  benefits. The high-level functional processing interface would allow for
  combining multiple operations when there's no shuffle, but when there
 is a
  shuffle, we'll always be writing to Kafka, right? Spark (and presumably
  spark streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
 least
  hits disk in the background. Will we take a hit because we always write
 to
  Kafka?
 
 
  I agree with Neha's comments here. One more point I want to make is that
  materializing to Kafka is not necessarily much worse than keeping data in
  memory if the downstream consumption is caught up such that most of the
  reads will be hitting file cache. I remember Samza has illustrated that
  under such scenarios its throughput is actually quite comparable to Spark
  Streaming / Storm.
 
 
  * I really struggled with the structure of the KIP template with Copycat
  because the flow doesn't work well for proposals like this. They aren't
 as
  concrete changes as the KIP template was designed for. I'd completely
  ignore that template in favor of optimizing for clarity if I were you.

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-27 Thread Jay Kreps
Hey Yi,

Great points. I think for some of this the most useful thing would be to
get a wip prototype out that we could discuss concretely. I think Yasuhiro
and Guozhang took that prototype I had done and made some improvements.
Give us a bit to get that into understandable shape so we can discuss.

To address a few of your other points:
1. Yeah we are going to try to generalize the partition management stuff.
We'll get a wiki/JIRA up for that. I think that gives you what you want in
terms of moving partitioning to the client side.
2. I think consuming from a cluster different from the one you produce to will be easy.
More than that is more complex, though I agree the pluggable partitioning
makes it theoretically possible. Let's try to get something that works for
the first case, it sounds like that solves the use case you describe of
wanting to directly transform from a given cluster but produce back to a
different cluster. I think the key observation is that the whole reason
LinkedIn split data over clusters to begin with was because of the lack of
quotas, which are in any case getting implemented.

-Jay

On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Jay and all,

 Thanks for all your quick responses. I tried to summarize my thoughts here:

 - ConsumerRecord as stream processor API:

 * This KafkaProcessor API is targeted at receiving messages from Kafka.
 So, to Yasuhiro's join/transformation example, any join/transformation
 results that are materialized in Kafka should have ConsumerRecord format
 (i.e. w/ topic and offsets). Any non-materialized join/transformation
 results should not be processed by this KafkaProcessor API. One example is
 the in-memory operators API in Samza, which is designed to handle the
  non-materialized join/transformation results. And yes, in this case, a more
 abstract data model is needed.

* Just to support Jay's point of a general
  ConsumerRecord/ProducerRecord, general stream processing on more than one
  data source would need at least the following info: data source
 description (i.e. which topic/table), and actual data (i.e. key-value
 pairs). It would make sense to have the data source name as part of the
 general metadata in stream processing (think about it as the table name for
 records in standard SQL).

 - SQL/DSL

* I think that this topic itself is worthy of another KIP discussion. I
 would prefer to leave it out of scope in KIP-28.

 - Client-side pluggable partition manager

* Given the use cases we have seen with large-scale deployment of
  Samza/Kafka in LinkedIn, I would argue that we should make it a
  first-class citizen in this KIP. The use cases include:

   * multi-cluster Kafka

   * host-affinity (i.e. local-state associated w/ certain partitions on
 client)

 - Multi-cluster scenario

* Although I originally just brought it up as a use case that requires
 client-side partition manager, reading Jay’s comments, I realized that I
 have one fundamental issue w/ the current copycat + transformation model.
 If I interpret Jay’s comment correctly, the proposed copycat+transformation
  plays out in the following way: i) copycat takes all data from sources
  (whether Kafka or non-Kafka) into *one single Kafka cluster*; ii)
 transformation is only restricted to take data sources in *this single
 Kafka cluster* to perform aggregate/join etc. This is different from my
 original understanding of the copycat. The main issue I have with this
 model is: huge data-copy between Kafka clusters. In LinkedIn, we used to
 follow this model that uses MirrorMaker to map topics from tracking
 clusters to Samza-specific Kafka cluster and only do stream processing in
 the Samza-specific Kafka cluster. We moved away from this model and started
 allowing users to directly consume from tracking Kafka clusters due to the
  overhead of copying huge amounts of traffic between Kafka clusters. I agree
  that the initial design of KIP-28 would probably need a smaller scope of
  problem to solve, hence limiting it to solving partition management in a
 single cluster. However, I would really hope the design won’t prevent the
 use case of processing data directly from multiple clusters. In my opinion,
 making the partition manager as a client-side pluggable logic would allow
 us to achieve these goals.

 Thanks a lot in advance!

 -Yi

 On Fri, Jul 24, 2015 at 11:13 AM, Jay Kreps j...@confluent.io wrote:

  Hey Yi,
 
  For your other two points:
 
  - This definitely doesn't cover any kind of SQL or anything like this.
 
  - The prototype we started with just had process() as a method but
 Yasuhiro
  had some ideas of adding additional filter/aggregate convenience methods.
  We should discuss how this would fit with the operator work you were
 doing
  in Samza. Probably the best way is just get the code out there in current
  state and start talking about it?
 
  - Your point about multiple clusters. We actually have a proposed
 extension
  for the Kafka group management protocol that would allow it to cover
  multiple clusters but actually I think that use case is not the focus. I
  think in scope would be consuming from one cluster and producing to
  another.

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-27 Thread Gwen Shapira
Hi,

Since we will be discussing KIP-28 in the call tomorrow, can you
update the KIP with the feature-comparison with existing solutions?
I admit that I do not see a need for a single-event-producer-consumer
pair (AKA Flume Interceptor). I've seen tons of people implement such
apps in the last year, and it seemed easy. Now, perhaps we were doing
it all wrong... but I'd like to know how :)

If we are talking about a bigger story (i.e. DSL, real
stream-processing, etc.), that's a different discussion. I've seen a
bunch of misconceptions about SparkStreaming in this discussion, and I
have some thoughts in that regard, but I'd rather not go into that if
that's outside the scope of this KIP.

Gwen


On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com wrote:
 Hi Ewen,

 Replies inlined.

 On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

 Just some notes on the KIP doc itself:

 * It'd be useful to clarify at what point the plain consumer + custom code
 + producer breaks down. I think trivial filtering and aggregation on a
 single stream usually work fine with this model. Anything where you need
 more complex joins, windowing, etc. are where it breaks down. I think most
 interesting applications require that functionality, but it's helpful to
 make this really clear in the motivation -- right now, Kafka only provides
 the lowest level plumbing for stream processing applications, so most
 interesting apps require very heavyweight frameworks.


 I think for users to efficiently express complex logic like joins,
 windowing, etc., a higher-level programming interface beyond the process()
 interface would definitely be better, but that does not necessarily require
 a heavyweight framework, which usually includes more than just the
 high-level functional programming model. I would argue that an alternative
 solution would be better for users who want some high-level
 programming interface but not a heavyweight stream processing framework:
 the processor library plus another DSL layer on top of it.



 * I think the feature comparison of plain producer/consumer, stream
 processing frameworks, and this new library is a good start, but we might
 want something more thorough and structured, like a feature matrix. Right
 now it's hard to figure out exactly how they relate to each other.


 Cool, I can do that.


 * I'd personally push the library vs. framework story very strongly -- the
 total buy-in and weak integration story of stream processing frameworks is
 a big downside and makes a library a really compelling (and currently
 unavailable, as far as I am aware) alternative.


 Are you suggesting there is still some content missing about the
 motivations for adding the proposed library in the wiki page?


 * Comment about in-memory storage of other frameworks is interesting -- it
 is specific to the framework, but is supposed to also give performance
 benefits. The high-level functional processing interface would allow for
 combining multiple operations when there's no shuffle, but when there is a
 shuffle, we'll always be writing to Kafka, right? Spark (and presumably
 spark streaming) is supposed to get a big win by handling shuffles such
 that the data just stays in cache and never actually hits disk, or at least
 hits disk in the background. Will we take a hit because we always write to
 Kafka?


 I agree with Neha's comments here. One more point I want to make is that
 materializing to Kafka is not necessarily much worse than keeping data in
 memory if the downstream consumption is caught up such that most of the
 reads will be hitting file cache. I remember Samza has illustrated that
 under such scenarios its throughput is actually quite comparable to Spark
 Streaming / Storm.


 * I really struggled with the structure of the KIP template with Copycat
 because the flow doesn't work well for proposals like this. They aren't as
 concrete changes as the KIP template was designed for. I'd completely
 ignore that template in favor of optimizing for clarity if I were you.

 -Ewen

 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  
  .
 
  The wiki page does not yet have the full design / implementation details,
  and this email is to kick-off the conversation on whether we should add
  this new client with the described motivations, and if yes what features
 /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 



 --
 Thanks,
 Ewen




 --
 -- Guozhang


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-27 Thread Yi Pan
Hi, Jay and all,

Thanks for all your quick responses. I tried to summarize my thoughts here:

- ConsumerRecord as stream processor API:

   * This KafkaProcessor API is targeted at receiving messages from Kafka.
So, to Yasuhiro's join/transformation example, any join/transformation
results that are materialized in Kafka should have ConsumerRecord format
(i.e. w/ topic and offsets). Any non-materialized join/transformation
results should not be processed by this KafkaProcessor API. One example is
the in-memory operators API in Samza, which is designed to handle the
non-materialized join/transformation results. And yes, in this case, a more
abstract data model is needed.

   * Just to support Jay's point of a general
ConsumerRecord/ProducerRecord, general stream processing on more than one
data source would need at least the following info: data source
description (i.e. which topic/table), and actual data (i.e. key-value
pairs). It would make sense to have the data source name as part of the
general metadata in stream processing (think about it as the table name for
records in standard SQL).

- SQL/DSL

   * I think that this topic itself is worthy of another KIP discussion. I
would prefer to leave it out of scope in KIP-28.

- Client-side pluggable partition manager

   * Given the use cases we have seen with large-scale deployment of
Samza/Kafka in LinkedIn, I would argue that we should make it a
first-class citizen in this KIP. The use cases include:

  * multi-cluster Kafka

  * host-affinity (i.e. local-state associated w/ certain partitions on
client)

- Multi-cluster scenario

   * Although I originally just brought it up as a use case that requires
client-side partition manager, reading Jay’s comments, I realized that I
have one fundamental issue w/ the current copycat + transformation model.
If I interpret Jay’s comment correctly, the proposed copycat+transformation
plays out in the following way: i) copycat takes all data from sources
(whether Kafka or non-Kafka) into *one single Kafka cluster*; ii)
transformation is only restricted to take data sources in *this single
Kafka cluster* to perform aggregate/join etc. This is different from my
original understanding of the copycat. The main issue I have with this
model is: huge data-copy between Kafka clusters. In LinkedIn, we used to
follow this model that uses MirrorMaker to map topics from tracking
clusters to Samza-specific Kafka cluster and only do stream processing in
the Samza-specific Kafka cluster. We moved away from this model and started
allowing users to directly consume from tracking Kafka clusters due to the
overhead of copying huge amounts of traffic between Kafka clusters. I agree
that the initial design of KIP-28 would probably need a smaller scope of
problem to solve, hence limiting it to solving partition management in a
single cluster. However, I would really hope the design won’t prevent the
use case of processing data directly from multiple clusters. In my opinion,
making the partition manager as a client-side pluggable logic would allow
us to achieve these goals.

Thanks a lot in advance!

-Yi

On Fri, Jul 24, 2015 at 11:13 AM, Jay Kreps j...@confluent.io wrote:

 Hey Yi,

 For your other two points:

 - This definitely doesn't cover any kind of SQL or anything like this.

 - The prototype we started with just had process() as a method but Yasuhiro
 had some ideas of adding additional filter/aggregate convenience methods.
 We should discuss how this would fit with the operator work you were doing
 in Samza. Probably the best way is just get the code out there in current
 state and start talking about it?

 - Your point about multiple clusters. We actually have a proposed extension
 for the Kafka group management protocol that would allow it to cover
 multiple clusters but actually I think that use case is not the focus. I
 think in scope would be consuming from one cluster and producing to
 another.

 One of the assumptions we are making is that we will split into two
 categories:
 a. Ingress/egress which is handled by copycat
 b. Transformation which would be handled by this api

 I think there are a number of motivations for this
 - It is really hard to provide hard guarantees if you allow non-trivial
 aggregation coupled with the ingress/egress. So if you want to be able to
 do something that provides a kind of end-to-end exactly once guarantee
 (that's not really the right term but what people use) I think it will be
 really hard to do this across multiple systems (hello two-phase commit)
 - The APIs for ingest/egress end up needing to be really different for a
 first-class ingestion framework

 So the case where you have data coming from many systems including many
 Kafka clusters is just about how easy/hard it is to use copycat with the
 transformer api in the same program. I think this is something we should
 work out as part of the prototyping.

 -Jay

 On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Jiangjie Qin
Hey Guozhang,

I just took a quick look at the KIP, is it very similar to mirror maker
with message handler?

Thanks,

Jiangjie (Becket) Qin

On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 Just some notes on the KIP doc itself:

 * It'd be useful to clarify at what point the plain consumer + custom code
 + producer breaks down. I think trivial filtering and aggregation on a
 single stream usually work fine with this model. Anything where you need
 more complex joins, windowing, etc. are where it breaks down. I think most
 interesting applications require that functionality, but it's helpful to
 make this really clear in the motivation -- right now, Kafka only provides
 the lowest level plumbing for stream processing applications, so most
 interesting apps require very heavyweight frameworks.
 * I think the feature comparison of plain producer/consumer, stream
 processing frameworks, and this new library is a good start, but we might
 want something more thorough and structured, like a feature matrix. Right
 now it's hard to figure out exactly how they relate to each other.
 * I'd personally push the library vs. framework story very strongly -- the
 total buy-in and weak integration story of stream processing frameworks is
 a big downside and makes a library a really compelling (and currently
 unavailable, as far as I am aware) alternative.
 * Comment about in-memory storage of other frameworks is interesting -- it
 is specific to the framework, but is supposed to also give performance
 benefits. The high-level functional processing interface would allow for
 combining multiple operations when there's no shuffle, but when there is a
 shuffle, we'll always be writing to Kafka, right? Spark (and presumably
 spark streaming) is supposed to get a big win by handling shuffles such
 that the data just stays in cache and never actually hits disk, or at least
 hits disk in the background. Will we take a hit because we always write to
 Kafka?
 * I really struggled with the structure of the KIP template with Copycat
 because the flow doesn't work well for proposals like this. They aren't as
 concrete changes as the KIP template was designed for. I'd completely
 ignore that template in favor of optimizing for clarity if I were you.

 -Ewen

 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  
  .
 
  The wiki page does not yet have the full design / implementation details,
  and this email is to kick-off the conversation on whether we should add
  this new client with the described motivations, and if yes what features
 /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 



 --
 Thanks,
 Ewen



Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Neha Narkhede
Ewen:

* I think trivial filtering and aggregation on a single stream usually work
 fine with this model.


The way I see this, the process() API is an abstraction for
message-at-a-time computations. In the future, you could imagine providing
a simple DSL layer on top of the process() API that provides a set of APIs
for stream processing operations on sets of messages like joins, windows
and various aggregations.
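
For concreteness, here is a minimal sketch of what such a DSL layer might
look like; every name below (KStream, Predicate, Aggregator, etc.) is made
up for illustration and is not part of the KIP:

    interface Predicate<K, V> { boolean test(K key, V value); }
    interface Mapper<V, R> { R apply(V value); }
    interface Aggregator<K, V, A> { A add(A aggregate, K key, V value); }

    interface KStream<K, V> {
        // message-at-a-time operations map directly onto process()
        KStream<K, V> filter(Predicate<K, V> predicate);
        <R> KStream<K, R> mapValues(Mapper<V, R> mapper);
        // operations over sets of messages need a window plus local state
        <A> KStream<K, A> aggregateByKey(Aggregator<K, V, A> aggregator, long windowMs);
        // materialize the derived stream back into a Kafka topic
        void sendTo(String topic);
    }

A user could then write something like
clicks.filter((k, v) -> v != null).aggregateByKey(counter, 60000L).sendTo("click-counts"),
with the library translating each operator into process()-level logic.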

* Spark (and presumably
 spark streaming) is supposed to get a big win by handling shuffles such
 that the data just stays in cache and never actually hits disk, or at least
 hits disk in the background. Will we take a hit because we always write to
 Kafka?


The goal isn't so much about forcing materialization of intermediate
results into Kafka but designing the API to integrate with Kafka to allow
such materialization, wherever that might be required. The downside with
other stream processing frameworks is that they have weak integration with
Kafka where interaction with Kafka is only at the endpoints of processing
(first input, final output). Any intermediate operations that might benefit
from persisting intermediate results into Kafka are forced to be broken up
into 2 separate topologies/plans/stages of processing that lead to more
jobs. The implication is that the set of stream processing operations
that should really have lived in one job per application is now split up
across several piecemeal jobs that need to be monitored, managed and
operated separately. The APIs should still allow in-memory storage of
intermediate results where they make sense.

Jiangjie,

I just took a quick look at the KIP, is it very similar to mirror maker
 with message handler?


Not really. I wouldn't say it is similar, but mirror maker is a special
instance of using copycat with Kafka source, sink + optionally the
process() API. I can imagine replacing MirrorMaker, in due course,
with copycat + process().
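
To make that concrete, mirroring reduces to an identity process() step. In
this sketch the Context interface is a stand-in for whatever producer-side
handle the processor API ends up exposing; nothing here is real API:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    interface Context {
        void send(String topic, byte[] key, byte[] value);  // producer facade
    }

    class MirrorProcessor {
        // a MirrorMaker message handler would filter or rewrite here;
        // plain mirroring just forwards each record unchanged
        void process(ConsumerRecord<byte[], byte[]> record, Context context) {
            context.send(record.topic(), record.key(), record.value());
        }
    }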

Thanks,
Neha

On Thu, Jul 23, 2015 at 11:32 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Guozhang,

 I just took a quick look at the KIP, is it very similar to mirror maker
 with message handler?

 Thanks,

 Jiangjie (Becket) Qin

 On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Just some notes on the KIP doc itself:
 
  * It'd be useful to clarify at what point the plain consumer + custom
 code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you need
  more complex joins, windowing, etc. are where it breaks down. I think
 most
  interesting applications require that functionality, but it's helpful to
  make this really clear in the motivation -- right now, Kafka only
 provides
  the lowest level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we might
  want something more thorough and structured, like a feature matrix. Right
  now it's hard to figure out exactly how they relate to each other.
  * I'd personally push the library vs. framework story very strongly --
 the
  total buy-in and weak integration story of stream processing frameworks
 is
  a big downside and makes a library a really compelling (and currently
  unavailable, as far as I am aware) alternative.
  * Comment about in-memory storage of other frameworks is interesting --
 it
  is specific to the framework, but is supposed to also give performance
  benefits. The high-level functional processing interface would allow for
  combining multiple operations when there's no shuffle, but when there is
 a
  shuffle, we'll always be writing to Kafka, right? Spark (and presumably
  spark streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
 least
  hits disk in the background. Will we take a hit because we always write
 to
  Kafka?
  * I really struggled with the structure of the KIP template with Copycat
  because the flow doesn't work well for proposals like this. They aren't
 as
  concrete changes as the KIP template was designed for. I'd completely
  ignore that template in favor of optimizing for clarity if I were you.
 
  -Ewen
 
  On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi all,
  
   I just posted KIP-28: Add a transform client for data processing
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
   
   .
  
   The wiki page does not yet have the full design / implementation
 details,
   and this email is to kick-off the conversation on whether we should add
   this new client with the described motivations, and if yes what
 features
  /
   functionalities should be included.
  
   Looking forward to your feedback!

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Yi Pan
Hi, Guozhang,

Thanks for starting this. I took a quick look and had the following
thoughts to share:

- In the proposed KafkaProcessor API, there is no interface like Collector
that allows users to send messages to. Why is that? Is the idea to
initialize the producer once and re-use it in the processor? And if there
are many KStreamThreads in the process, are there going to be many
instances of KafkaProducer although all outputs are sending to the same
Kafka cluster?

- Won’t it be simpler if the process() API just takes in the ConsumerRecord
as the input instead of a tuple of (topic, key, value)?

- Also, the input only indicates the topic of a message. What if the stream
task needs to consume and produce messages from/to multiple Kafka clusters?
To support that case, there should be a system/cluster name in both input
and output as well.

- How are the output messages handled? There does not seem to be an
interface that allows users to send output messages to multiple output
Kafka clusters.

- It seems the proposed model also assumes one thread per processor. What
becomes thread-local and what are shared among processors? Is the proposed
model targeting to have the consumers/producers become thread-local
instances within each KafkaProcessor? What’s the cost associated with this
model?

- One more important issue: how do we plug in client-side partition
management logic? Considering the use case where the stream task
needs to consume from multiple Kafka clusters, I am not even sure that we
can rely on Kafka broker to maintain the consumer group membership? Maybe
we still can get the per cluster consumer group membership and partitions.
However, in this case, we truly need a client-side plugin partition
management logic to determine how to assign partitions in different Kafka
clusters to consumers (i.e. consumers for cluster1.topic1.p1 and
cluster2.topic2.p1 have to be assigned together to one KafkaProcessor for
processing). Based on the full information about (group members, all topic
partitions) in all Kafka clusters with input topics, there should be two
levels of partition management policies: a) how to group all topic
partitions in all Kafka clusters to processor groups (i.e. the same concept
as Task group in Samza); b) how to assign the processor groups to group
members. Note if a processor group includes topic partitions from more than
one Kafka cluster, it has to be assigned to the common group members in
all relevant Kafka clusters. This cannot be done just by the brokers in a
single Kafka cluster (see the sketch after this list).

- It seems that the intention of this KIP is also trying to put SQL/DSL
libraries into Kafka. Why is that? Shouldn't Kafka be more focused on hiding
system-level integration details and leave it open for any additional
modules outside the Kafka core to enrich the functionality that is
user-facing?
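
Returning to the client-side partition management point above, a rough
sketch of such a pluggable interface, with all names hypothetical, could
capture the two levels like this:

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // (cluster, topic, partition) triple, since a plain TopicPartition
    // cannot distinguish between Kafka clusters
    final class ClusterPartition {
        final String cluster;
        final String topic;
        final int partition;
        ClusterPartition(String cluster, String topic, int partition) {
            this.cluster = cluster;
            this.topic = topic;
            this.partition = partition;
        }
    }

    interface PartitionManager {
        // level a: group partitions (possibly spanning clusters) into
        // processor groups that must be processed together, e.g.
        // cluster1.topic1.p1 with cluster2.topic2.p1 for a cross-cluster join
        List<Set<ClusterPartition>> groupPartitions(Set<ClusterPartition> allPartitions);

        // level b: assign each processor group to a consumer-group member
        Map<String, List<Set<ClusterPartition>>> assignToMembers(
                List<String> memberIds, List<Set<ClusterPartition>> processorGroups);
    }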

Just a few quick cents. Thanks a lot!

-Yi

On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede n...@confluent.io wrote:

 Ewen:

 * I think trivial filtering and aggregation on a single stream usually work
  fine with this model.


 The way I see this, the process() API is an abstraction for
 message-at-a-time computations. In the future, you could imagine providing
 a simple DSL layer on top of the process() API that provides a set of APIs
 for stream processing operations on sets of messages like joins, windows
 and various aggregations.

 * Spark (and presumably
  spark streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
 least
  hits disk in the background. Will we take a hit because we always write
 to
  Kafka?


 The goal isn't so much about forcing materialization of intermediate
 results into Kafka but designing the API to integrate with Kafka to allow
 such materialization, wherever that might be required. The downside with
 other stream processing frameworks is that they have weak integration with
 Kafka where interaction with Kafka is only at the endpoints of processing
 (first input, final output). Any intermediate operations that might benefit
 from persisting intermediate results into Kafka are forced to be broken up
 into 2 separate topologies/plans/stages of processing that lead to more
 jobs. The implication is that the set of stream processing operations
 that should really have lived in one job per application is now split up
 across several piecemeal jobs that need to be monitored, managed and
 operated separately. The APIs should still allow in-memory storage of
 intermediate results where they make sense.

 Jiangjie,

 I just took a quick look at the KIP, is it very similar to mirror maker
  with message handler?


 Not really. I wouldn't say it is similar, but mirror maker is a special
 instance of using copycat with Kafka source, sink + optionally the
 process() API. I can imagine replacing MirrorMaker, in due course,
 with copycat + process().

 Thanks,
 Neha

 On Thu, 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Yasuhiro Matsuda
Jay, I understand that. Context can provide more information without
breaking the compatibility if needed. Also I am not sure ConsumerRecord is
the right abstraction of data for stream processing. After transformation
or join, what is the topic and the offset? It is odd to use ConsumerRecord.
We can define a new record class, say StreamRecord. Isn't it an unnecessary
overhead if it is created for every transformation and join?
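
One possible shape for such a record, purely as a sketch: keep only key and
value on the record itself, and leave provenance (topic, offset) to the
processing context, so that derived records never carry stale metadata:

    public final class StreamRecord<K, V> {
        private final K key;
        private final V value;

        public StreamRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K key()   { return key; }
        public V value() { return value; }
    }

Whether allocating one of these per transformation and join is acceptable
is exactly the overhead question raised above.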

On Fri, Jul 24, 2015 at 10:06 AM, Jay Kreps j...@confluent.io wrote:

 To follow on to one of Yi's points about taking ConsumerRecord vs
 topic/key/value. One thing we have found is that for user-facing APIs
 considering future API evolution is really important. If you do
 topic/key/value and then realize you need offset added, you end up having to
 break everyone's code. This is the idea behind things like ProducerRecord
 and ConsumerRecord: you can add additional fields without breaking
 existing code. Thought I'd point that out since we've made this mistake a
 few times now.
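
The contrast Jay describes, in sketch form (the interface names here are
hypothetical, only ConsumerRecord is a real Kafka class):

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    interface PositionalProcessor {
        // adding offset later means changing this signature and breaking
        // every existing implementation
        void process(String topic, byte[] key, byte[] value);
    }

    interface RecordProcessor {
        // new fields (offset, partition, timestamp, ...) can be added to
        // ConsumerRecord without touching this signature
        void process(ConsumerRecord<byte[], byte[]> record);
    }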

 -Jay

 On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan nickpa...@gmail.com wrote:

  Hi, Guozhang,
 
  Thanks for starting this. I took a quick look and had the following
  thoughts to share:
 
  - In the proposed KafkaProcessor API, there is no interface like
 Collector
  that allows users to send messages to. Why is that? Is the idea to
  initialize the producer once and re-use it in the processor? And if there
  are many KStreamThreads in the process, are there going to be many
  instances of KafkaProducer although all outputs are sending to the same
  Kafka cluster?
 
  - Won’t it be simpler if the process() API just takes in the
 ConsumerRecord
  as the input instead of a tuple of (topic, key, value)?
 
  - Also, the input only indicates the topic of a message. What if the
 stream
  task needs to consume and produce messages from/to multiple Kafka
 clusters?
  To support that case, there should be a system/cluster name in both input
  and output as well.
 
  - How are the output messages handled? There does not seem to be an
  interface that allows users to send output messages to multiple output
  Kafka clusters.
 
  - It seems the proposed model also assumes one thread per processor. What
  becomes thread-local and what are shared among processors? Is the
 proposed
  model targeting to have the consumers/producers become thread-local
  instances within each KafkaProcessor? What’s the cost associated with
 this
  model?
 
  - One more important issue: how do we plug in client-side partition
  management logic? Considering the use case where the stream task
  needs to consume from multiple Kafka clusters, I am not even sure that we
  can rely on Kafka broker to maintain the consumer group membership? Maybe
  we still can get the per cluster consumer group membership and
 partitions.
  However, in this case, we truly need a client-side plugin partition
  management logic to determine how to assign partitions in different Kafka
  clusters to consumers (i.e. consumers for cluster1.topic1.p1 and
  cluster2.topic2.p1 have to be assigned together to one KafkaProcessor for
  processing). Based on the full information about (group members, all
 topic
  partitions) in all Kafka clusters with input topics, there should be two
  levels of partition management policies: a) how to group all topic
  partitions in all Kafka clusters to processor groups (i.e. the same
 concept
  as Task group in Samza); b) how to assign the processor groups to group
  members. Note if a processor group includes topic partitions from more
  than one Kafka cluster, it has to be assigned to the common group members in
  all relevant Kafka clusters. This cannot be done just by the brokers in
  a single Kafka cluster.
 
  - It seems that the intention of this KIP is also trying to put SQL/DSL
  libraries into Kafka. Why is that? Shouldn't Kafka be more focused on
  hiding system-level integration details and leave it open for any additional
  modules outside the Kafka core to enrich the functionality that is
  user-facing?
 
  Just a few quick cents. Thanks a lot!
 
  -Yi
 
  On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede n...@confluent.io
 wrote:
 
   Ewen:
  
   * I think trivial filtering and aggregation on a single stream usually
  work
fine with this model.
  
  
   The way I see this, the process() API is an abstraction for
   message-at-a-time computations. In the future, you could imagine
  providing
   a simple DSL layer on top of the process() API that provides a set of
  APIs
   for stream processing operations on sets of messages like joins,
 windows
   and various aggregations.
  
   * Spark (and presumably
spark streaming) is supposed to get a big win by handling shuffles
 such
that the data just stays in cache and never actually hits disk, or at
   least
hits disk in the background. Will we take a hit because we always
 write
   to
Kafka?
  
  
    The goal isn't so much about forcing materialization of intermediate
    results into Kafka but designing the API to integrate with Kafka to allow
    such materialization, wherever that might be required.

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Guozhang Wang
Hi Jiangjie,

Inlined.

On Thu, Jul 23, 2015 at 11:32 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Guozhang,

 I just took a quick look at the KIP, is it very similar to mirror maker
 with message handler?


I think the processor client would support a superset of the functionality
of MM with message handlers in that:

1. API-wise, it would provide more than per-message processing like the
message handler's, including local storage (with a committing mechanism),
time-triggered processing, etc.
2. Feature-wise, it would support user-customizable partition assignment
such as co-partitioning, sticky-partitioning (for local state maintenance,
for example), etc.
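
As a rough sketch of the shape such a processor could take (KafkaProcessor,
ProcessorContext and every member below are placeholders -- the actual API
was still being prototyped at this point):

    interface ProcessorContext {
        void send(String topic, Object key, Object value);  // producer facade
        void schedule(long intervalMs);   // timer driving punctuate()
        void commit();                    // commit state and offsets together
        long offset();                    // metadata of the current record
    }

    abstract class KafkaProcessor<K, V> {
        protected ProcessorContext context;

        public void init(ProcessorContext context) {
            this.context = context;
            context.schedule(60 * 1000L); // e.g. request time-triggered callbacks
        }

        // called once per incoming record
        public abstract void process(K key, V value);

        // time-triggered processing, driven by the schedule set up in init()
        public void punctuate(long timestamp) {}

        public void close() {}
    }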

Thanks,

 Jiangjie (Becket) Qin

 On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Just some notes on the KIP doc itself:
 
  * It'd be useful to clarify at what point the plain consumer + custom
 code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you need
  more complex joins, windowing, etc. are where it breaks down. I think
 most
  interesting applications require that functionality, but it's helpful to
  make this really clear in the motivation -- right now, Kafka only
 provides
  the lowest level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we might
  want something more thorough and structured, like a feature matrix. Right
  now it's hard to figure out exactly how they relate to each other.
  * I'd personally push the library vs. framework story very strongly --
 the
  total buy-in and weak integration story of stream processing frameworks
 is
  a big downside and makes a library a really compelling (and currently
  unavailable, as far as I am aware) alternative.
  * Comment about in-memory storage of other frameworks is interesting --
 it
  is specific to the framework, but is supposed to also give performance
  benefits. The high-level functional processing interface would allow for
  combining multiple operations when there's no shuffle, but when there is
 a
  shuffle, we'll always be writing to Kafka, right? Spark (and presumably
  spark streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
 least
  hits disk in the background. Will we take a hit because we always write
 to
  Kafka?
  * I really struggled with the structure of the KIP template with Copycat
  because the flow doesn't work well for proposals like this. They aren't
 as
  concrete changes as the KIP template was designed for. I'd completely
  ignore that template in favor of optimizing for clarity if I were you.
 
  -Ewen
 
  On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi all,
  
   I just posted KIP-28: Add a transform client for data processing
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
   
   .
  
   The wiki page does not yet have the full design / implementation
 details,
   and this email is to kick-off the conversation on whether we should add
   this new client with the described motivations, and if yes what
 features
  /
   functionalities should be included.
  
   Looking forward to your feedback!
  
   -- Guozhang
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
-- Guozhang


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Yasuhiro Matsuda
The goal of this KIP is to provide a lightweight/embeddable streaming
framework that allows Kafka users to start using stream processing easily. A DSL
is not covered in this KIP, but a DSL is a very attractive option to have.

 In the proposed KafkaProcessor API, there is no interface like Collector
that allows users to send messages to. Why is that?

It is not stated in the KIP, but Context provides a simple interface to a
producer.

Won’t it be simpler if the process() API just takes in the ConsumerRecord
as the input instead of a tuple of (topic, key, value)?

If Kafka implements a simple DSL, something like the one in Spark, I think
ConsumerRecord may not be the most convenient thing for the framework or
the most intuitive thing for users. I don't think we need topic in the
arguments. Think about the simplest application: all it needs is a key and
a value. That makes the API simpler. If the application needs to access
more info (topic, offset), Context should provide it.
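
In that style, and reusing the hypothetical KafkaProcessor /
ProcessorContext shapes sketched earlier in this digest, a processor might
look like:

    class FilterProcessor extends KafkaProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            if (value != null && value.contains("error")) {
                // metadata pulled from the context only when needed
                long offset = context.offset();
                context.send("error-events", key, value + "@" + offset);
            }
        }
    }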


On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Guozhang,

 Thanks for starting this. I took a quick look and had the following
 thoughts to share:

 - In the proposed KafkaProcessor API, there is no interface like Collector
 that allows users to send messages to. Why is that? Is the idea to
 initialize the producer once and re-use it in the processor? And if there
 are many KStreamThreads in the process, are there going to be many
 instances of KafkaProducer although all outputs are sending to the same
 Kafka cluster?

 - Won’t it be simpler if the process() API just takes in the ConsumerRecord
 as the input instead of a tuple of (topic, key, value)?

 - Also, the input only indicates the topic of a message. What if the stream
 task needs to consume and produce messages from/to multiple Kafka clusters?
 To support that case, there should be a system/cluster name in both input
 and output as well.

 - How are the output messages handled? There does not seem to be an
 interface that allows users to send output messages to multiple output
 Kafka clusters.

 - It seems the proposed model also assumes one thread per processor. What
 becomes thread-local and what are shared among processors? Is the proposed
 model targeting to have the consumers/producers become thread-local
 instances within each KafkaProcessor? What’s the cost associated with this
 model?

 - One more important issue: how do we plug in client-side partition
 management logic? Considering the use case where the stream task
 needs to consume from multiple Kafka clusters, I am not even sure that we
 can rely on Kafka broker to maintain the consumer group membership? Maybe
 we still can get the per cluster consumer group membership and partitions.
 However, in this case, we truly need a client-side plugin partition
 management logic to determine how to assign partitions in different Kafka
 clusters to consumers (i.e. consumers for cluster1.topic1.p1 and
 cluster2.topic2.p1 have to be assigned together to one KafkaProcessor for
 processing). Based on the full information about (group members, all topic
 partitions) in all Kafka clusters with input topics, there should be two
 levels of partition management policies: a) how to group all topic
 partitions in all Kafka clusters to processor groups (i.e. the same concept
 as Task group in Samza); b) how to assign the processor groups to group
 members. Note if a processor group includes topic partitions from more than
 one Kafka cluster, it has to be assigned to the common group members in
 all relevant Kafka clusters. This cannot be done just by the brokers in a
 single Kafka cluster.

 - It seems that the intention of this KIP is also trying to put SQL/DSL
 libraries into Kafka. Why is that? Shouldn't Kafka be more focused on hiding
 system-level integration details and leave it open for any additional
 modules outside the Kafka core to enrich the functionality that is
 user-facing?

 Just a few quick cents. Thanks a lot!

 -Yi

 On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede n...@confluent.io wrote:

  Ewen:
 
  * I think trivial filtering and aggregation on a single stream usually
 work
   fine with this model.
 
 
  The way I see this, the process() API is an abstraction for
  message-at-a-time computations. In the future, you could imagine
 providing
  a simple DSL layer on top of the process() API that provides a set of
 APIs
  for stream processing operations on sets of messages like joins, windows
  and various aggregations.
 
  * Spark (and presumably
   spark streaming) is supposed to get a big win by handling shuffles such
   that the data just stays in cache and never actually hits disk, or at
  least
   hits disk in the background. Will we take a hit because we always write
  to
   Kafka?
 
 
  The goal isn't so much about forcing materialization of intermediate
  results into Kafka but designing the API to integrate with Kafka to allow
  such materialization, wherever that might be required.

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Jay Kreps
Hey Yi,

For your other two points:

- This definitely doesn't cover any kind of SQL or anything like this.

- The prototype we started with just had process() as a method but Yasuhiro
had some ideas of adding additional filter/aggregate convenience methods.
We should discuss how this would fit with the operator work you were doing
in Samza. Probably the best way is just get the code out there in current
state and start talking about it?

- Your point about multiple clusters. We actually have a proposed extension
for the Kafka group management protocol that would allow it to cover
multiple clusters but actually I think that use case is not the focus. I
think in scope would be consuming from one cluster and producing to another.

One of the assumptions we are making is that we will split into two
categories:
a. Ingress/egress which is handled by copycat
b. Transformation which would be handled by this api

I think there are a number of motivations for this
- It is really hard to provide hard guarantees if you allow non-trivial
aggregation coupled with the ingress/egress. So if you want to be able to
do something that provides a kind of end-to-end exactly once guarantee
(that's not really the right term but what people use) I think it will be
really hard to do this across multiple systems (hello two-phase commit)
- The APIs for ingest/egress end up needing to be really different for a
first-class ingestion framework

So the case where you have data coming from many systems including many
Kafka clusters is just about how easy/hard it is to use copycat with the
transformer api in the same program. I think this is something we should
work out as part of the prototyping.

-Jay

On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Guozhang,

 Thanks for starting this. I took a quick look and had the following
 thoughts to share:

 - In the proposed KafkaProcessor API, there is no interface like Collector
 that allows users to send messages to. Why is that? Is the idea to
 initialize the producer once and re-use it in the processor? And if there
 are many KStreamThreads in the process, are there going to be many
 instances of KafkaProducer although all outputs are sending to the same
 Kafka cluster?

 - Won’t it be simpler if the process() API just takes in the ConsumerRecord
 as the input instead of a tuple of (topic, key, value)?

 - Also, the input only indicates the topic of a message. What if the stream
 task needs to consume and produce messages from/to multiple Kafka clusters?
 To support that case, there should be a system/cluster name in both input
 and output as well.

 - How are the output messages handled? There does not seem to be an
 interface that allows users to send output messages to multiple output
 Kafka clusters.

 - It seems the proposed model also assumes one thread per processor. What
 becomes thread-local and what are shared among processors? Is the proposed
 model targeting to have the consumers/producers become thread-local
 instances within each KafkaProcessor? What’s the cost associated with this
 model?

 - One more important issue: how do we plug in client-side partition
 management logic? Considering the use case where the stream task
 needs to consume from multiple Kafka clusters, I am not even sure that we
 can rely on Kafka broker to maintain the consumer group membership? Maybe
 we still can get the per cluster consumer group membership and partitions.
 However, in this case, we truly need a client-side plugin partition
 management logic to determine how to assign partitions in different Kafka
 clusters to consumers (i.e. consumers for cluster1.topic1.p1 and
 cluster2.topic2.p1 have to be assigned together to one KafkaProcessor for
 processing). Based on the full information about (group members, all topic
 partitions) in all Kafka clusters with input topics, there should be two
 levels of partition management policies: a) how to group all topic
 partitions in all Kafka clusters to processor groups (i.e. the same concept
 as Task group in Samza); b) how to assign the processor groups to group
 members. Note if a processor group includes topic partitions from more than
 one Kafka cluster, it has to be assigned to the common group members in
 all relevant Kafka clusters. This cannot be done just by the brokers in a
 single Kafka cluster.

 - It seems that the intention of this KIP is also trying to put SQL/DSL
 libraries into Kafka. Why is that? Shouldn't Kafka be more focused on hiding
 system-level integration details and leave it open for any additional
 modules outside the Kafka core to enrich the functionality that is
 user-facing?

 Just a few quick cents. Thanks a lot!

 -Yi

 On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede n...@confluent.io wrote:

  Ewen:
 
  * I think trivial filtering and aggregation on a single stream usually
 work
   fine with this model.
 
 
  The way I see this, the process() API is an abstraction for
  message-at-a-time computations.

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Guozhang Wang
Hi Yi,

Inlined.

On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Guozhang,

 Thanks for starting this. I took a quick look and had the following
 thoughts to share:

 - In the proposed KafkaProcessor API, there is no interface like Collector
 that allows users to send messages to. Why is that? Is the idea to
 initialize the producer once and re-use it in the processor? And if there
 are many KStreamThreads in the process, are there going to be many
 instances of KafkaProducer although all outputs are sending to the same
 Kafka cluster?


The API mentioned in the wiki page is neither final nor comprehensive; it
is just for illustrating how it would replace the producer + consumer
APIs.
later, together with a prototype implementation of such APIs. And yes, the
send() function should definitely be supported.

Regarding whether we should have multiple producer / consumer instances or
not within a single processor instance, as mentioned in the wiki it is not
fully decided yet and again, I would like to add such content to the KIP
proposal once we have a prototype illustrating the design so that people
can have a better idea and discuss over it.


 - Won’t it be simpler if the process() API just takes in the ConsumerRecord
 as the input instead of a tuple of (topic, key, value)?


I think I agree.


 - Also, the input only indicates the topic of a message. What if the stream
 task needs to consume and produce messages from/to multiple Kafka clusters?
 To support that case, there should be a system/cluster name in both input
 and output as well.


Yeah that is a good question, I agree that in the final state the processor
context should include such record metadata as well as partition-id,
offset, etc.


 - How are the output messages handled? There does not seem to be an
 interface that allows users to send output messages to multiple output
 Kafka clusters.


See above.


 - It seems the proposed model also assumes one thread per processor. What
 becomes thread-local and what are shared among processors? Is the proposed
 model targeting to have the consumers/producers become thread-local
 instances within each KafkaProcessor? What’s the cost associated with this
 model?


We do not assume one thread per processor; I think the name of
KStreamThread would be a bit misleading here: we can definitely spawn more
threads beyond this main thread of the process if we decide to do so in
the system design.


 - One more important issue: how do we plug in client-side partition
 management logic? Considering the use case where the stream task
 needs to consume from multiple Kafka clusters, I am not even sure that we
 can rely on Kafka broker to maintain the consumer group membership? Maybe
 we still can get the per cluster consumer group membership and partitions.
 However, in this case, we truly need a client-side plugin partition
 management logic to determine how to assign partitions in different Kafka
 clusters to consumers (i.e. consumers for cluster1.topic1.p1 and
 cluster2.topic2.p1 have to be assigned together to one KafkaProcessor for
 processing). Based on the full information about (group members, all topic
 partitions) in all Kafka clusters with input topics, there should be two
 levels of partition management policies: a) how to group all topic
 partitions in all Kafka clusters to processor groups (i.e. the same concept
 as Task group in Samza); b) how to assign the processor groups to group
 members. Note if a processor group includes topic partitions from more than
 one Kafka cluster, it has to be assigned to the common group members in
 all relevant Kafka clusters. This cannot be done just by the brokers in a
 single Kafka cluster.


Yes, the current broker-side partition assignment is not flexible enough
and we are considering changing the protocol to allow clients to
assign partitions themselves; and depending on the system design we may
handle the two-level assignment differently. For example, if we use one
consumer for each incoming Kafka cluster within the process instance, and
have multiple processor threads that read the data from the shared
consumer, then besides consumer-group partition assignment we also need
to determine the allocation of partitions to the processor threads.


 - It seems that the intention of this KIP is also trying to put SQL/DSL
 libraries into Kafka. Why is that? Shouldn't Kafka be more focused on hiding
 system-level integration details and leave it open for any additional
 modules outside the Kafka core to enrich the functionality that is
 user-facing?


I was not trying to push SQL / DSL into Kafka, but I do want to bring this
up for discussion as to what features should be included in this client
library.

Just a few quick cents. Thanks a lot!

 -Yi

 On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede n...@confluent.io wrote:

  Ewen:
 
  * I think trivial filtering and aggregation on a single stream usually
  work fine with this model.

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Ewen Cheslack-Postava
@guozhang re: library vs framework - Nothing critical missing. But I think
KIPs will serve as a sort of changelog since ones like this
https://archive.apache.org/dist/kafka/0.8.2.0/RELEASE_NOTES.html are not
all that helpful to end users. A KIP like this is a major new addition, so
laying out all the benefits and tradeoffs wrt frameworks is important for
users. Docs will cover that too, but Kafka tends to lag on documentation
updates and at worst, the docs are just bootstrapped from very clear KIP
text.

In terms of concrete changes, I'd love to see a few examples of how you
would take advantage of it being in library form. There's the obvious
benefit that you can easily integrate with whatever cluster/process
management you want instead of requiring YARN/Mesos/etc, but having a
library rather than framework also allows you to integrate with other code
in the same process. The Copycat KIP showed how you might integrate Copycat
and a stream processing library very generically, but maybe one or two real
examples would be useful? What else is running in the process besides
stream processing?
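
(As one toy example of co-located code: a service that answers HTTP lookups
out of the same JVM that processes the stream. Everything below is
hypothetical and uses only the JDK; the stream consumption loop is elided:)

    import com.sun.net.httpserver.HttpServer;
    import java.net.InetSocketAddress;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class EmbeddedProcessorService {
        public static void main(String[] args) throws Exception {
            // In-process state: written by the stream thread, read by HTTP.
            Map<String, Long> counts = new ConcurrentHashMap<>();

            // The stream processor is just a thread inside this service...
            Thread processor = new Thread(() -> {
                // consume loop elided; per record it would do something like:
                // counts.merge(record.key(), 1L, Long::sum);
            });
            processor.start();

            // ...while the same process serves lookups against live counts.
            HttpServer http = HttpServer.create(new InetSocketAddress(8080), 0);
            http.createContext("/count", exchange -> {
                String key = String.valueOf(exchange.getRequestURI().getQuery());
                byte[] body = String.valueOf(counts.getOrDefault(key, 0L)).getBytes();
                exchange.sendResponseHeaders(200, body.length);
                exchange.getResponseBody().write(body);
                exchange.close();
            });
            http.start();
        }
    }

With a framework, the state would live in a separate cluster and the lookup
would need a remote call; in library form it is a local map read.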

All my comments were only meant as suggestions for how the KIP document
might be improved. I'm already convinced that stream processing frameworks
today are far too heavyweight and there's a gap to be filled between those
frameworks and using producer/consumer directly.

-Ewen

On Fri, Jul 24, 2015 at 9:48 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Ewen,

 Replies inlined.

 On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Just some notes on the KIP doc itself:
 
  * It'd be useful to clarify at what point the plain consumer + custom code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you need
  more complex joins, windowing, etc. is where it breaks down. I think most
  interesting applications require that functionality, but it's helpful to
  make this really clear in the motivation -- right now, Kafka only provides
  the lowest-level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
 

 I think that for users to efficiently express complex logic like joins,
 windowing, etc., a higher-level programming interface beyond the process()
 interface would definitely be better, but that does not necessarily require
 a heavyweight framework, which usually includes more than just the
 high-level functional programming model. I would argue that for users who
 want a high-level programming interface but not a heavyweight stream
 processing framework, a better alternative would be the processor library
 plus a separate DSL layer on top of it.



  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we might
  want something more thorough and structured, like a feature matrix. Right
  now it's hard to figure out exactly how they relate to each other.
 

 Cool, I can do that.


  * I'd personally push the library vs. framework story very strongly -- the
  total buy-in and weak integration story of stream processing frameworks is
  a big downside and makes a library a really compelling (and currently
  unavailable, as far as I am aware) alternative.
 

 Are you suggesting that some content is still missing from the wiki page's
 motivation for adding the proposed library?


  * Comment about in-memory storage of other frameworks is interesting -- it
  is specific to the framework, but is supposed to also give performance
  benefits. The high-level functional processing interface would allow for
  combining multiple operations when there's no shuffle, but when there is a
  shuffle, we'll always be writing to Kafka, right? Spark (and presumably
  Spark Streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
  least hits disk in the background. Will we take a hit because we always
  write to Kafka?
 

 I agree with Neha's comments here. One more point I want to make is that
 materializing to Kafka is not necessarily much worse than keeping data in
 memory, if the downstream consumption is caught up such that most of the
 reads hit the OS file cache. I remember Samza has demonstrated that under
 such scenarios its throughput is quite comparable to Spark Streaming /
 Storm.


  * I really struggled with the structure of the KIP template with Copycat
  because the flow doesn't work well for proposals like this. They aren't
  the kind of concrete changes the KIP template was designed for. I'd
  completely ignore that template in favor of optimizing for clarity if I
  were you.
 
  -Ewen
 
  On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi all,
  
   I just posted KIP-28: Add a transform client for data processing
 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Guozhang Wang
Hi Ewen,

Replies inlined.

On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 Just some notes on the KIP doc itself:

 * It'd be useful to clarify at what point the plain consumer + custom code
 + producer breaks down. I think trivial filtering and aggregation on a
 single stream usually work fine with this model. Anything where you need
 more complex joins, windowing, etc. is where it breaks down. I think most
 interesting applications require that functionality, but it's helpful to
 make this really clear in the motivation -- right now, Kafka only provides
 the lowest-level plumbing for stream processing applications, so most
 interesting apps require very heavyweight frameworks.


I think that for users to efficiently express complex logic like joins,
windowing, etc., a higher-level programming interface beyond the process()
interface would definitely be better, but that does not necessarily require
a heavyweight framework, which usually includes more than just the
high-level functional programming model. I would argue that for users who
want a high-level programming interface but not a heavyweight stream
processing framework, a better alternative would be the processor library
plus a separate DSL layer on top of it.
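
(To make that concrete, one shape such a layer could take: a toy sketch,
not a proposed API, in which a couple of functional operators compile down
to nested process() calls:)

    import java.util.function.Function;
    import java.util.function.Predicate;

    // The process()-style primitive: one message in, side effects out.
    interface Processor<K, V> {
        void process(K key, V value);
    }

    final class Ops {
        // filter: forward the message downstream only if the predicate holds.
        static <K, V> Processor<K, V> filter(Predicate<V> pred, Processor<K, V> next) {
            return (k, v) -> { if (pred.test(v)) next.process(k, v); };
        }

        // map: transform the value, then forward downstream.
        static <K, V, V2> Processor<K, V> map(Function<V, V2> fn, Processor<K, V2> next) {
            return (k, v) -> next.process(k, fn.apply(v));
        }

        public static void main(String[] args) {
            // filter(length > 3) -> map(uppercase) -> print, per message.
            Processor<String, String> pipeline =
                    filter((String v) -> v.length() > 3,
                            map(String::toUpperCase,
                                    (k, v) -> System.out.println(k + " -> " + v)));
            pipeline.process("key1", "kafka"); // prints: key1 -> KAFKA
            pipeline.process("key2", "ok");    // filtered out, prints nothing
        }
    }

A real DSL would add joins, windows, and state on top, but the compilation
target would stay this same message-at-a-time primitive.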



 * I think the feature comparison of plain producer/consumer, stream
 processing frameworks, and this new library is a good start, but we might
 want something more thorough and structured, like a feature matrix. Right
 now it's hard to figure out exactly how they relate to each other.


Cool, I can do that.


 * I'd personally push the library vs. framework story very strongly -- the
 total buy-in and weak integration story of stream processing frameworks is
 a big downside and makes a library a really compelling (and currently
 unavailable, as far as I am aware) alternative.


Are you suggesting that some content is still missing from the wiki page's
motivation for adding the proposed library?


 * Comment about in-memory storage of other frameworks is interesting -- it
 is specific to the framework, but is supposed to also give performance
 benefits. The high-level functional processing interface would allow for
 combining multiple operations when there's no shuffle, but when there is a
 shuffle, we'll always be writing to Kafka, right? Spark (and presumably
 Spark Streaming) is supposed to get a big win by handling shuffles such
 that the data just stays in cache and never actually hits disk, or at least
 hits disk in the background. Will we take a hit because we always write to
 Kafka?


I agree with Neha's comments here. One more point I want to make is that
materializing to Kafka is not necessarily much worse than keeping data in
memory, if the downstream consumption is caught up such that most of the
reads hit the OS file cache. I remember Samza has demonstrated that under
such scenarios its throughput is quite comparable to Spark Streaming /
Storm.


 * I really struggled with the structure of the KIP template with Copycat
 because the flow doesn't work well for proposals like this. They aren't
 the kind of concrete changes the KIP template was designed for. I'd
 completely ignore that template in favor of optimizing for clarity if I
 were you.

 -Ewen

 On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi all,
 
  I just posted KIP-28: Add a transform client for data processing
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
  
  .
 
  The wiki page does not yet have the full design / implementation details,
  and this email is to kick-off the conversation on whether we should add
  this new client with the described motivations, and if yes what features
 /
  functionalities should be included.
 
  Looking forward to your feedback!
 
  -- Guozhang
 



 --
 Thanks,
 Ewen




-- 
-- Guozhang


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Neha Narkhede
Agree that the normal KIP process is awkward for larger changes like this.
I'm a +1 on trying out this new process for the processor client, seeing
how it works out, and then making that the process for future large changes
of this nature.

On Fri, Jul 24, 2015 at 10:03 AM, Jay Kreps j...@confluent.io wrote:

 I agree that the KIP process doesn't fit well for big areas of development
 like the new consumer, copycat, or this.

 I think the approach for copycat, where we do a "should this exist" KIP
 vote followed by a review on code check-in, isn't ideal because of course
 the question of "should we do it" is directly tied to the question of
 "what will it look like". I'm sure any of us could be either in favor of
 or opposed to copycat depending on the details of what it looks like. And
 for these
 big things you really need to have a fairly complete prototype to get into
 details of how it will work. But we definitely want to do these kinds of
 things collaboratively, so we don't want to wait until we have a finished
 prototype and then dump out the code and KIP in final form. My experience
 is that it is pretty hard to influence things that are this far along
 because by then all the ideas have kind of solidified in the authors'
 minds.

 So I think the proposal for this one is to try the following:
 1. Throw out a stub KIP with essentially no concrete design other than a
 problem statement and niche we are trying to address. Start discussion on
 this but no vote (because what are you really voting on?).
 2. Get a WIP prototype patch out there quickly and discuss that as it is
 being developed and refined.
 3. Solidify the prototype patch and KIP together and do a vote on the KIP
 as the final design solidifies.
 4. Do the normal review process for the patch more or less decoupled from
 the KIP discussion covering implementation rather than design and user APIs
 (which the KIP discussion would cover).

 Does this make sense to people? If so let's try it and if we like it better
 we can formally make that the process for this kind of big thing.

 -Jay



 On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Just some notes on the KIP doc itself:

  * It'd be useful to clarify at what point the plain consumer + custom code
  + producer breaks down. I think trivial filtering and aggregation on a
  single stream usually work fine with this model. Anything where you need
  more complex joins, windowing, etc. is where it breaks down. I think most
  interesting applications require that functionality, but it's helpful to
  make this really clear in the motivation -- right now, Kafka only provides
  the lowest-level plumbing for stream processing applications, so most
  interesting apps require very heavyweight frameworks.
  * I think the feature comparison of plain producer/consumer, stream
  processing frameworks, and this new library is a good start, but we might
  want something more thorough and structured, like a feature matrix. Right
  now it's hard to figure out exactly how they relate to each other.
  * I'd personally push the library vs. framework story very strongly -- the
  total buy-in and weak integration story of stream processing frameworks is
  a big downside and makes a library a really compelling (and currently
  unavailable, as far as I am aware) alternative.
  * Comment about in-memory storage of other frameworks is interesting -- it
  is specific to the framework, but is supposed to also give performance
  benefits. The high-level functional processing interface would allow for
  combining multiple operations when there's no shuffle, but when there is a
  shuffle, we'll always be writing to Kafka, right? Spark (and presumably
  Spark Streaming) is supposed to get a big win by handling shuffles such
  that the data just stays in cache and never actually hits disk, or at
  least hits disk in the background. Will we take a hit because we always
  write to Kafka?
  * I really struggled with the structure of the KIP template with Copycat
  because the flow doesn't work well for proposals like this. They aren't
  the kind of concrete changes the KIP template was designed for. I'd
  completely ignore that template in favor of optimizing for clarity if I
  were you.
 
  -Ewen
 
  On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi all,
  
   I just posted KIP-28: Add a transform client for data processing
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
   
   .
  
   The wiki page does not yet have the full design / implementation
 details,
   and this email is to kick-off the conversation on whether we should add
   this new client with the described motivations, and if yes what
 features
  /
   functionalities should be included.
  
   Looking forward to your feedback!
  
   -- Guozhang
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Neha


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Jay Kreps
To follow on to one of Yi's points about taking ConsumerRecord vs
topic/key/value: one thing we have found is that for user-facing APIs,
considering future API evolution is really important. If you do
topic/key/value and then realize you need the offset added, you end up
having to break everyone's code. The idea behind things like ProducerRecord
and ConsumerRecord is that you can add additional fields without breaking
existing code. Thought I'd point that out since we've made this mistake a
few times now.
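
(Sketched with two hypothetical processor interfaces; only ConsumerRecord
itself is a real class:)

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    interface FlatProcessor<K, V> {
        // If the offset (or partition, or timestamp) is needed later, this
        // signature has to change and every implementation breaks.
        void process(String topic, K key, V value);
    }

    interface RecordProcessor<K, V> {
        // ConsumerRecord can grow new accessors over time without touching
        // this signature or any existing implementation of it.
        void process(ConsumerRecord<K, V> record);
    }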

-Jay

On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Guozhang,

 Thanks for starting this. I took a quick look and had the following
 thoughts to share:

 - In the proposed KafkaProcessor API, there is no Collector-like interface
 that users can send messages to; why is that? (See the sketch after this
 list.) Is the idea to initialize the producer once and re-use it in the
 processor? And if there are many KStreamThreads in the process, are there
 going to be many instances of KafkaProducer even though all outputs are
 sent to the same Kafka cluster?

 - Won’t it be simpler if the process() API just takes in the ConsumerRecord
 as the input instead of a tuple of (topic, key, value)?

 - Also, the input only indicates the topic of a message. What if the stream
 task needs to consume and produce messages from/to multiple Kafka clusters?
 To support that case, there should be a system/cluster name in both input
 and output as well.

 - How are the output messages handled? There does not seem to be an
 interface that allows users to send output messages to multiple output
 Kafka clusters.

 - It seems the proposed model also assumes one thread per processor. What
 becomes thread-local and what is shared among processors? Is the proposed
 model targeting to have the consumers/producers become thread-local
 instances within each KafkaProcessor? What’s the cost associated with this
 model?

 - One more important issue: how do we plug in client-side partition
 management logic? Considering the use case where the stream task needs to
 consume from multiple Kafka clusters, I am not even sure that we can rely
 on the Kafka brokers to maintain consumer group membership. Maybe we can
 still get the per-cluster consumer group membership and partitions.
 However, in this case, we truly need pluggable client-side partition
 management logic to determine how to assign partitions in different Kafka
 clusters to consumers (i.e. consumers for cluster1.topic1.p1 and
 cluster2.topic2.p1 have to be assigned together to one KafkaProcessor for
 processing). Based on the full information about (group members, all topic
 partitions) in all Kafka clusters with input topics, there should be two
 levels of partition management policies: a) how to group all topic
 partitions in all Kafka clusters into processor groups (i.e. the same
 concept as a Task group in Samza); b) how to assign the processor groups to
 group members. Note that if a processor group includes topic partitions
 from more than one Kafka cluster, it has to be assigned to the common group
 members in all relevant Kafka clusters. This cannot be done just by the
 brokers in a single Kafka cluster.

 - It seems that the intention of this KIP is also to put SQL/DSL libraries
 into Kafka. Why is that? Shouldn't Kafka be more focused on hiding
 system-level integration details, leaving it open for additional modules
 outside the Kafka core to enrich the functionality that is user-facing?
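
 (For the Collector question above, a minimal sketch of what such an
 interface could look like; the Collector interface and the class below are
 hypothetical, only the producer client is real. Note that KafkaProducer,
 unlike KafkaConsumer, is thread-safe, so one instance per target cluster
 can be shared by all processor threads:)

    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    interface Collector<K, V> {
        void send(String cluster, String topic, K key, V value);
    }

    class SharedProducerCollector<K, V> implements Collector<K, V> {
        // One producer per output cluster, shared by all processor threads.
        private final Map<String, KafkaProducer<K, V>> producers;

        SharedProducerCollector(Map<String, KafkaProducer<K, V>> producers) {
            this.producers = producers;
        }

        @Override
        public void send(String cluster, String topic, K key, V value) {
            producers.get(cluster).send(new ProducerRecord<>(topic, key, value));
        }
    }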

 Just a few quick cents. Thanks a lot!

 -Yi

 On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede n...@confluent.io wrote:

  Ewen:
 
  * I think trivial filtering and aggregation on a single stream usually
   work fine with this model.


  The way I see this, the process() API is an abstraction for
  message-at-a-time computations. In the future, you could imagine providing
  a simple DSL layer on top of the process() API that provides a set of APIs
  for stream processing operations on sets of messages like joins, windows,
  and various aggregations.

  * Spark (and presumably Spark Streaming) is supposed to get a big win by
   handling shuffles such that the data just stays in cache and never
   actually hits disk, or at least hits disk in the background. Will we take
   a hit because we always write to Kafka?


  The goal isn't so much about forcing materialization of intermediate
  results into Kafka but designing the API to integrate with Kafka to allow
  such materialization, wherever that might be required. The downside with
  other stream processing frameworks is that they have weak integration with
  Kafka, where interaction with Kafka is only at the endpoints of processing
  (first input, final output). Any intermediate operations that might benefit
  from persisting intermediate results into Kafka are forced to be broken up
  into 2 separate topologies/plans/stages of processing that lead to more
  jobs. The implication is that now the set of stream 

Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-24 Thread Ismael Juma
On 24 Jul 2015 18:03, Jay Kreps j...@confluent.io wrote:
 Does this make sense to people? If so let's try it and if we like it
 better we can formally make that the process for this kind of big thing.

Yes, sounds good to me.

Best,
Ismael


Re: [DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-23 Thread Ewen Cheslack-Postava
Just some notes on the KIP doc itself:

* It'd be useful to clarify at what point the plain consumer + custom code
+ producer breaks down. I think trivial filtering and aggregation on a
single stream usually work fine with this model. Anything where you need
more complex joins, windowing, etc. is where it breaks down. I think most
interesting applications require that functionality, but it's helpful to
make this really clear in the motivation -- right now, Kafka only provides
the lowest-level plumbing for stream processing applications, so most
interesting apps require very heavyweight frameworks.
* I think the feature comparison of plain producer/consumer, stream
processing frameworks, and this new library is a good start, but we might
want something more thorough and structured, like a feature matrix. Right
now it's hard to figure out exactly how they relate to each other.
* I'd personally push the library vs. framework story very strongly -- the
total buy-in and weak integration story of stream processing frameworks is
a big downside and makes a library a really compelling (and currently
unavailable, as far as I am aware) alternative.
* Comment about in-memory storage of other frameworks is interesting -- it
is specific to the framework, but is supposed to also give performance
benefits. The high-level functional processing interface would allow for
combining multiple operations when there's no shuffle, but when there is a
shuffle, we'll always be writing to Kafka, right? Spark (and presumably
Spark Streaming) is supposed to get a big win by handling shuffles such
that the data just stays in cache and never actually hits disk, or at least
hits disk in the background. Will we take a hit because we always write to
Kafka?
* I really struggled with the structure of the KIP template with Copycat
because the flow doesn't work well for proposals like this. They aren't
the kind of concrete changes the KIP template was designed for. I'd
completely ignore that template in favor of optimizing for clarity if I
were you.

-Ewen

On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi all,

 I just posted KIP-28: Add a transform client for data processing
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
 
 .

 The wiki page does not yet have the full design / implementation details,
 and this email is to kick-off the conversation on whether we should add
 this new client with the described motivations, and if yes what features /
 functionalities should be included.

 Looking forward to your feedback!

 -- Guozhang




-- 
Thanks,
Ewen


[DISCUSS] KIP-28 - Add a transform client for data processing

2015-07-23 Thread Guozhang Wang
Hi all,

I just posted KIP-28: Add a transform client for data processing
https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
.

The wiki page does not yet have the full design / implementation details,
and this email is to kick-off the conversation on whether we should add
this new client with the described motivations, and if yes what features /
functionalities should be included.

Looking forward to your feedback!

-- Guozhang