Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2021-06-16 Thread John Roesler
Hello again, all,

Since it would be confusing to continue indefinitely with
neither the old nor the new PAPI deprecated, I have decided
to go ahead with deprecating the old PAPI in AK 3.0.

Since KAFKA-10603 has not seen any progress, this means
that we actually do have to go ahead and
deprecate+replace the KStream#process API, so I have
updated the KIP accordingly:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=19=18
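For readers new to the thread, here is a minimal, self-contained sketch of what "strongly typed" means for the new PAPI. A processor declares both its input and its output key/value types, so forwarding a record of the wrong type fails at compile time. In the old PAPI, `ProcessorContext#forward` accepted any key and value, so such mistakes only showed up at runtime. The `Record`, `ProcessorContext`, and `Processor` types below are simplified stand-ins defined only for illustration, loosely following the shape discussed in this thread. They are not the real `org.apache.kafka.streams.processor.api` interfaces, and `ValueLengthProcessor` and `CapturingContext` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the immutable record passed to process() and forward().
final class Record<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;

    Record(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }

    // Returns a copy with a new value (possibly of a new type), keeping key and timestamp.
    <NV> Record<K, NV> withValue(NV newValue) {
        return new Record<>(key, newValue, timestamp);
    }
}

// Output types are fixed by the type parameters, so forward() is checked by the compiler.
interface ProcessorContext<KOut, VOut> {
    void forward(Record<KOut, VOut> record);
}

// Both input and output types are declared on the processor itself.
interface Processor<KIn, VIn, KOut, VOut> {
    default void init(ProcessorContext<KOut, VOut> context) {}
    void process(Record<KIn, VIn> record);
    default void close() {}
}

// Hypothetical example: forwards the length of each String value under the same key.
final class ValueLengthProcessor implements Processor<String, String, String, Integer> {
    private ProcessorContext<String, Integer> context;

    @Override
    public void init(ProcessorContext<String, Integer> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // context.forward(record) would not compile: the value type is String, not Integer.
        context.forward(record.withValue(record.value().length()));
    }
}

// Hypothetical test harness that simply collects whatever is forwarded.
final class CapturingContext<K, V> implements ProcessorContext<K, V> {
    final List<Record<K, V>> forwarded = new ArrayList<>();

    @Override
    public void forward(Record<K, V> record) {
        forwarded.add(record);
    }
}
```

The commented-out line is the point of the example: with typed output parameters, a processor cannot accidentally forward a value type that downstream processors do not expect.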

I am implementing this proposal as part of my PR
to deprecate the old PAPI:
https://github.com/apache/kafka/pull/10869

Please let me know if you have any concerns!

Thank you,
-John

On Mon, Oct 12, 2020, at 22:18, John Roesler wrote:
> Hello all,
> 
> While reviewing the KIP document, I realized that I hadn't
> submitted a PR to migrate the KStream.process() method to
> the new API. Upon reflection, I think I'd better defer this
> work for the same reason I deferred all the transform()
> APIs. I believe that the new Processor interface will give
> us the opportunity to collapse all those operations into
> one.
> 
> Doing this work now will not only violate the 2.7 code
> freeze, but it will also take away some of our freedom when
> we take on that future work, since we wouldn't be able to
> re-use the "process" name if we chose.
> 
> Accordingly, I've edited the KIP document to say that we
> will _not_ change that method for now, but leave it for
> future work. I also filed:
> https://issues.apache.org/jira/browse/KAFKA-10603
> 
> Thanks,
> -John
> 
> On Thu, 2020-10-01 at 17:08 -0700, Matthias J. Sax wrote:
> > Thanks John.
> > 
> > SGTM.
> > 
> > On 10/1/20 2:50 PM, John Roesler wrote:
> > > Hello again, all,
> > > 
> > > I'm sorry to make another tweak to this KIP, but during the
> > > implementation of the design we've just agreed on, I
> > > realized that Processors would almost never need to
> > > reference the RecordMetadata. Therefore, I'm proposing to
> > > streamline the API by moving the Optional to
> > > the new ProcessorContext as a method, rather than making it
> > > a method argument to Processor#process.
> > > 
> > > The change is visible here:
> > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=16=15
> > > 
> > > All of the same semantics and considerations we discussed
> > > still apply, it's just that Processor implementers won't
> > > have to think about it unless they actually _need_ the
> > > topic/partition/offset information from the RecordMetadata.
> > > 
> > > Also, the PR for this part of the KIP is now available here:
> > > https://github.com/apache/kafka/pull/9361
> > > 
> > > I know it's a bit on the heavy side; I've annotated the PR
> > > to try and ease the reviewer's job. I'd greatly appreciate
> > > it if anyone can take the time to review.
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Wed, 2020-09-30 at 10:16 -0500, John Roesler wrote:
> > > > Thanks, Matthias!
> > > > 
> > > > I can certainly document it. I didn't bother because the old
> > > > Processor, Supplier, and Context will themselves be
> > > > deprecated, so any method that handles them won't be able to
> > > > avoid the deprecation warning. Nevertheless, it doesn't hurt
> > > > to explicitly deprecate those methods.
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
> > > > > Thanks John. I like the proposal.
> > > > > 
> > > > > Btw: I was just going over the KIP and realized that we add new
> > > > > methods to `StreamsBuilder`, `Topology`, and `KStream` that take the
> > > > > new `ProcessorSupplier` class -- should we also deprecate the
> > > > > corresponding existing ones that take the old `ProcessorSupplier`?
> > > > > 
> > > > > 
> > > > > -Matthias
> > > > > 
> > > > > 
> > > > > On 9/30/20 7:46 AM, John Roesler wrote:
> > > > > > Thanks Paul and Sophie,
> > > > > > 
> > > > > > Your feedback certainly underscores the need to be explicit
> > > > > > in the javadoc about why that parameter is Optional. Getting
> > > > > > this kind of feedback before the release is exactly the kind
> > > > > > of outcome we hope to get from the KIP process!
> > > > > > 
> > > > > > Thanks,
> > > > > > -John
> > > > > > 
> > > > > > On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
> > > > > > > John, I totally agree that adding a method to Processor is
> > > > > > > cumbersome and not a good path.  I was imagining maybe a
> > > > > > > separate interface that could be used in the appropriate
> > > > > > > context, but I don't think that makes too much sense - it's
> > > > > > > just too far away from what Kafka Streams is.  I was
> > > > > > > originally more interested in the "why" Optional than the
> > > > > > > "how" (I think my original reply overplayed the "optional as
> > > > > > > an argument" concern).  But you've convinced me that there
> > > > > > > is a perfectly legitimate 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-10-12 Thread John Roesler
Hello all,

While reviewing the KIP document, I realized that I hadn't
submitted a PR to migrate the KStream.process() method to
the new API. Upon reflection, I think I'd better defer this
work for the same reason I deferred all the transform()
APIs. I believe that the new Processor interface will give
us the opportunity to collapse all those operations into
one.
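To illustrate why one process() method could subsume the various transform() variants: a new-style processor emits output by calling forward() on its context instead of returning a value, so a single process() call can forward zero records (filter semantics), one record (map or transformValues semantics), or many records (flatTransform semantics). The sketch below defines its own minimal stand-in types (`Record`, `ProcessorContext`, `Processor`). They are illustrative only, not the real Kafka Streams interfaces, and `SplitWordsProcessor` and `Outputs.run` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in record type for this sketch.
final class Record<K, V> {
    final K key;
    final V value;
    final long timestamp;

    Record(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Minimal stand-in context: the only way to emit output is forward().
interface ProcessorContext<KOut, VOut> {
    void forward(Record<KOut, VOut> record);
}

interface Processor<KIn, VIn, KOut, VOut> {
    void init(ProcessorContext<KOut, VOut> context);
    void process(Record<KIn, VIn> record);
}

// Hypothetical processor: drops blank input and splits the rest into words.
final class SplitWordsProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        String text = record.value == null ? "" : record.value.trim();
        if (text.isEmpty()) {
            return; // forward nothing: behaves like a filter
        }
        for (String word : text.split("\\s+")) {
            // one forward per word: a map for one word, a flatMap for several
            context.forward(new Record<>(record.key, word, record.timestamp));
        }
    }
}

final class Outputs {
    // Hypothetical helper: runs the processor on one value and collects forwarded values.
    static List<String> run(String value) {
        final List<String> out = new ArrayList<>();
        SplitWordsProcessor processor = new SplitWordsProcessor();
        processor.init(r -> out.add(r.value));
        processor.process(new Record<>("key", value, 0L));
        return out;
    }
}
```

The same processor shape covers all three cases, which is what makes a single `process` entry point a plausible replacement for several specialized `transform` methods.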

Doing this work now will not only violate the 2.7 code
freeze, but it will also take away some of our freedom when
we take on that future work, since we wouldn't be able to
re-use the "process" name if we chose.

Accordingly, I've edited the KIP document to say that we
will _not_ change that method for now, but leave it for
future work. I also filed:
https://issues.apache.org/jira/browse/KAFKA-10603

Thanks,
-John

On Thu, 2020-10-01 at 17:08 -0700, Matthias J. Sax wrote:
> Thanks John.
> 
> SGTM.
> 
> On 10/1/20 2:50 PM, John Roesler wrote:
> > Hello again, all,
> > 
> > I'm sorry to make another tweak to this KIP, but during the
> > implementation of the design we've just agreed on, I
> > realized that Processors would almost never need to
> > reference the RecordMetadata. Therefore, I'm proposing to
> > streamline the API by moving the Optional to
> > the new ProcessorContext as a method, rather than making it
> > a method argument to Processor#process.
> > 
> > The change is visible here:
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=16=15
> > 
> > All of the same semantics and considerations we discussed
> > still apply, it's just that Processor implementers won't
> > have to think about it unless they actually _need_ the
> > topic/partition/offset information from the RecordMetadata.
> > 
> > Also, the PR for this part of the KIP is now available here:
> > https://github.com/apache/kafka/pull/9361
> > 
> > I know it's a bit on the heavy side; I've annotated the PR
> > to try and ease the reviewer's job. I'd greatly appreciate
> > it if anyone can take the time to review.
> > 
> > Thanks,
> > -John
> > 
> > On Wed, 2020-09-30 at 10:16 -0500, John Roesler wrote:
> > > Thanks, Matthias!
> > > 
> > > I can certainly document it. I didn't bother because the old
> > > Processor, Supplier, and Context will themselves be
> > > deprecated, so any method that handles them won't be able to
> > > avoid the deprecation warning. Nevertheless, it doesn't hurt
> > > to explicitly deprecate those methods.
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
> > > > Thanks John. I like the proposal.
> > > > 
> > > > Btw: I was just going over the KIP and realized that we add new methods
> > > > to `StreamsBuilder`, `Topology`, and `KStream` that take the new
> > > > `ProcessorSupplier` class -- should we also deprecate the corresponding
> > > > existing ones that take the old `ProcessorSupplier`?
> > > > 
> > > > 
> > > > -Matthias
> > > > 
> > > > 
> > > > On 9/30/20 7:46 AM, John Roesler wrote:
> > > > > Thanks Paul and Sophie,
> > > > > 
> > > > > Your feedback certainly underscores the need to be explicit
> > > > > in the javadoc about why that parameter is Optional. Getting
> > > > > this kind of feedback before the release is exactly the kind
> > > > > of outcome we hope to get from the KIP process!
> > > > > 
> > > > > Thanks,
> > > > > -John
> > > > > 
> > > > > On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
> > > > > > John, I totally agree that adding a method to Processor is
> > > > > > cumbersome and not a good path.  I was imagining maybe a
> > > > > > separate interface that could be used in the appropriate
> > > > > > context, but I don't think that makes too much sense - it's
> > > > > > just too far away from what Kafka Streams is.  I was
> > > > > > originally more interested in the "why" Optional than the
> > > > > > "how" (I think my original reply overplayed the "optional as
> > > > > > an argument" concern).  But you've convinced me that there
> > > > > > is a perfectly legitimate "why".  We should make sure that
> > > > > > it's clear why it's Optional, but I suppose that goes
> > > > > > without saying.  It's a nice opportunity to make the API
> > > > > > reflect more what is actually going on under the hood.
> > > > > > 
> > > > > > Thanks!
> > > > > > Paul
> > > > > > 
> > > > > > On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman
> > > > > > wrote:
> > > > > > 
> > > > > > > FWIW, while I'm really not a fan of Optional in general, I
> > > > > > > agree that its usage here seems appropriate. Even for those
> > > > > > > rare software developers who carefully read all the docs
> > > > > > > several times over, I think it wouldn't be too hard to miss
> > > > > > > a note about the RecordMetadata possibly being null.
> > > > > > > 
> > > > > > > Especially because it's not that 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-10-01 Thread Matthias J. Sax
Thanks John.

SGTM.

On 10/1/20 2:50 PM, John Roesler wrote:
> Hello again, all,
> 
> I'm sorry to make another tweak to this KIP, but during the
> implementation of the design we've just agreed on, I
> realized that Processors would almost never need to
> reference the RecordMetadata. Therefore, I'm proposing to
> streamline the API by moving the Optional to
> the new ProcessorContext as a method, rather than making it
> a method argument to Processor#process.
> 
> The change is visible here:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=16=15
> 
> All of the same semantics and considerations we discussed
> still apply, it's just that Processor implementers won't
> have to think about it unless they actually _need_ the
> topic/partition/offset information from the RecordMetadata.
> 
> Also, the PR for this part of the KIP is now available here:
> https://github.com/apache/kafka/pull/9361
> 
> I know it's a bit on the heavy side; I've annotated the PR
> to try and ease the reviewer's job. I'd greatly appreciate
> it if anyone can take the time to review.
> 
> Thanks,
> -John
> 
> On Wed, 2020-09-30 at 10:16 -0500, John Roesler wrote:
>> Thanks, Matthias!
>>
>> I can certainly document it. I didn't bother because the old
>> Processor, Supplier, and Context will themselves be
>> deprecated, so any method that handles them won't be able to
>> avoid the deprecation warning. Nevertheless, it doesn't hurt
>> to explicitly deprecate those methods.
>>
>> Thanks,
>> -John
>>
>> On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
>>> Thanks John. I like the proposal.
>>>
>>> Btw: I was just going over the KIP and realized that we add new methods
>>> to `StreamsBuilder`, `Topology`, and `KStream` that take the new
>>> `ProcessorSupplier` class -- should we also deprecate the corresponding
>>> existing ones that take the old `ProcessorSupplier`?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 9/30/20 7:46 AM, John Roesler wrote:
>>>> Thanks Paul and Sophie,
>>>>
>>>> Your feedback certainly underscores the need to be explicit
>>>> in the javadoc about why that parameter is Optional. Getting
>>>> this kind of feedback before the release is exactly the kind
>>>> of outcome we hope to get from the KIP process!
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
>>>>> John, I totally agree that adding a method to Processor is cumbersome and
>>>>> not a good path.  I was imagining maybe a separate interface that could be
>>>>> used in the appropriate context, but I don't think that makes too much
>>>>> sense - it's just too far away from what Kafka Streams is.  I was
>>>>> originally more interested in the "why" Optional than the "how" (I think
>>>>> my original reply overplayed the "optional as an argument" concern).  But
>>>>> you've convinced me that there is a perfectly legitimate "why".  We should
>>>>> make sure that it's clear why it's Optional, but I suppose that goes
>>>>> without saying.  It's a nice opportunity to make the API reflect more what
>>>>> is actually going on under the hood.
>>>>>
>>>>> Thanks!
>>>>> Paul
>>>>>
>>>>> On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman
>>>>> wrote:
>>>>>
>>>>>> FWIW, while I'm really not a fan of Optional in general, I agree that
>>>>>> its usage here seems appropriate. Even for those rare software
>>>>>> developers who carefully read all the docs several times over, I think
>>>>>> it wouldn't be too hard to miss a note about the RecordMetadata
>>>>>> possibly being null.
>>>>>>
>>>>>> Especially because it's not that obvious why at first glance, and
>>>>>> takes a bit of thinking to realize that records originating from a
>>>>>> Punctuator wouldn't have a "current record". This is something that
>>>>>> has definitely confused users today.
>>>>>>
>>>>>> It's on us to improve the education here -- and an Optional
>>>>>> would naturally raise awareness of this subtlety.
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman
>>>>>> wrote:
>>>>>>
>>>>>>> Does my reply address your concerns?
>>>>>>>
>>>>>>>
>>>>>>> Yes; also, I definitely misread part of the proposal earlier and
>>>>>>> thought you had put the timestamp field in RecordMetadata. Sorry for
>>>>>>> not giving things a closer look before responding! I'm not sure my
>>>>>>> original message made much sense given the misunderstanding, but
>>>>>>> thanks for responding anyway :P
>>>>>>>
>>>>>>> Having given the proposal a second pass, I agree, it's very elegant. +1
>>>>>>>
>>>>>>> On Tue, Sep 29, 2020 at 6:50 PM John Roesler
>>>>>>> wrote:
>>>>>>>> Thanks for the reply, Sophie,
>>>>>>>>
>>>>>>>> I think I may have summarized too much in my prior reply.
>>>>>>>>
>>>>>>>> In the currently proposed KIP, any caller of forward() must
>>>>>>>> supply a Record, which consists of:
>>>>>>>> * key
>>>>>>>> * value
>>>>>>>> * timestamp
>>>>>>>> * headers (with a 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-10-01 Thread John Roesler
Hello again, all,

I'm sorry to make another tweak to this KIP, but during the
implementation of the design we've just agreed on, I
realized that Processors would almost never need to
reference the RecordMetadata. Therefore, I'm proposing to
streamline the API by moving the Optional to
the new ProcessorContext as a method, rather than making it
a method argument to Processor#process.

The change is visible here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=16=15

All of the same semantics and considerations we discussed
still apply, it's just that Processor implementers won't
have to think about it unless they actually _need_ the
topic/partition/offset information from the RecordMetadata.
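A minimal sketch of what this looks like from a processor implementer's side, assuming the context exposes a method returning an `Optional` of `RecordMetadata` as described above (the method name `recordMetadata()` is used here for illustration). Code that does not need topic/partition/offset never calls it. Code that does must handle the empty case, which is what it gets when there is no current input record, for example during a punctuation. `RecordMetadata`, `ProcessorContext`, and `Origins` are simplified stand-ins defined here, not the real Kafka Streams classes.

```java
import java.util.Optional;

// Stand-in for the purely informational metadata of the current input record.
final class RecordMetadata {
    private final String topic;
    private final int partition;
    private final long offset;

    RecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
}

// Stand-in context, trimmed to the one method that matters here. Metadata is
// looked up on the context rather than passed as an argument to process().
interface ProcessorContext {
    // Empty when there is no current input record, e.g. during a punctuation.
    Optional<RecordMetadata> recordMetadata();
}

final class Origins {
    // Only code that actually needs topic/partition/offset has to handle the Optional.
    static String describe(ProcessorContext context) {
        return context.recordMetadata()
                .map(m -> m.topic() + "-" + m.partition() + "@" + m.offset())
                .orElse("no input record");
    }
}
```

Returning an `Optional` from the context, rather than adding a parameter to every `process` call, keeps the common case free of metadata while still making the "no current record" case impossible to overlook when the metadata is actually needed.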

Also, the PR for this part of the KIP is now available here:
https://github.com/apache/kafka/pull/9361

I know it's a bit on the heavy side; I've annotated the PR
to try and ease the reviewer's job. I'd greatly appreciate
it if anyone can take the time to review.

Thanks,
-John

On Wed, 2020-09-30 at 10:16 -0500, John Roesler wrote:
> Thanks, Matthias!
> 
> I can certainly document it. I didn't bother because the old
> Processor, Supplier, and Context will themselves be
> deprecated, so any method that handles them won't be able to
> avoid the deprecation warning. Nevertheless, it doesn't hurt
> to explicitly deprecate those methods.
> 
> Thanks,
> -John
> 
> On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
> > Thanks John. I like the proposal.
> > 
> > Btw: I was just going over the KIP and realized that we add new methods
> > to `StreamsBuilder`, `Topology`, and `KStream` that take the new
> > `ProcessorSupplier` class -- should we also deprecate the corresponding
> > existing ones that take the old `ProcessorSupplier`?
> > 
> > 
> > -Matthias
> > 
> > 
> > On 9/30/20 7:46 AM, John Roesler wrote:
> > > Thanks Paul and Sophie,
> > > 
> > > Your feedback certainly underscores the need to be explicit
> > > in the javadoc about why that parameter is Optional. Getting
> > > this kind of feedback before the release is exactly the kind
> > > of outcome we hope to get from the KIP process!
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
> > > > John, I totally agree that adding a method to Processor is
> > > > cumbersome and not a good path.  I was imagining maybe a separate
> > > > interface that could be used in the appropriate context, but I
> > > > don't think that makes too much sense - it's just too far away
> > > > from what Kafka Streams is.  I was originally more interested in
> > > > the "why" Optional than the "how" (I think my original reply
> > > > overplayed the "optional as an argument" concern).  But you've
> > > > convinced me that there is a perfectly legitimate "why".  We
> > > > should make sure that it's clear why it's Optional, but I suppose
> > > > that goes without saying.  It's a nice opportunity to make the API
> > > > reflect more what is actually going on under the hood.
> > > > 
> > > > Thanks!
> > > > Paul
> > > > 
> > > > On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman
> > > > wrote:
> > > > 
> > > > > FWIW, while I'm really not a fan of Optional in general, I agree
> > > > > that its usage here seems appropriate. Even for those rare
> > > > > software developers who carefully read all the docs several times
> > > > > over, I think it wouldn't be too hard to miss a note about the
> > > > > RecordMetadata possibly being null.
> > > > > 
> > > > > Especially because it's not that obvious why at first glance, and
> > > > > takes a bit of thinking to realize that records originating from a
> > > > > Punctuator wouldn't have a "current record". This is something
> > > > > that has definitely confused users today.
> > > > > 
> > > > > It's on us to improve the education here -- and an Optional
> > > > > would naturally raise awareness of this subtlety.
> > > > > 
> > > > > On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman
> > > > > wrote:
> > > > > 
> > > > > > Does my reply address your concerns?
> > > > > > 
> > > > > > 
> > > > > > Yes; also, I definitely misread part of the proposal earlier and
> > > > > > thought you had put the timestamp field in RecordMetadata. Sorry
> > > > > > for not giving things a closer look before responding! I'm not
> > > > > > sure my original message made much sense given the
> > > > > > misunderstanding, but thanks for responding anyway :P
> > > > > > 
> > > > > > Having given the proposal a second pass, I agree, it's very
> > > > > > elegant. +1
> > > > > > 
> > > > > > On Tue, Sep 29, 2020 at 6:50 PM John Roesler 
> > > > > wrote:
> > > > > > > Thanks for the reply, Sophie,
> > > > > > > 
> > > > > > > I think I may have summarized too much in my prior 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-30 Thread John Roesler
Thanks, Matthias!

I can certainly document it. I didn't bother because the old
Processor, Supplier, and Context will themselves be
deprecated, so any method that handles them won't be able to
avoid the deprecation warning. Nevertheless, it doesn't hurt
to explicitly deprecate those methods.
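A sketch of the approach being discussed, using hypothetical stand-in types: while the old and new supplier interfaces coexist, a builder can offer one overload for each and mark the old one `@Deprecated`. Deprecating the old type already makes callers see warnings wherever they name it, as noted above; the explicit annotation on the overload also documents the replacement. `OldProcessorSupplier`, `NewProcessorSupplier`, and `TopologySketch` are illustrative names, not Kafka classes, and the real method names and signatures are whatever the KIP specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical old-style, untyped supplier. Deprecating the type itself already
// produces warnings wherever callers refer to it.
@Deprecated
interface OldProcessorSupplier {
    Object get(); // returns a processor; typed as Object to keep the sketch small
}

// Hypothetical new-style supplier with input and output types declared.
interface NewProcessorSupplier<KIn, VIn, KOut, VOut> {
    Object get(); // returns a processor; typed as Object to keep the sketch small
}

// Hypothetical builder that offers both overloads during the migration period.
final class TopologySketch {
    final List<String> processorNames = new ArrayList<>();

    /**
     * @deprecated use {@link #addProcessor(String, NewProcessorSupplier)} instead.
     */
    @Deprecated
    TopologySketch addProcessor(String name, OldProcessorSupplier supplier) {
        processorNames.add(name);
        return this;
    }

    TopologySketch addProcessor(String name, NewProcessorSupplier<?, ?, ?, ?> supplier) {
        processorNames.add(name);
        return this;
    }
}
```

Because both sketch interfaces happen to have the same functional shape, a bare lambda argument would be ambiguous between the two overloads; callers would pass a typed supplier (or cast the lambda), which also makes the migration target explicit.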

Thanks,
-John

On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
> Thanks John. I like the proposal.
> 
> Btw: I was just going over the KIP and realized that we add new methods
> to `StreamsBuilder`, `Topology`, and `KStream` that take the new
> `ProcessorSupplier` class -- should we also deprecate the corresponding
> existing ones that take the old `ProcessorSupplier`?
> 
> 
> -Matthias
> 
> 
> On 9/30/20 7:46 AM, John Roesler wrote:
> > Thanks Paul and Sophie,
> > 
> > Your feedback certainly underscores the need to be explicit
> > in the javadoc about why that parameter is Optional. Getting
> > this kind of feedback before the release is exactly the kind
> > of outcome we hope to get from the KIP process!
> > 
> > Thanks,
> > -John
> > 
> > On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
> > > John, I totally agree that adding a method to Processor is cumbersome and
> > > not a good path.  I was imagining maybe a separate interface that could be
> > > used in the appropriate context, but I don't think that makes too much
> > > sense - it's just too far away from what Kafka Streams is.  I was
> > > originally more interested in the "why" Optional than the "how" (I think
> > > my original reply overplayed the "optional as an argument" concern).  But
> > > you've convinced me that there is a perfectly legitimate "why".  We should
> > > make sure that it's clear why it's Optional, but I suppose that goes
> > > without saying.  It's a nice opportunity to make the API reflect more what
> > > is actually going on under the hood.
> > > 
> > > Thanks!
> > > Paul
> > > 
> > > On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman 
> > > wrote:
> > > 
> > > > FWIW, while I'm really not a fan of Optional in general, I agree that
> > > > its usage here seems appropriate. Even for those rare software
> > > > developers who carefully read all the docs several times over, I
> > > > think it wouldn't be too hard to miss a note about the RecordMetadata
> > > > possibly being null.
> > > > 
> > > > Especially because it's not that obvious why at first glance, and
> > > > takes a bit of thinking to realize that records originating from a
> > > > Punctuator wouldn't have a "current record". This is something that
> > > > has definitely confused users today.
> > > > 
> > > > It's on us to improve the education here -- and an Optional
> > > > would naturally raise awareness of this subtlety.
> > > > 
> > > > On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman
> > > > wrote:
> > > > 
> > > > > Does my reply address your concerns?
> > > > > 
> > > > > 
> > > > > Yes; also, I definitely misread part of the proposal earlier and
> > > > > thought you had put the timestamp field in RecordMetadata. Sorry
> > > > > for not giving things a closer look before responding! I'm not
> > > > > sure my original message made much sense given the
> > > > > misunderstanding, but thanks for responding anyway :P
> > > > > 
> > > > > Having given the proposal a second pass, I agree, it's very
> > > > > elegant. +1
> > > > > 
> > > > > On Tue, Sep 29, 2020 at 6:50 PM John Roesler 
> > > > wrote:
> > > > > > Thanks for the reply, Sophie,
> > > > > > 
> > > > > > I think I may have summarized too much in my prior reply.
> > > > > > 
> > > > > > In the currently proposed KIP, any caller of forward() must
> > > > > > supply a Record, which consists of:
> > > > > > * key
> > > > > > * value
> > > > > > * timestamp
> > > > > > * headers (with a convenience constructor that sets empty
> > > > > > headers)
> > > > > > 
> > > > > > These aren't what I was referring to as potentially being
> > > > > > undefined downstream, since thanks to the introduction of
> > > > > > Record, they are, as you're advocating, required to be
> > > > > > defined everywhere, even when forwarding from a punctuator.
> > > > > > 
> > > > > > So to be clear, the intent of this change is actually to
> > > > > > _enforce_ that timestamp would never be undefined (which it
> > > > > > currently can be). Also, since punctuators _are_ going to
> > > > > > have to "make up" a timestamp going forward, we should note
> > > > > > that the "punctuate" method currently passes in a good
> > > > > > timestamp that they can use: for system-time punctuations,
> > > > > > they receive the current system time, and for stream-time
> > > > > > punctuations, they get the current stream time.
> > > > > > 
> > > > > > The potentially undefined RecordMetadata only contains these
> > > > > > fields:
> > > > > > * topic
> > > > > > * partition
> > > > > > * offset
> > > > > > 
> > > > > > 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-30 Thread Matthias J. Sax
Thanks John. I like the proposal.

Btw: I was just going over the KIP and realized that we add new methods
to `StreamsBuilder`, `Topology`, and `KStream` that take the new
`ProcessorSupplier` class -- should we also deprecate the corresponding
existing ones that take the old `ProcessorSupplier`?


-Matthias


On 9/30/20 7:46 AM, John Roesler wrote:
> Thanks Paul and Sophie,
> 
> Your feedback certainly underscores the need to be explicit
> in the javadoc about why that parameter is Optional. Getting
> this kind of feedback before the release is exactly the kind
> of outcome we hope to get from the KIP process!
> 
> Thanks,
> -John
> 
> On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
>> John, I totally agree that adding a method to Processor is cumbersome and
>> not a good path.  I was imagining maybe a separate interface that could be
>> used in the appropriate context, but I don't think that makes too much
>> sense - it's just too far away from what Kafka Streams is.  I was
>> originally more interested in the "why" Optional than the "how" (I think my
>> original reply overplayed the "optional as an argument" concern).  But
>> you've convinced me that there is a perfectly legitimate "why".  We should
>> make sure that it's clear why it's Optional, but I suppose that goes
>> without saying.  It's a nice opportunity to make the API reflect more what
>> is actually going on under the hood.
>>
>> Thanks!
>> Paul
>>
>> On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman 
>> wrote:
>>
>>> FWIW, while I'm really not a fan of Optional in general, I agree that its
>>> usage here seems appropriate. Even for those rare software developers who
>>> carefully read all the docs several times over, I think it wouldn't be
>>> too hard to miss a note about the RecordMetadata possibly being null.
>>>
>>> Especially because it's not that obvious why at first glance, and takes a
>>> bit of thinking to realize that records originating from a Punctuator
>>> wouldn't have a "current record". This is something that has definitely
>>> confused users today.
>>>
>>> It's on us to improve the education here -- and an Optional
>>> would naturally raise awareness of this subtlety.
>>>
>>> On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman 
>>> wrote:
>>>
>>>> Does my reply address your concerns?
>>>>
>>>>
>>>> Yes; also, I definitely misread part of the proposal earlier and thought
>>>> you had put the timestamp field in RecordMetadata. Sorry for not giving
>>>> things a closer look before responding! I'm not sure my original message
>>>> made much sense given the misunderstanding, but thanks for responding
>>>> anyway :P
>>>>
>>>> Having given the proposal a second pass, I agree, it's very elegant. +1
>>>>
>>>> On Tue, Sep 29, 2020 at 6:50 PM John Roesler
>>>> wrote:
>>>>> Thanks for the reply, Sophie,
>>>>>
>>>>> I think I may have summarized too much in my prior reply.
>>>>>
>>>>> In the currently proposed KIP, any caller of forward() must
>>>>> supply a Record, which consists of:
>>>>> * key
>>>>> * value
>>>>> * timestamp
>>>>> * headers (with a convenience constructor that sets empty
>>>>> headers)
>>>>>
>>>>> These aren't what I was referring to as potentially being
>>>>> undefined downstream, since thanks to the introduction of
>>>>> Record, they are, as you're advocating, required to be
>>>>> defined everywhere, even when forwarding from a punctuator.
>>>>>
>>>>> So to be clear, the intent of this change is actually to
>>>>> _enforce_ that timestamp would never be undefined (which it
>>>>> currently can be). Also, since punctuators _are_ going to
>>>>> have to "make up" a timestamp going forward, we should note
>>>>> that the "punctuate" method currently passes in a good
>>>>> timestamp that they can use: for system-time punctuations,
>>>>> they receive the current system time, and for stream-time
>>>>> punctuations, they get the current stream time.
>>>>>
>>>>> The potentially undefined RecordMetadata only contains these
>>>>> fields:
>>>>> * topic
>>>>> * partition
>>>>> * offset
>>>>>
>>>>> These fields aren't required (or even used) in a Sink, and
>>>>> it doesn't seem like they would be important to many
>>>>> applications. Furthermore, it doesn't _seem_ like you'd even
>>>>> want to set these fields. They seem purely informational and
>>>>> only useful in the context when you are actually processing
>>>>> a real input record. It doesn't sound like you were asking
>>>>> for it, but just to put it on the record, I think if we were
>>>>> to require values for the metadata from punctuators, people
>>>>> would mostly just make up their own dummy values, to no
>>>>> one's benefit.
>>>>>
>>>>> I should also note that with the current
>>>>> Record/RecordMetadata split, we will have the freedom to
>>>>> move fields into the Record class (or even add new fields)
>>>>> if we want them to become "data" as opposed to "metadata" in
>>>>> the future.
>>>>>
>>>>> Thanks for your reply; I was similarly 
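The reply quoted above makes two concrete points. First, every forwarded Record carries a key, value, timestamp, and headers, with a convenience constructor that fills in empty headers. Second, a punctuator should use the timestamp passed to punctuate() (the current system time or the current stream time, depending on the punctuation type) rather than inventing one. The following is a minimal sketch under those assumptions. `Record`, `Punctuator`, and `EmitCountPunctuator` are simplified stand-ins defined here, and headers are modeled as a plain `Map<String, byte[]>` instead of Kafka's `Headers` type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in for the data a processor forwards: key, value, timestamp, headers.
final class Record<K, V> {
    private final K key;
    private final V value;
    private final long timestamp; // primitive, so it cannot be left undefined (null)
    private final Map<String, byte[]> headers;

    Record(K key, V value, long timestamp, Map<String, byte[]> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }

    // Convenience constructor that sets empty headers.
    Record(K key, V value, long timestamp) {
        this(key, value, timestamp, Collections.<String, byte[]>emptyMap());
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }
    Map<String, byte[]> headers() { return headers; }
}

// Stand-in for the callback invoked by a scheduled punctuation.
interface Punctuator {
    void punctuate(long timestamp);
}

// Hypothetical punctuator that periodically emits a running count.
final class EmitCountPunctuator implements Punctuator {
    private final Consumer<Record<String, Long>> forward;
    private long count;

    EmitCountPunctuator(Consumer<Record<String, Long>> forward) {
        this.forward = forward;
    }

    void increment() {
        count++;
    }

    @Override
    public void punctuate(long timestamp) {
        // There is no input record here, so reuse the timestamp handed to punctuate():
        // system time or stream time, depending on how the punctuation was scheduled.
        forward.accept(new Record<>("count", count, timestamp));
    }
}
```

Making the timestamp a required primitive constructor argument is one simple way to get the "never undefined" guarantee described in the reply: there is no code path that builds a record without one.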

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-30 Thread John Roesler
Thanks Paul and Sophie,

Your feedback certainly underscores the need to be explicit
in the javadoc about why that parameter is Optional. Getting
this kind of feedback before the release is exactly the kind
of outcome we hope to get from the KIP process!

Thanks,
-John

On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
> John, I totally agree that adding a method to Processor is cumbersome and
> not a good path.  I was imagining maybe a separate interface that could be
> used in the appropriate context, but I don't think that makes too much
> sense - it's just too far away from what Kafka Streams is.  I was
> originally more interested in the "why" Optional than the "how" (I think my
> original reply overplayed the "optional as an argument" concern).  But
> you've convinced me that there is a perfectly legitimate "why".  We should
> make sure that it's clear why it's Optional, but I suppose that goes
> without saying.  It's a nice opportunity to make the API reflect more what
> is actually going on under the hood.
> 
> Thanks!
> Paul
> 
> On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman 
> wrote:
> 
> > FWIW, while I'm really not a fan of Optional in general, I agree that its
> > usage here seems appropriate. Even for those rare software developers who
> > carefully read all the docs several times over, I think it wouldn't be
> > too hard to miss a note about the RecordMetadata possibly being null.
> > 
> > Especially because it's not that obvious why at first glance, and takes a
> > bit of thinking to realize that records originating from a Punctuator
> > wouldn't have a "current record". This is something that has definitely
> > confused users today.
> > 
> > It's on us to improve the education here -- and an Optional
> > would naturally raise awareness of this subtlety.
> > 
> > On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman 
> > wrote:
> > 
> > > Does my reply address your concerns?
> > > 
> > > 
> > > Yes; also, I definitely misread part of the proposal earlier and thought
> > > you had put the timestamp field in RecordMetadata. Sorry for not giving
> > > things a closer look before responding! I'm not sure my original message
> > > made much sense given the misunderstanding, but thanks for responding
> > > anyway :P
> > > 
> > > Having given the proposal a second pass, I agree, it's very elegant. +1
> > > 
> > > On Tue, Sep 29, 2020 at 6:50 PM John Roesler 
> > wrote:
> > > > Thanks for the reply, Sophie,
> > > > 
> > > > I think I may have summarized too much in my prior reply.
> > > > 
> > > > In the currently proposed KIP, any caller of forward() must
> > > > supply a Record, which consists of:
> > > > * key
> > > > * value
> > > > * timestamp
> > > > * headers (with a convenience constructor that sets empty
> > > > headers)
> > > > 
> > > > These aren't what I was referring to as potentially being
> > > > undefined downstream, since thanks to the introduction of
> > > > Record, they are, as you're advocating, required to be
> > > > defined everywhere, even when forwarding from a punctuator.
> > > > 
> > > > So to be clear, the intent of this change is actually to
> > > > _enforce_ that timestamp would never be undefined (which it
> > > > currently can be). Also, since punctuators _are_ going to
> > > > have to "make up" a timestamp going forward, we should note
> > > > that the "punctuate" method currently passes in a good
> > > > timestamp that they can use: for system-time punctuations,
> > > > they receive the current system time, and for stream-time
> > > > punctuations, they get the current stream time.
> > > > 
> > > > The potentially undefined RecordMetadata only contains these
> > > > fields:
> > > > * topic
> > > > * partition
> > > > * offset
> > > > 
> > > > These fields aren't required (or even used) in a Sink, and
> > > > it doesn't seem like they would be important to many
> > > > applications. Furthermore, it doesn't _seem_ like you'd even
> > > > want to set these fields. They seem purely informational and
> > > > only useful in the context when you are actually processing
> > > > a real input record. It doesn't sound like you were asking
> > > > for it, but just to put it on the record, I think if we were
> > > > to require values for the metadata from punctuators, people
> > > > would mostly just make up their own dummy values, to no
> > > > one's benefit.
> > > > 
> > > > I should also note that with the current
> > > > Record/RecordMetadata split, we will have the freedom to
> > > > move fields into the Record class (or even add new fields)
> > > > if we want them to become "data" as opposed to "metadata" in
> > > > the future.
> > > > 
> > > > Thanks for your reply; I was similarly floored when I
> > > > realized the true nature of the current situation. Does my
> > > > reply address your concerns?
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
> > > > wrote:
> > > > > > 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Paul Whalen
John, I totally agree that adding a method to Processor is cumbersome and
not a good path.  I was imagining maybe a separate interface that could be
used in the appropriate context, but I don't think that makes too much
sense - it's just too far away from what Kafka Streams is.  I was
originally more interested in the "why" of Optional than the "how" (I think my
original reply overplayed the "optional as an argument" concern).  But
you've convinced me that there is a perfectly legitimate "why".  We should
make sure that it's clear why it's Optional, but I suppose that goes
without saying.  It's a nice opportunity to make the API reflect more closely
what is actually going on under the hood.

Thanks!
Paul

On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman 
wrote:

> FWIW, while I'm really not a fan of Optional in general, I agree that its
> usage here seems appropriate. Even for those rare software developers who
> carefully read all the docs several times over, I think it wouldn't be too
> hard to miss a note about the RecordMetadata possibly being null.
>
> Especially because it's not that obvious why at first glance, and takes a
> bit of thinking to realize that records originating from a Punctuator
> wouldn't have a "current record". This is something that has definitely
> confused users today.
>
> It's on us to improve the education here -- and an Optional
> would naturally raise awareness of this subtlety

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Sophie Blee-Goldman
FWIW, while I'm really not a fan of Optional in general, I agree that its
usage here seems appropriate. Even for those rare software developers who
carefully read all the docs several times over, I think it wouldn't be too
hard to miss a note about the RecordMetadata possibly being null.

Especially because it's not that obvious why at first glance, and takes a
bit of thinking to realize that records originating from a Punctuator
wouldn't have a "current record". This is something that has definitely
confused users today.

It's on us to improve the education here -- and an Optional
would naturally raise awareness of this subtlety
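A minimal sketch of the Punctuator subtlety, assuming hypothetical stand-in types and collapsing the framework's forwarding into a direct method call: a parent that forwards both while processing a real record and from a punctuator hands its child metadata in the first case and Optional.empty() in the second. The child can only tell the two cases apart through that argument, which is exactly what the Optional makes visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the proposed types (headers omitted).
final class Record<K, V> {
    final K key;
    final V value;
    final long timestamp;

    Record(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class RecordMetadata {
    final String topic;
    final int partition;
    final long offset;

    RecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Child-side view of the signature proposed at this point in the thread.
interface Processor<K, V> {
    void process(Record<K, V> record, Optional<RecordMetadata> metadata);
}

// Hypothetical parent; a direct call stands in for context.forward(...).
final class Parent {
    private final Processor<String, String> child;

    Parent(Processor<String, String> child) {
        this.child = child;
    }

    // Forwarding while handling a real input record: metadata is available.
    void process(Record<String, String> record, Optional<RecordMetadata> metadata) {
        child.process(record, metadata);
    }

    // Forwarding from a punctuator: there is no "current record", so no metadata.
    void punctuate(long timestamp) {
        child.process(new Record<>("tick", "heartbeat", timestamp), Optional.empty());
    }
}
```

In the real framework the metadata would be propagated by the runtime rather than passed by hand; the sketch only shows what the child observes.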

On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman 
wrote:

> Does my reply address your concerns?
>
>
> Yes; also, I definitely misread part of the proposal earlier and thought
> you had put the timestamp field in RecordMetadata. Sorry for not giving
> things a closer look before responding! I'm not sure my original message
> made much sense given the misunderstanding, but thanks for responding
> anyway :P
>
> Having given the proposal a second pass, I agree, it's very elegant. +1

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Sophie Blee-Goldman
>
> Does my reply address your concerns?


Yes; also, I definitely misread part of the proposal earlier and thought you
had put the timestamp field in RecordMetadata. Sorry for not giving things a
closer look before responding! I'm not sure my original message made much
sense given the misunderstanding, but thanks for responding anyway :P

Having given the proposal a second pass, I agree, it's very elegant. +1

On Tue, Sep 29, 2020 at 6:50 PM John Roesler  wrote:

> Thanks for the reply, Sophie,
>
> I think I may have summarized too much in my prior reply.
>
> In the currently proposed KIP, any caller of forward() must
> supply a Record, which consists of:
> * key
> * value
> * timestamp
> * headers (with a convenience constructor that sets empty
> headers)
>
> These aren't what I was referring to as potentially being
> undefined downstream, since thanks to the introduction of
> Record, they are, as you're advocating, required to be
> defined everywhere, even when forwarding from a punctuator.
>
> So to be clear, the intent of this change is actually to
> _enforce_ that timestamp would never be undefined (which it
> currently can be). Also, since punctuators _are_ going to
> have to "make up" a timestamp going forward, we should note
> that the "punctuate" method currently passes in a good
> timestamp that they can use: for system-time punctuations,
> they receive the current system time, and for stream-time
> punctuations, they get the current stream time.
>
> The potentially undefined RecordMetadata only contains these
> fields:
> * topic
> * partition
> * offset
>
> These fields aren't required (or even used) in a Sink, and
> it doesn't seem like they would be important to many
> applications. Furthermore, it doesn't _seem_ like you'd even
> want to set these fields. They seem purely informational and
> only useful in the context when you are actually processing
> a real input record. It doesn't sound like you were asking
> for it, but just to put it on the record, I think if we were
> to require values for the metadata from punctuators, people
> would mostly just make up their own dummy values, to no
> one's benefit.
>
> I should also note that with the current
> Record/RecordMetadata split, we will have the freedom to
> move fields into the Record class (or even add new fields)
> if we want them to become "data" as opposed to "metadata" in
> the future.
>
> Thanks for your reply; I was similarly floored when I
> realized the true nature of the current situation. Does my
> reply address your concerns?
>
> Thanks,
> -John

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread John Roesler
Thanks for the reply, Sophie,

I think I may have summarized too much in my prior reply.

In the currently proposed KIP, any caller of forward() must
supply a Record, which consists of:
* key
* value
* timestamp
* headers (with a convenience constructor that sets empty
headers)

These aren't what I was referring to as potentially being
undefined downstream, since thanks to the introduction of
Record, they are, as you're advocating, required to be
defined everywhere, even when forwarding from a punctuator.
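A minimal sketch of what "required to be defined everywhere" could look like, assuming a simplified stand-in rather than the real Kafka Streams Record: headers are modeled as a plain Map<String, String>, rejecting negative timestamps is a choice made only for this sketch, and withValue follows the shallow-copy idea from the KIP proposal while withTimestamp and withHeader are hypothetical additions. Each with... method returns a new instance, so a processor can change what it forwards without mutating the record its parent or siblings see.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the proposed Record: key, value, timestamp, headers.
// All fields are set at construction, so a forwarded Record always has a timestamp.
final class Record<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;
    private final Map<String, String> headers; // simplified headers (assumption)

    Record(K key, V value, long timestamp, Map<String, String> headers) {
        // Sketch-only rule: no "undefined" (negative) timestamps can sneak through.
        if (timestamp < 0) {
            throw new IllegalArgumentException("timestamp must be non-negative");
        }
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        // Defensive, read-only copy: one processor's header changes can't leak to siblings.
        this.headers = Collections.unmodifiableMap(
                new HashMap<>(Objects.requireNonNull(headers, "headers")));
    }

    // Convenience constructor that sets empty headers.
    Record(K key, V value, long timestamp) {
        this(key, value, timestamp, Collections.<String, String>emptyMap());
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }
    Map<String, String> headers() { return headers; }

    // Shallow copy with a new value, possibly of a different type.
    <NewV> Record<K, NewV> withValue(NewV newValue) {
        return new Record<>(key, newValue, timestamp, headers);
    }

    // Hypothetical: shallow copy with a new timestamp.
    Record<K, V> withTimestamp(long newTimestamp) {
        return new Record<>(key, value, newTimestamp, headers);
    }

    // Hypothetical: copy with one extra header; the original record is untouched.
    Record<K, V> withHeader(String name, String headerValue) {
        Map<String, String> copy = new HashMap<>(headers);
        copy.put(name, headerValue);
        return new Record<>(key, value, timestamp, copy);
    }
}
```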

So to be clear, the intent of this change is actually to
_enforce_ that timestamp would never be undefined (which it
currently can be). Also, since punctuators _are_ going to
have to "make up" a timestamp going forward, we should note
that the "punctuate" method currently passes in a good
timestamp that they can use: for system-time punctuations,
they receive the current system time, and for stream-time
punctuations, they get the current stream time.
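A sketch of a punctuator that stamps its output with the timestamp handed to punctuate, assuming a pared-down Record stand-in and a Consumer in place of the context's forward method. CountEmitter is a hypothetical example; only the shape of the punctuate(long timestamp) callback mirrors the existing Punctuator interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Pared-down, hypothetical stand-in for the proposed Record (headers omitted).
final class Record<K, V> {
    final K key;
    final V value;
    final long timestamp;

    Record(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Same shape as the existing callback: it receives system time or stream time,
// depending on the punctuation type it was scheduled with.
interface Punctuator {
    void punctuate(long timestamp);
}

// Hypothetical processor: counts keys in process() and emits counts periodically.
final class CountEmitter {
    private final Map<String, Long> counts = new HashMap<>();
    private final Consumer<Record<String, Long>> forward; // stands in for context.forward(record)

    CountEmitter(Consumer<Record<String, Long>> forward) {
        this.forward = forward;
    }

    void process(Record<String, String> record) {
        counts.merge(record.key, 1L, Long::sum);
    }

    // What would be registered with the context's scheduling call.
    Punctuator punctuator() {
        return timestamp -> {
            // Every forwarded Record needs a timestamp; the punctuation time is a sensible one.
            counts.forEach((key, count) -> forward.accept(new Record<>(key, count, timestamp)));
            counts.clear();
        };
    }
}
```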

The potentially undefined RecordMetadata only contains these
fields:
* topic
* partition
* offset

These fields aren't required (or even used) in a Sink, and
it doesn't seem like they would be important to many
applications. Furthermore, it doesn't _seem_ like you'd even
want to set these fields. They seem purely informational and
only useful in the context when you are actually processing
a real input record. It doesn't sound like you were asking
for it, but just to put it on the record, I think if we were
to require values for the metadata from punctuators, people
would mostly just make up their own dummy values, to no
one's benefit.
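A small sketch of a processor that uses the metadata purely for information, under the signature proposed at this point in the thread. RecordMetadata here is a hypothetical stand-in with only the three fields listed above, and describe is an invented helper; the point is that the absent case becomes a one-line fallback instead of made-up values.

```java
import java.util.Optional;

// Hypothetical stand-in for the proposed RecordMetadata: where the input record came from.
final class RecordMetadata {
    final String topic;
    final int partition;
    final long offset;

    RecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class MetadataLogging {
    // Formats provenance for logging; handles the "no current input record" case,
    // e.g. when some upstream processor forwarded from a punctuator.
    static String describe(Optional<RecordMetadata> metadata) {
        return metadata
                .map(m -> m.topic + "-" + m.partition + "@" + m.offset)
                .orElse("no input record (forwarded from a punctuator?)");
    }
}
```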

I should also note that with the current
Record/RecordMetadata split, we will have the freedom to
move fields into the Record class (or even add new fields)
if we want them to become "data" as opposed to "metadata" in
the future.

Thanks for your reply; I was similarly floored when I
realized the true nature of the current situation. Does my
reply address your concerns?

Thanks,
-John

On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
wrote:
> > However, the record metadata is only defined when the parent forwards
> > while processing a real record, not when it calls forward from the punctuator.
>
>
> Can we take a step back for a second...why wouldn't you be required to set
> the RecordContext yourself when calling forward from a Punctuator? I think
> I agree with Paul here, it seems kind of absurd not to enforce that the
> RecordContext be present inside the process() method.
>
> The original problem with Punctuators, as I understood it, was that all of
> the RecordContext fields were exposed automatically to both the Processor
> and any Punctuator, due to being direct methods on the ProcessorContext. We
> can't control which ProcessorContext methods someone will call from within
> a Punctuator vs from a Processor. The best we could do was set these
> "nonsense" fields to null when inside a Punctuator, or set them to some
> dummy values as you pointed out.
>
> But then you proposed the solution of a separate RecordContext which is not
> attached to the ProcessorContext at all. This seemed to solve the above
> problem very neatly: we only pass in the RecordContext to the process()
> method, so we don't have to worry about people trying to access these
> fields from within a Punctuator. The fields aren't accessible unless
> they're defined.
>
> So what happens when someone wants to forward something from within a
> Punctuator? I don't think it's reasonable to let the timestamp field be
> undefined, ever. What if the Punctuator forwards directly to a sink, or
> directly to some windowing logic? Are we supposed to add handling for the
> RecordContext == null case to every processor? Or are we just going to
> assume the implicit restriction that users will only forward records from a
> Punctuator to downstream processors that know how to handle and/or set the
> RecordContext if it's undefined? That seems to throw away a lot of the
> awesome safety added in this KIP.
>
> Apologies for the rant. But I feel pretty strongly that allowing users to
> forward records from a Punctuator without a defined RecordContext would be
> asking for trouble. Imo, if you want to forward from a Punctuator, you need
> to store the info you need in order to set the timestamp, or make one up
> yourself.
>
> (the one alternative I can think of here is that maybe we could pass in the
> current partition time, so users can at least put in a reasonable estimate
> for the timestamp that won't cause it to get dropped and won't potentially
> lurch the stream time far into the future. This would be similar to what we
> do in the TimestampExtractor)

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Sophie Blee-Goldman
> However, the record metadata is only defined when the parent forwards
> while processing a real record, not when it calls forward from the punctuator.


Can we take a step back for a second...why wouldn't you be required to set
the RecordContext yourself when calling forward from a Punctuator? I think I
agree with Paul here, it seems kind of absurd not to enforce that the
RecordContext be present inside the process() method.

The original problem with Punctuators, as I understood it, was that all of
the RecordContext fields were exposed automatically to both the Processor and
any Punctuator, due to being direct methods on the ProcessorContext. We can't
control which ProcessorContext methods someone will call from within a
Punctuator vs from a Processor. The best we could do was set these "nonsense"
fields to null when inside a Punctuator, or set them to some dummy values as
you pointed out.

But then you proposed the solution of a separate RecordContext which is not
attached to the ProcessorContext at all. This seemed to solve the above
problem very neatly: we only pass in the RecordContext to the process()
method, so we don't have to worry about people trying to access these fields
from within a Punctuator. The fields aren't accessible unless they're defined.

So what happens when someone wants to forward something from within a
Punctuator? I don't think it's reasonable to let the timestamp field be
undefined, ever. What if the Punctuator forwards directly to a sink, or
directly to some windowing logic? Are we supposed to add handling for the
RecordContext == null case to every processor? Or are we just going to assume
the implicit restriction that users will only forward records from a
Punctuator to downstream processors that know how to handle and/or set the
RecordContext if it's undefined? That seems to throw away a lot of the
awesome safety added in this KIP.

Apologies for the rant. But I feel pretty strongly that allowing users to
forward records from a Punctuator without a defined RecordContext would be
asking for trouble. Imo, if you want to forward from a Punctuator, you need to
store the info you need in order to set the timestamp, or make one up
yourself.

(the one alternative I can think of here is that maybe we could pass in the
current partition time, so users can at least put in a reasonable estimate
for the timestamp that won't cause it to get dropped and won't potentially
lurch the stream time far into the future. This would be similar to what we
do in the TimestampExtractor)
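The "store the info you need" option might look like the following sketch: a hypothetical buffering processor remembers the largest input timestamp it has observed and uses it when a punctuator flushes the buffer. The stand-in Record, the Consumer-based forwarding, and the "batch" key are all assumptions made for illustration, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Pared-down, hypothetical stand-in for the proposed Record (headers omitted).
final class Record<K, V> {
    final K key;
    final V value;
    final long timestamp;

    Record(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical processor that buffers values and flushes them from a punctuator.
final class BufferingProcessor {
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<Record<String, List<String>>> forward; // stands in for context.forward(record)
    private long maxSeenTimestamp = 0L;

    BufferingProcessor(Consumer<Record<String, List<String>>> forward) {
        this.forward = forward;
    }

    void process(Record<String, String> record) {
        buffer.add(record.value);
        // Remember what is needed later to stamp the punctuator's output.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, record.timestamp);
    }

    // Body of a hypothetical punctuator: emits everything buffered as one record.
    void flush() {
        if (buffer.isEmpty()) {
            return; // nothing buffered, so nothing to forward
        }
        List<String> batch = new ArrayList<>(buffer);
        // Use the newest input time instead of an undefined timestamp, so the output
        // is not stamped ahead of the data that produced it.
        forward.accept(new Record<>("batch", batch, maxSeenTimestamp));
        buffer.clear();
    }
}
```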

On Tue, Sep 29, 2020 at 6:06 PM John Roesler  wrote:

> Oh, I guess one other thing I should have mentioned is that I’ve recently
> discovered that in cases where the context is undefined, we currently just
> fill in dummy values for the context. So there’s a good chance that real
> applications in use are depending on undefined context without even
> realizing it. What I’m hoping to do is just make the situation explicit and
> get rid of the dummy values.
>
> Thanks,
> John

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread John Roesler
Oh, I guess one other thing I should have mentioned is that I’ve recently 
discovered that in cases where the context is undefined, we currently just fill 
in dummy values for the context. So there’s a good chance that real 
applications in use are depending on undefined context without even realizing 
it. What I’m hoping to do is just make the situation explicit and get rid of 
the dummy values. 
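To illustrate the difference between filling in dummy values and making absence explicit, here is a hypothetical contrast. The CurrentInput class and its -1 placeholder are invented for this sketch and are not the values Kafka Streams actually fills in; the point is only that a plain long gives a caller no way to tell filler from a real offset, while OptionalLong does.

```java
import java.util.OptionalLong;

// Hypothetical contrast only: CurrentInput and its -1 placeholder are invented for
// this sketch and do not describe what Kafka Streams actually returns.
final class CurrentInput {
    private final boolean defined; // false when there is no current input record
    private final long offset;

    private CurrentInput(boolean defined, long offset) {
        this.defined = defined;
        this.offset = offset;
    }

    static CurrentInput of(long offset) {
        return new CurrentInput(true, offset);
    }

    static CurrentInput none() {
        return new CurrentInput(false, -1L); // placeholder "dummy" value
    }

    // Dummy-value style: always returns a number, so callers can't tell filler from data.
    long offsetOrDummy() {
        return offset;
    }

    // Explicit style: absence shows up in the type and has to be handled.
    OptionalLong offset() {
        return defined ? OptionalLong.of(offset) : OptionalLong.empty();
    }
}
```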

Thanks,
John

On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
> Thanks for the review, Paul!
> 
> I had read some of that debate before. There seems to be some subtext 
> there, because they advise against using Optional in cases like this, 
> but there doesn’t seem to be a specific reason why it’s inappropriate. 
> I got the impression they were just afraid that people would go 
> overboard and make everything Optional. 
> 
> I could also make two methods, but it seemed like it might be an 
> unfortunate way to handle the issue, since Processor is just about a 
> Function as-is, but the two-method approach would require people to 
> implement both methods.
> 
> To your question, this is something that’s only recently become clear 
> to me. Imagine you have a parent processor that calls forward both from 
> process and a punctuator. The child will have process() invoked in both 
> cases, and won’t be able to distinguish them. However, the record 
> metadata is only defined when the parent forwards while processing a 
> real record, not when it calls forward from the punctuator.
> 
> This is why I wanted to make the metadata Optional, to advertise that 
> the metadata might be undefined if any ancestor processor ever calls 
> forward from a punctuator. We could remove the Optional and instead 
> just document that the argument might be null.
> 
> With that context in place, what’s your take?
> 
> Thanks,
> John
> 
> On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> > Looks pretty good to me, though the Processor#process(Record,
> > Optional<RecordMetadata>) signature caught my eye.  There's some debate
> > (https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
> > about whether to use Optionals in arguments, and while that's a bit of a
> > religious debate in the abstract, it did make me wonder whether it makes
> > sense in this specific case.  When is it actually not present?  I was
> > under the impression that we should always have access to it in process(),
> > and that the concern about metadata being undefined was about having
> > access to record metadata in the ProcessorContext held for use inside a
> > Punctuator.
> > 
> > If that's not the case and it is truly optional in process(), is there an
> > opportunity for an alternate interface for the cases when we don't get it,
> > rather than force the branching on implementers of the interface?
> > 
> > Apologies if I've missed something, I took a look at the PR and I didn't
> > see any spots where I thought it would be empty.  Perhaps an example of a
> > Punctuator using (and not using) the new API would clear things up.
> > 
> > Best,
> > Paul
> > 
> > On Tue, Sep 29, 2020 at 4:10 PM John Roesler  wrote:
> > 
> > > Hello again, all,
> > >
> > > Thanks for the latest round of discussion. I've taken the
> > > recent feedback and come up with an updated KIP that seems
> > > actually quite a bit nicer than the prior proposal.
> > >
> > > The specific diff on the KIP is here:
> > >
> > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=15=14
> > >
> > > These changes are implemented in this POC PR:
> > > https://github.com/apache/kafka/pull/9346
> > >
> > > The basic idea is that, building on the recent conversation,
> > > we would transition away from the current API where we get
> > > only key/value in the process() method and other "data"
> > > comes in the ProcessorContext along with the "metadata".
> > >
> > > Instead, we formalize what is "data" and what is "metadata",
> > > and pass it all in to the process method:
> > > Processor#process(Record, Optional)
> > >
> > > Also, you forward the whole data class instead of mutating
> > > the ProcessorContext fields and also calling forward:
> > > ProcessorContext#forward(Record)
> > >
> > > The Record class itself ships with methods like
> > > record#withValue(NewV newValue)
> > > that make a shallow copy of the input Record, enabling
> > > Processors to safely handle the record without polluting the
> > > context of their parents and siblings.
> > >
> > > This proposal has a number of key benefits:
> > > * As we've discovered in KAFKA-9584, it's unsafe to mutate
> > > the Headers via the ProcessorContext. This proposal offers a
> > > way to safely forward changes only to downstream processors.
> > > * The new API has symmetry (each processor's input is the
> > > output of its parent processor)
> > > * The API makes clear that the record metadata isn't always
> > > defined (for example, in a punctuation, there is no 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread John Roesler
Thanks for the review, Paul!

I had read some of that debate before. There seems to be some subtext there, 
because they advise against using Optional in cases like this, but there 
doesn’t seem to be a specific reason why it’s inappropriate. I got the 
impression they were just afraid that people would go overboard and make 
everything Optional. 

I could also make two methods, but that seemed like an unfortunate way to 
handle the issue: as-is, Processor is essentially just a Function, whereas the 
two-method approach would require people to implement both methods.

To your question, this is something that only recently became clear to me. 
Imagine you have a parent processor that calls forward both from process and a 
punctuator. The child will have process() invoked in both cases, and won’t be 
able to distinguish them. However, the record metadata is only defined when the 
parent forwards while processing a real record, not when it calls forward from 
the punctuator.

This is why I wanted to make the metadata Optional, to advertise that the 
metadata might be undefined if any ancestor processor ever calls forward from a 
punctuator. We could remove the Optional and instead just document that the 
argument might be null.
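To make Paul's punctuator question concrete, here is a minimal, self-contained Java sketch. It does not use the real Kafka Streams classes: `MiniRecord`, `MiniMetadata`, `ParentProcessor`, and `ChildProcessor` are hypothetical stand-ins, and the two-argument `process(Record, Optional)` shape only mirrors the proposal under discussion. The parent forwards once while handling an input record and once from a simulated punctuator; the child's single `process` entry point can only tell the two calls apart by whether metadata is present.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Runnable demo; every type below is a hypothetical stand-in, not a Kafka class.
final class OptionalMetadataDemo {
    public static void main(String[] args) {
        ChildProcessor child = new ChildProcessor();
        ParentProcessor parent = new ParentProcessor(child);
        parent.process(new MiniRecord<>("a", "x", 1L), Optional.of(new MiniMetadata("input", 0, 42L)));
        parent.punctuate(100L);
        child.seen.forEach(System.out::println);
    }
}

// Stand-in for topic/partition/offset, defined only for records read from a topic.
final class MiniMetadata {
    final String topic;
    final int partition;
    final long offset;

    MiniMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Stand-in for the record "data": key, value, and timestamp.
final class MiniRecord<K, V> {
    final K key;
    final V value;
    final long timestamp;

    MiniRecord(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// The child cannot tell whether its parent forwarded from process() or from a
// punctuator, so it must handle absent metadata on every call.
final class ChildProcessor {
    final List<String> seen = new ArrayList<>();

    void process(MiniRecord<String, Long> record, Optional<MiniMetadata> metadata) {
        String origin = metadata
                .map(m -> m.topic + "-" + m.partition + "@" + m.offset)
                .orElse("no metadata");
        seen.add(record.key + "=" + record.value + " (" + origin + ")");
    }
}

// The parent forwards a running count, both per input record and on punctuation.
final class ParentProcessor {
    private final ChildProcessor child;
    private long count = 0;

    ParentProcessor(ChildProcessor child) {
        this.child = child;
    }

    void process(MiniRecord<String, String> record, Optional<MiniMetadata> metadata) {
        count++;
        // Forwarding while handling an input record: its metadata is defined.
        child.process(new MiniRecord<>(record.key, count, record.timestamp), metadata);
    }

    void punctuate(long wallClockTime) {
        // Forwarding from a punctuator: there is no current topic/partition/offset.
        child.process(new MiniRecord<>("total", count, wallClockTime), Optional.empty());
    }
}
```

Running `main` should print `a=1 (input-0@42)` and then `total=1 (no metadata)`. The branch on the `Optional` is the cost Paul mentions; documenting a nullable argument instead would turn the same check into a null test.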

With that context in place, what’s your take?

Thanks,
John

On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> Looks pretty good to me, though the Processor#process(Record,
> Optional) signature caught my eye.  There's some debate (
> https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
> about whether to use Optionals in arguments, and while that's a bit of a
> religious debate in the abstract, it did make me wonder whether it makes
> sense in this specific case.  When is it actually not present?  I was under
> the impression that we should always have access to it in process(), and
> that the concern about metadata being undefined was about having access to
> record metadata in the ProcessorContext held for use inside a Punctuator.
> 
> If that's not the case and it is truly optional in process(), is there an
> opportunity for an alternate interface for the cases when we don't get it,
> rather than force the branching on implementers of the interface?
> 
> Apologies if I've missed something; I took a look at the PR and I didn't
> see any spots where I thought it would be empty.  Perhaps an example of a
> Punctuator using (and not using) the new API would clear things up.
> 
> Best,
> Paul
> 
> On Tue, Sep 29, 2020 at 4:10 PM John Roesler  wrote:
> 
> > Hello again, all,
> >
> > Thanks for the latest round of discussion. I've taken the
> > recent feedback and come up with an updated KIP that seems
> > actually quite a bit nicer than the prior proposal.
> >
> > The specific diff on the KIP is here:
> >
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=15=14
> >
> > These changes are implemented in this POC PR:
> > https://github.com/apache/kafka/pull/9346
> >
> > The basic idea is that, building on the recent conversation,
> > we would transition away from the current API where we get
> > only key/value in the process() method and other "data"
> > comes in the ProcessorContext along with the "metadata".
> >
> > Instead, we formalize what is "data" and what is "metadata",
> > and pass it all in to the process method:
> > Processor#process(Record, Optional)
> >
> > Also, you forward the whole data class instead of mutating
> > the ProcessorContext fields and also calling forward:
> > ProcessorContext#forward(Record)
> >
> > The Record class itself ships with methods like
> > record#withValue(NewV newValue)
> > that make a shallow copy of the input Record, enabling
> > Processors to safely handle the record without polluting the
> > context of their parents and siblings.
> >
> > This proposal has a number of key benefits:
> > * As we've discovered in KAFKA-9584, it's unsafe to mutate
> > the Headers via the ProcessorContext. This proposal offers a
> > way to safely forward changes only to downstream processors.
> > * The new API has symmetry (each processor's input is the
> > output of its parent processor)
> > * The API makes clear that the record metadata isn't always
> > defined (for example, in a punctuation, there is no current
> > topic/partition/offset)
> > * The API enables punctuators to forward well-defined
> > headers downstream, which is currently not possible.
> >
> > Unless there are objections, I'll go ahead and re-finalize
> > this KIP and update that PR to a mergeable state.
> >
> > Thanks, all,
> > -John
> >
> >
> > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> > > Interesting proposal. However, I am not totally convinced, because I see
> > > a fundamental difference between "data" and "metadata".
> > >
> > > Topic/partition/offset are "metadata" in the strong sense and they are
> > > immutable.
> > >
> > > On the other hand there is "primary" data 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Paul Whalen
Looks pretty good to me, though the Processor#process(Record,
Optional) signature caught my eye.  There's some debate (
https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
about whether to use Optionals in arguments, and while that's a bit of a
religious debate in the abstract, it did make me wonder whether it makes
sense in this specific case.  When is it actually not present?  I was under
the impression that we should always have access to it in process(), and
that the concern about metadata being undefined was about having access to
record metadata in the ProcessorContext held for use inside a Punctuator.

If that's not the case and it is truly optional in process(), is there an
opportunity for an alternate interface for the cases when we don't get it,
rather than force the branching on implementers of the interface?

Apologies if I've missed something; I took a look at the PR and I didn't
see any spots where I thought it would be empty.  Perhaps an example of a
Punctuator using (and not using) the new API would clear things up.

Best,
Paul

On Tue, Sep 29, 2020 at 4:10 PM John Roesler  wrote:

> Hello again, all,
>
> Thanks for the latest round of discussion. I've taken the
> recent feedback and come up with an updated KIP that seems
> actually quite a bit nicer than the prior proposal.
>
> The specific diff on the KIP is here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=15=14
>
> These changes are implemented in this POC PR:
> https://github.com/apache/kafka/pull/9346
>
> The basic idea is that, building on the recent conversation,
> we would transition away from the current API where we get
> only key/value in the process() method and other "data"
> comes in the ProcessorContext along with the "metadata".
>
> Instead, we formalize what is "data" and what is "metadata",
> and pass it all in to the process method:
> Processor#process(Record, Optional)
>
> Also, you forward the whole data class instead of mutating
> the ProcessorContext fields and also calling forward:
> ProcessorContext#forward(Record)
>
> The Record class itself ships with methods like
> record#withValue(NewV newValue)
> that make a shallow copy of the input Record, enabling
> Processors to safely handle the record without polluting the
> context of their parents and siblings.
>
> This proposal has a number of key benefits:
> * As we've discovered in KAFKA-9584, it's unsafe to mutate
> the Headers via the ProcessorContext. This proposal offers a
> way to safely forward changes only to downstream processors.
> * The new API has symmetry (each processor's input is the
> output of its parent processor)
> * The API makes clear that the record metadata isn't always
> defined (for example, in a punctuation, there is no current
> topic/partition/offset)
> * The API enables punctuators to forward well-defined
> headers downstream, which is currently not possible.
>
> Unless there are objections, I'll go ahead and re-finalize
> this KIP and update that PR to a mergeable state.
>
> Thanks, all,
> -John
>
>
> On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> > Interesting proposal. However, I am not totally convinced, because I see
> > a fundamental difference between "data" and "metadata".
> >
> > Topic/partition/offset are "metadata" in the strong sense and they are
> > immutable.
> >
> > On the other hand there is "primary" data like key and value, as well as
> > "secondary" data like timestamp and headers. The issue seems that we
> > treat "secondary data" more like metadata atm?
> >
> > Thus, promoting timestamp and headers to a first-class citizen role
> > makes sense to me (my original proposal about `RecordContext` would still
> > fall short in this regard). However, putting both (data and metadata)
> > into a `Record` abstraction might go too far?
> >
> > I am also a little bit concerned about `Record.copy()` because it might
> > be a trap: Users might assume it does a full deep copy of the record,
> > however, it would not. It would only create a new `Record` object as
> > wrapper that points to the same key/value/header objects as the input
> > record.
> >
> > With the current `context.forward(key, value)` we don't have this "deep
> > copy" issue -- it's pretty clear what is happening.
> >
> > Instead of `To.all().withTimestamp()` we could also add
> > `context.forward(key, value, timestamp)` etc. (just wondering about the
> > explosion in overloads)?
> >
> > Also, `Record.withValue` etc sounds odd? Should a record not be
> > immutable? So, we could have something like
> >
> > `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> > But it looks rather verbose?
> >
> > The other question is, of course, to what extent do we want to keep the
> > distinction between "primary" and "secondary" data? To me, it's a
> > question of ease of use?
> >
> > Just putting all this out to move the discussion forward. 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread John Roesler
Hello again, all,

Thanks for the latest round of discussion. I've taken the
recent feedback and come up with an updated KIP that seems
actually quite a bit nicer than the prior proposal.

The specific diff on the KIP is here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=15=14

These changes are implemented in this POC PR:
https://github.com/apache/kafka/pull/9346

The basic idea is that, building on the recent conversation,
we would transition away from the current API where we get
only key/value in the process() method and other "data"
comes in the ProcessorContext along with the "metadata".

Instead, we formalize what is "data" and what is "metadata",
and pass it all in to the process method:
Processor#process(Record, Optional)

Also, you forward the whole data class instead of mutating
the ProcessorContext fields and also calling forward:
ProcessorContext#forward(Record)

The Record class itself ships with methods like
record#withValue(NewV newValue)
that make a shallow copy of the input Record, enabling
Processors to safely handle the record without polluting the
context of their parents and siblings.
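A minimal Java sketch of the shallow-copy semantics described above, assuming an immutable record type: every field is final, and each `withX` method returns a new instance that reuses the unchanged fields rather than mutating the input. `MiniRecord` and its methods are hypothetical stand-ins modeled on the `withValue` idea (headers are omitted for brevity); this is not the API from the PR.

```java
// Runnable demo; MiniRecord is a hypothetical stand-in, not Kafka's Record.
final class WithValueDemo {
    public static void main(String[] args) {
        MiniRecord<String, String> input = new MiniRecord<>("user-1", "clicked", 10L);
        // Derive an output record without touching the record the parent passed in.
        MiniRecord<String, Integer> output = input.withValue(1).withTimestamp(20L);
        System.out.println(input.value() + " @ " + input.timestamp());   // clicked @ 10
        System.out.println(output.value() + " @ " + output.timestamp()); // 1 @ 20
    }
}

// Hypothetical immutable record: all fields are final and never reassigned.
final class MiniRecord<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;

    MiniRecord(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }

    // Each "with" method returns a new wrapper; unchanged fields are shared, not cloned.
    <NewK> MiniRecord<NewK, V> withKey(NewK newKey) {
        return new MiniRecord<>(newKey, value, timestamp);
    }

    <NewV> MiniRecord<K, NewV> withValue(NewV newValue) {
        return new MiniRecord<>(key, newValue, timestamp);
    }

    MiniRecord<K, V> withTimestamp(long newTimestamp) {
        return new MiniRecord<>(key, value, newTimestamp);
    }
}
```

Because `withValue` is generic in the new value type, a processor can change the value type while its parent's record stays untouched.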

This proposal has a number of key benefits:
* As we've discovered in KAFKA-9584, it's unsafe to mutate
the Headers via the ProcessorContext. This proposal offers a
way to safely forward changes only to downstream processors.
* The new API has symmetry (each processor's input is the
output of its parent processor)
* The API makes clear that the record metadata isn't always
defined (for example, in a punctuation, there is no current
topic/partition/offset)
* The API enables punctuators to forward well-defined
headers downstream, which is currently not possible.

Unless there are objections, I'll go ahead and re-finalize
this KIP and update that PR to a mergeable state.

Thanks, all,
-John


On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> Interesting proposal. However, I am not totally convinced, because I see
> a fundamental difference between "data" and "metadata".
> 
> Topic/partition/offset are "metadata" in the strong sense and they are
> immutable.
> 
> On the other hand there is "primary" data like key and value, as well as
> "secondary" data like timestamp and headers. The issue seems that we
> treat "secondary data" more like metadata atm?
> 
> Thus, promoting timestamp and headers to a first-class citizen role
> makes sense to me (my original proposal about `RecordContext` would still
> fall short in this regard). However, putting both (data and metadata)
> into a `Record` abstraction might go too far?
> 
> I am also a little bit concerned about `Record.copy()` because it might
> be a trap: Users might assume it does a full deep copy of the record,
> however, it would not. It would only create a new `Record` object as
> wrapper that points to the same key/value/header objects as the input
> record.
> 
> With the current `context.forward(key, value)` we don't have this "deep
> copy" issue -- it's pretty clear what is happening.
> 
> Instead of `To.all().withTimestamp()` we could also add
> `context.forward(key, value, timestamp)` etc. (just wondering about the
> explosion in overloads)?
> 
> Also, `Record.withValue` etc sounds odd? Should a record not be
> immutable? So, we could have something like
> 
> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> But it looks rather verbose?
> 
> The other question is, of course, to what extent do we want to keep the
> distinction between "primary" and "secondary" data? To me, it's a
> question of ease of use?
> 
> Just putting all this out to move the discussion forward. Don't have a
> concrete proposal atm.
> 
> 
> -Matthias
> 
> 
> On 9/14/20 9:24 AM, John Roesler wrote:
> > Thanks for this thought, Matthias!
> > 
> > To be honest, it's bugged me quite a bit that _all_ the
> > record information hasn't been an argument to `process`. I
> > suppose I was trying to be conservative in this proposal,
> > but then again, if we're adding new Processor and
> > ProcessorContext interfaces, then this is the time to make
> > such a change.
> > 
> > To be unambiguous, I think this is what we're talking about:
> > ProcessorContext:
> > * applicationId
> > * taskId
> > * appConfigs
> > * appConfigsWithPrefix
> > * keySerde
> > * valueSerde
> > * stateDir
> > * metrics
> > * schedule
> > * commit
> > * forward
> > 
> > StateStoreContext:
> > * applicationId
> > * taskId
> > * appConfigs
> > * appConfigsWithPrefix
> > * keySerde
> > * valueSerde
> > * stateDir
> > * metrics
> > * register
> > 
> > 
> > RecordContext
> > * topic
> > * partition
> > * offset
> > * timestamp
> > * headers
> > 
> > 
> > Your proposal sounds good to me as-is. Just to cover the
> > bases, though, I'm wondering if we should push the idea just
> > a little farther. Instead of decomposing key,value,context,
> > we could just keep them all in one object, like this:
> > 
> > Record:
> > * key
> > * value
> > * 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-24 Thread Matthias J. Sax
Interesting proposal. However, I am not totally convinced, because I see
a fundamental difference between "data" and "metadata".

Topic/partition/offset are "metadata" in the strong sense and they are
immutable.

On the other hand there is "primary" data like key and value, as well as
"secondary" data like timestamp and headers. The issue seems that we
treat "secondary data" more like metadata atm?

Thus, promoting timestamp and headers to a first-class citizen role
makes sense to me (my original proposal about `RecordContext` would still
fall short in this regard). However, putting both (data and metadata)
into a `Record` abstraction might go too far?

I am also a little bit concerned about `Record.copy()` because it might
be a trap: Users might assume it does a full deep copy of the record,
however, it would not. It would only create a new `Record` object as
wrapper that points to the same key/value/header objects as the input
record.

With the current `context.forward(key, value)` we don't have this "deep
copy" issue -- it's pretty clear what is happening.
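The "trap" described above can be shown with a small Java sketch. `MiniRecord` is a hypothetical stand-in whose headers are a mutable `List<String>` (loosely analogous to Kafka's mutable `Headers`); `withValue` creates a new wrapper but reuses the same headers list, so a downstream mutation is visible through the original. The `withValueCopyingHeaders` method is purely illustrative of one way to avoid sharing the container and is not part of any proposal in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Runnable demo; MiniRecord is a hypothetical stand-in, not Kafka's Record.
final class ShallowCopyDemo {
    public static void main(String[] args) {
        MiniRecord<String> original = new MiniRecord<>("v1", new ArrayList<>());

        // Shallow copy: new wrapper object, same headers list.
        MiniRecord<String> shallow = original.withValue("v2");
        shallow.headers.add("trace=child");
        System.out.println(original.headers); // [trace=child]  <- the "trap"

        // Copying the container isolates the original from later additions.
        MiniRecord<String> isolated = original.withValueCopyingHeaders("v3");
        isolated.headers.add("trace=other");
        System.out.println(original.headers); // [trace=child]
    }
}

// Hypothetical record whose headers container is mutable.
final class MiniRecord<V> {
    final V value;
    final List<String> headers;

    MiniRecord(V value, List<String> headers) {
        this.value = value;
        this.headers = headers;
    }

    // New wrapper object, but the headers list is shared with this record.
    <NewV> MiniRecord<NewV> withValue(NewV newValue) {
        return new MiniRecord<>(newValue, headers);
    }

    // Copies the list itself; the (immutable) String entries are still shared.
    <NewV> MiniRecord<NewV> withValueCopyingHeaders(NewV newValue) {
        return new MiniRecord<>(newValue, new ArrayList<>(headers));
    }
}
```

Copying the list only protects against structural changes to the container; if header entries were mutable objects, they would still be shared.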

Instead of `To.all().withTimestamp()` we could also add
`context.forward(key, value, timestamp)` etc. (just wondering about the
explosion in overloads)?

Also, `Record.withValue` etc sounds odd? Should a record not be
immutable? So, we could have something like

`RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
But it looks rather verbose?

The other question is, of course, to what extent do we want to keep the
distinction between "primary" and "secondary" data? To me, it's a
question of ease of use?

Just putting all this out to move the discussion forward. Don't have a
concrete proposal atm.


-Matthias


On 9/14/20 9:24 AM, John Roesler wrote:
> Thanks for this thought, Matthias!
> 
> To be honest, it's bugged me quite a bit that _all_ the
> record information hasn't been an argument to `process`. I
> suppose I was trying to be conservative in this proposal,
> but then again, if we're adding new Processor and
> ProcessorContext interfaces, then this is the time to make
> such a change.
> 
> To be unambiguous, I think this is what we're talking about:
> ProcessorContext:
> * applicationId
> * taskId
> * appConfigs
> * appConfigsWithPrefix
> * keySerde
> * valueSerde
> * stateDir
> * metrics
> * schedule
> * commit
> * forward
> 
> StateStoreContext:
> * applicationId
> * taskId
> * appConfigs
> * appConfigsWithPrefix
> * keySerde
> * valueSerde
> * stateDir
> * metrics
> * register
> 
> 
> RecordContext
> * topic
> * partition
> * offset
> * timestamp
> * headers
> 
> 
> Your proposal sounds good to me as-is. Just to cover the
> bases, though, I'm wondering if we should push the idea just
> a little farther. Instead of decomposing key,value,context,
> we could just keep them all in one object, like this:
> 
> Record:
> * key
> * value
> * topic
> * partition
> * offset
> * timestamp
> * headers
> 
> Then, we could have:
> Processor#process(Record)
> ProcessorContext#forward(Record, To)
> 
> Viewed from this perspective, a record has three properties
> that people may specify in their processors: key, value, and
> timestamp.
> 
> We could deprecate `To#withTimestamp` and enable people to
> specify the timestamp along with the key and value when they
> forward a record.
> 
> E.g.,
> RecordBuilder toForward = RecordBuilder.copy(record);
> toForward.withKey(newKey);
> toForward.withValue(newValue);
> toForward.withTimestamp(newTimestamp);
> Record newRecord = toForward.build();
> context.forward(newRecord, To.child("child1"));
> 
> Or, the more compact common case:
> current:
>  context.forward(key, "newValue");
> proposed:
>  context.forward(copy(record).withValue("newValue").build());
> 
> 
> It's slightly more verbose, but also more extensible. This
> would give us a clean path to add header support in PAPI as
> well, simply by adding `withHeaders` in RecordBuilder.
> 
> It's also more symmetrical, since the recipient of `forward`
> would just get the sent `Record`. Whereas today, the sender
> > puts the timestamp in `To`, but the recipient gets it in its
> own `ProcessorContext`.
> 
> WDYT?
> -John
> 
> On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
>> I think separating the different contexts makes sense.
>>
>> In fact, we could even go one step further and remove the record context
>> from the processor context completely and add a third parameter to
>> `process(key, value, recordContext)`. This would make it clear that the
>> context is for the input record only and it's not possible to pass it to
>> a `punctuate` callback.
>>
>> For the stores and changelogging: I think there are two cases. (1) You
>> use a plain key-value store. For this case, it seems you do not care
>> about the timestamp and thus does not care what timestamp is set in the
>> changelog records. (We can set anything we want, as it's not relevant at
>> all -- the timestamp is ignored on read anyway.) (2) The other case is,
>> that one does 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-14 Thread John Roesler
Thanks for this thought, Matthias!

To be honest, it's bugged me quite a bit that _all_ the
record information hasn't been an argument to `process`. I
suppose I was trying to be conservative in this proposal,
but then again, if we're adding new Processor and
ProcessorContext interfaces, then this is the time to make
such a change.

To be unambiguous, I think this is what we're talking about:
ProcessorContext:
* applicationId
* taskId
* appConfigs
* appConfigsWithPrefix
* keySerde
* valueSerde
* stateDir
* metrics
* schedule
* commit
* forward

StateStoreContext:
* applicationId
* taskId
* appConfigs
* appConfigsWithPrefix
* keySerde
* valueSerde
* stateDir
* metrics
* register


RecordContext
* topic
* partition
* offset
* timestamp
* headers


Your proposal sounds good to me as-is. Just to cover the
bases, though, I'm wondering if we should push the idea just
a little farther. Instead of decomposing key,value,context,
we could just keep them all in one object, like this:

Record:
* key
* value
* topic
* partition
* offset
* timestamp
* headers

Then, we could have:
Processor#process(Record)
ProcessorContext#forward(Record, To)

Viewed from this perspective, a record has three properties
that people may specify in their processors: key, value, and
timestamp.

We could deprecate `To#withTimestamp` and enable people to
specify the timestamp along with the key and value when they
forward a record.

E.g.,
RecordBuilder toForward = RecordBuilder.copy(record);
toForward.withKey(newKey);
toForward.withValue(newValue);
toForward.withTimestamp(newTimestamp);
Record newRecord = toForward.build();
context.forward(newRecord, To.child("child1"));

Or, the more compact common case:
current:
 context.forward(key, "newValue");
proposed:
 context.forward(copy(record).withValue("newValue").build());


It's slightly more verbose, but also more extensible. This
would give us a clean path to add header support in PAPI as
well, simply by adding `withHeaders` in RecordBuilder.
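One way the `RecordBuilder` snippets above could work, as a minimal Java sketch: `copy` seeds a mutable builder from an existing record, the `withX` setters return the builder so they can be chained or called as separate statements, and `build()` returns a new immutable record. `RecordBuilder` and `MiniRecord` here are hypothetical; to keep the sketch short, the builder cannot change the key or value type, and headers are omitted.

```java
// Runnable demo; RecordBuilder and MiniRecord are hypothetical, not Kafka classes.
final class RecordBuilderDemo {
    public static void main(String[] args) {
        MiniRecord<String, String> record = new MiniRecord<>("k", "oldValue", 5L);

        // Statement-by-statement style, as in the first snippet.
        RecordBuilder<String, String> toForward = RecordBuilder.copy(record);
        toForward.withKey("newKey");
        toForward.withTimestamp(7L);
        MiniRecord<String, String> newRecord = toForward.build();

        // Compact chained style, as in the "proposed" line.
        MiniRecord<String, String> compact = RecordBuilder.copy(record).withValue("newValue").build();

        System.out.println(newRecord); // newKey=oldValue@7
        System.out.println(compact);   // k=newValue@5
    }
}

// Hypothetical immutable record.
final class MiniRecord<K, V> {
    final K key;
    final V value;
    final long timestamp;

    MiniRecord(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return key + "=" + value + "@" + timestamp;
    }
}

// Hypothetical mutable builder; build() snapshots the current fields into a new record.
final class RecordBuilder<K, V> {
    private K key;
    private V value;
    private long timestamp;

    private RecordBuilder(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    static <K, V> RecordBuilder<K, V> copy(MiniRecord<K, V> record) {
        return new RecordBuilder<>(record.key, record.value, record.timestamp);
    }

    RecordBuilder<K, V> withKey(K newKey) { this.key = newKey; return this; }
    RecordBuilder<K, V> withValue(V newValue) { this.value = newValue; return this; }
    RecordBuilder<K, V> withTimestamp(long newTimestamp) { this.timestamp = newTimestamp; return this; }

    MiniRecord<K, V> build() {
        return new MiniRecord<>(key, value, timestamp);
    }
}
```

A mutable builder keeps the statement-by-statement style from the first snippet working; putting immutable `withX` methods on the record itself (the `Record#withValue` variant in the 2020-09-29 message) avoids the separate `build()` step.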

It's also more symmetrical, since the recipient of `forward`
would just get the sent `Record`. Whereas today, the sender
puts the timestamp in `To`, but the recipient gets it in its
own `ProcessorContext`.

WDYT?
-John

On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
> I think separating the different contexts makes sense.
> 
> In fact, we could even go one step further and remove the record context
> from the processor context completely and add a third parameter to
> `process(key, value, recordContext)`. This would make it clear that the
> context is for the input record only and it's not possible to pass it to
> a `punctuate` callback.
> 
> For the stores and changelogging: I think there are two cases. (1) You
> use a plain key-value store. For this case, it seems you do not care
> about the timestamp and thus do not care what timestamp is set in the
> changelog records. (We can set anything we want, as it's not relevant at
> all -- the timestamp is ignored on read anyway.) (2) The other case is
> that one does care about timestamps, and in this case one should use
> TimestampedKeyValueStore. The passed timestamp will be set on the
> changelog records for this case.
> 
> Thus, for both cases, accessing the record context does not seem to be
> a requirement. And providing access to the processor context to, e.g.,
> `forward()` or similar seems safe.
> 
> 
> -Matthias
> 
> On 9/10/20 7:25 PM, John Roesler wrote:
> > Thanks for the reply, Paul!
> > 
> > I certainly intend to make sure that the changelogging layer
> > continues to work the way it does now, by hook or by crook.
> > I think the easiest path for me is to just "cheat" and get
> > the real ProcessorContext into the ChangeLoggingStore
> > implementation somehow. I'll tag you on the PR when I create
> > it, so you have an opportunity to express a preference about
> > the implementation choice, and maybe even compile/test
> > against it to make sure your stuff still works.
> > 
> > Regarding this:
> > 
> > > we have an interest in making a state store with a richer
> > > way of querying its data (like perhaps getting all values
> > > associated with a secondary key), while still ultimately
> > > writing to the changelog topic for later restoration.
> > 
> > This is very intriguing to me. On the side, I've been
> > preparing a couple of ideas related to this topic. I don't
> > think I have a coherent enough thought to even express it in
> > a Jira right now, but when I do, I'll tag you on it also to
> > see what you think.
> > 
> > Whenever you're ready to share the usability improvement
> > ideas, I'm very interested to see what you've come up with.
> > 
> > Thanks,
> > -John
> > 
> > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > > > when you use a HashMap or RocksDB or other "state stores", you don't
> > > > expect them to automatically know extra stuff about the record you're
> > > > storing.
> > > 
> > > So, I don't think there is any reason we *can't* retain the record context

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-11 Thread Matthias J. Sax
I think separating the different contexts makes sense.

In fact, we could even go one step further and remove the record context
from the processor context completely and add a third parameter to
`process(key, value, recordContext)`. This would make it clear that the
context is for the input record only and it's not possible to pass it to
a `punctuate` callback.

For the stores and changelogging: I think there are two cases. (1) You
use a plain key-value store. For this case, it seems you do not care
about the timestamp and thus do not care what timestamp is set in the
changelog records. (We can set anything we want, as it's not relevant at
all -- the timestamp is ignored on read anyway.) (2) The other case is
that one does care about timestamps, and in this case one should use
TimestampedKeyValueStore. The passed timestamp will be set on the
changelog records for this case.

Thus, for both cases, accessing the record context does not seem to be
a requirement. And providing access to the processor context to, e.g.,
`forward()` or similar seems safe.


-Matthias

On 9/10/20 7:25 PM, John Roesler wrote:
> Thanks for the reply, Paul!
> 
> I certainly intend to make sure that the changelogging layer
> continues to work the way it does now, by hook or by crook.
> I think the easiest path for me is to just "cheat" and get
> the real ProcessorContext into the ChangeLoggingStore
> implementation somehow. I'll tag you on the PR when I create
> it, so you have an opportunity to express a preference about
> the implementation choice, and maybe even compile/test
> against it to make sure your stuff still works.
> 
> Regarding this:
> 
>> we have an interest in making a state store with a richer
>> way of querying its data (like perhaps getting all values
>> associated with a secondary key), while still ultimately
>> writing to the changelog topic for later restoration.
> 
> This is very intriguing to me. On the side, I've been
> preparing a couple of ideas related to this topic. I don't
> think I have a coherent enough thought to even express it in
> a Jira right now, but when I do, I'll tag you on it also to
> see what you think.
> 
> Whenever you're ready to share the usability improvement
> ideas, I'm very interested to see what you've come up with.
> 
> Thanks,
> -John
> 
> On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
>>> when you use a HashMap or RocksDB or other "state stores", you don't
>>> expect them to automatically know extra stuff about the record you're
>>> storing.
>>
>>> So, I don't think there is any reason we *can't* retain the record context
>>> in the StateStoreContext, and if any users came along with a clear use case
>>> I'd find that convincing.
>>>
>>
>> I agree with the principle of being conservative with the StateStoreContext
>> API.  Regarding user expectations or a clear use case, the only
>> counterpoint I would offer is that we sort of have that use case already,
>> which is the example I gave of the change logging store using the
>> timestamp.  I am curious if this functionality will be retained when using
>> built-in state stores, or will a low-level processor get a KeyValueStore
>> that no longer writes to the changelog topic with the record's timestamp.
>> While I personally don't care much about that functionality specifically, I
>> have a general desire for custom state stores to easily do the things that
>> built-in state stores do.
>>
>>> It genuinely did not occur to me that users might be looking up and/or
>>> updating records of other keys from within a Processor.
>>>
>>
>> I'm glad you said this, Sophie, because it gives me an opportunity to say
>> that this is actually a *huge* use case for my team.  The state store
>> usability improvements I was referring to in my previous message were about
>> enabling the user to write custom stores while still easily hooking into
>> the ability to write to a changelog topic.  I think that is technically
>> possible now, but I don't think it's trivial.  Specifically, we have an
>> interest in making a state store with a richer way of querying its data
>> (like perhaps getting all values associated with a secondary key), while
>> still ultimately writing to the changelog topic for later restoration.
>>
>> We recognize that this use case throws away some of what Kafka Streams
>> (especially the DSL) is good at - easy parallelizability by partitioning
>> all processing by key - and that our business logic would completely fall
>> apart if we were consuming from multi-partition topics with multiple
>> consumers.  But we have found that using the low-level processor API is
>> good for the very simple stream processing primitives it provides: handling
>> the plumbing of consuming from multiple Kafka topics and potentially
>> updating persistent local state in a reliable way.  That in itself has
>> proven to be a worthwhile programming model.
>>
>> Since I got off track a bit, let me summarize: I don't particularly care
>> about the 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread John Roesler
Thanks for the reply, Paul!

I certainly intend to make sure that the changelogging layer
continues to work the way it does now, by hook or by crook.
I think the easiest path for me is to just "cheat" and get
the real ProcessorContext into the ChangeLoggingStore
implementation somehow. I'll tag you on the PR when I create
it, so you have an opportunity to express a preference about
the implementation choice, and maybe even compile/test
against it to make sure your stuff still works.

Regarding this:

> we have an interest in making a state store with a richer
> way of querying its data (like perhaps getting all values
> associated with a secondary key), while still ultimately
> writing to the changelog topic for later restoration.

This is very intriguing to me. On the side, I've been
preparing a couple of ideas related to this topic. I don't
think I have a coherent enough thought to even express it in
a Jira right now, but when I do, I'll tag you on it also to
see what you think.

Whenever you're ready to share the usability improvement
ideas, I'm very interested to see what you've come up with.

Thanks,
-John

On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > when you use a HashMap or RocksDB or other "state stores", you don't
> > expect them to automatically know extra stuff about the record you're
> > storing.
> 
> > So, I don't think there is any reason we *can't* retain the record context
> > in the StateStoreContext, and if any users came along with a clear use case
> > I'd find that convincing.
> >
> 
> I agree with the principle of being conservative with the StateStoreContext
> API.  Regarding user expectations or a clear use case, the only
> counterpoint I would offer is that we sort of have that use case already,
> which is the example I gave of the change logging store using the
> timestamp.  I am curious if this functionality will be retained when using
> built in state stores, or will a low-level processor get a KeyValueStore
> that no longer writes to the changelog topic with the record's timestamp.
> While I personally don't care much about that functionality specifically, I
> have a general desire for custom state stores to easily do the things that
> built in state stores do.
> 
> > It genuinely did not occur to me that users might be looking up and/or
> > updating records of other keys from within a Processor.
> >
> 
> I'm glad you said this Sophie, because it gives me an opportunity to say
> that this is actually a *huge* use case for my team.  The state store
> usability improvements I was referring to in my previous message were about
> enabling the user to write custom stores while still easily hooking into
> the ability to write to a changelog topic.  I think that is technically
> possible now, but I don't think it's trivial.  Specifically, we have an
> interest in making a state store with a richer way of querying its data
> (like perhaps getting all values associated with a secondary key), while
> still ultimately writing to the changelog topic for later restoration.
> 
> We recognize that this use case throws away some of what Kafka Streams
> (especially the DSL) is good at - easy parallelizability by partitioning
> all processing by key - and that our business logic would completely fall
> apart if we were consuming from multi-partition topics with multiple
> consumers.  But we have found that using the low level processor API is
> good for the very simple stream processing primitives it provides: handling
> the plumbing of consuming from multiple kafka topics and potentially
> updating persistent local state in a reliable way.  That in itself has
> proven to be a worthwhile programming model.
> 
> Since I got off track a bit, let me summarize: I don't particularly care
> about the record context being available to state store implementations,
> and I think this KIP is headed in the right direction in that regard.  But
> more generally, I wanted to express the importance of maintaining a
> powerful and flexible StateStore interface.
> 
> Thanks!
> Paul
> 
> On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman 
> wrote:
> 
> > Aha, I did misinterpret the example in your previous response regarding the
> > range query after all. I thought you just meant a time-range query inside a
> > punctuator. It genuinely did not occur to me that users might be looking up
> > and/or updating records of other keys from within a Processor. Sorry for
> > being closed-minded.
> >
> > I won't drag out this discussion any further by asking whether that might be
> > a valid use case or just a lurking bug in itself :)
> >
> > Thanks for humoring me. The current proposal for KIP-478 sounds good to me.
> > 
> > On Thu, Sep 10, 2020 at 3:43 PM John Roesler  wrote:
> > 
> > > Ah, thanks Sophie,
> > > 
> > > I'm sorry for misinterpreting your response. Yes, we
> > > absolutely can and should clear the context before
> > > punctuating.
> > > 
> > > My secondary concern is maybe more far-fetched. I 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread Paul Whalen
>
> when you use a HashMap or RocksDB or other "state stores", you don't
> expect them to automatically know extra stuff about the record you're
> storing.


> So, I don't think there is any reason we *can't* retain the record context
> in the StateStoreContext, and if any users came along with a clear use case
> I'd find that convincing.
>

I agree with the principle of being conservative with the StateStoreContext
API.  Regarding user expectations or a clear use case, the only
counterpoint I would offer is that we sort of have that use case already,
which is the example I gave of the change logging store using the
timestamp.  I am curious if this functionality will be retained when using
built in state stores, or will a low-level processor get a KeyValueStore
that no longer writes to the changelog topic with the record's timestamp.
While I personally don't care much about that functionality specifically, I
have a general desire for custom state stores to easily do the things that
built in state stores do.
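For readers less familiar with the internals, the behavior described above can be pictured with a small toy model. The classes below (`ToyRecordContext`, `ToyChangeLoggingStore`) are hypothetical stand-ins, not the real Kafka Streams classes, and a list of strings stands in for the changelog topic. The only point is that `put()` takes no timestamp argument, so the store has to read it implicitly from a framework-managed record context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical classes, not the real Kafka Streams API.
class ChangeLoggingStoreDemo {
    public static void main(String[] args) {
        ToyRecordContext context = new ToyRecordContext();
        ToyChangeLoggingStore store = new ToyChangeLoggingStore(context);

        // The (toy) framework records the metadata of the record it is about to process...
        context.timestamp = 1_000L;
        // ...and the processor calls put() without passing any timestamp.
        store.put("user-1", "clicked");

        System.out.println(store.changelog.get(0)); // user-1=clicked@1000
    }
}

// Framework-managed metadata about the record currently being processed.
class ToyRecordContext {
    long timestamp;
}

// Writes each put() through to a list standing in for the changelog topic,
// stamping every entry with the timestamp found in the shared record context.
class ToyChangeLoggingStore {
    private final Map<String, String> data = new HashMap<>();
    final List<String> changelog = new ArrayList<>();
    private final ToyRecordContext context;

    ToyChangeLoggingStore(ToyRecordContext context) {
        this.context = context;
    }

    void put(String key, String value) {
        data.put(key, value);
        // The timestamp comes implicitly from the context, not from the caller.
        changelog.add(key + "=" + value + "@" + context.timestamp);
    }

    String get(String key) {
        return data.get(key);
    }
}
```

If a store like this lost access to the context, `put()` would have no way to recover the timestamp unless the caller passed it explicitly.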

> It genuinely did not occur to me that users might be looking up and/or
> updating records of other keys from within a Processor.
>

I'm glad you said this Sophie, because it gives me an opportunity to say
that this is actually a *huge* use case for my team.  The state store
usability improvements I was referring to in my previous message were about
enabling the user to write custom stores while still easily hooking into
the ability to write to a changelog topic.  I think that is technically
possible now, but I don't think it's trivial.  Specifically, we have an
interest in making a state store with a richer way of querying its data
(like perhaps getting all values associated with a secondary key), while
still ultimately writing to the changelog topic for later restoration.
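Here is a minimal sketch of the kind of store described above. It assumes a plain in-memory model rather than the real `StateStore` interface: `SecondaryIndexStore`, its list-backed changelog, and the `customerId:item` value format are all hypothetical. Only primary key/value pairs are logged. The secondary index is derived state, so it gets rebuilt as a side effect of replaying the changelog. Deletes and serialization are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Toy custom store with a secondary index; not the real Kafka Streams StateStore API.
class SecondaryIndexStore {
    private final Map<String, String> primary = new HashMap<>();
    // secondary key -> primary keys (sorted so query results are deterministic)
    private final Map<String, Set<String>> index = new HashMap<>();
    private final Function<String, String> secondaryKeyOf;
    // Only {key, value} pairs are logged; the index is derived from them.
    final List<String[]> changelog = new ArrayList<>();

    SecondaryIndexStore(Function<String, String> secondaryKeyOf) {
        this.secondaryKeyOf = secondaryKeyOf;
    }

    void put(String key, String value) {
        applyLocally(key, value);
        changelog.add(new String[] {key, value});
    }

    // Updates the primary map and keeps the secondary index consistent with it.
    private void applyLocally(String key, String value) {
        String previous = primary.put(key, value);
        if (previous != null) {
            String oldSecondary = secondaryKeyOf.apply(previous);
            Set<String> keys = index.get(oldSecondary);
            if (keys != null) {
                keys.remove(key);
                if (keys.isEmpty()) {
                    index.remove(oldSecondary);
                }
            }
        }
        index.computeIfAbsent(secondaryKeyOf.apply(value), k -> new TreeSet<>()).add(key);
    }

    // The "richer" query: all values associated with one secondary key.
    List<String> valuesForSecondaryKey(String secondaryKey) {
        List<String> values = new ArrayList<>();
        for (String key : index.getOrDefault(secondaryKey, Collections.emptySet())) {
            values.add(primary.get(key));
        }
        return values;
    }

    // Restoration: replay the changelog into an empty store, rebuilding the index.
    static SecondaryIndexStore restore(List<String[]> log, Function<String, String> secondaryKeyOf) {
        SecondaryIndexStore restored = new SecondaryIndexStore(secondaryKeyOf);
        for (String[] entry : log) {
            restored.applyLocally(entry[0], entry[1]);
        }
        return restored;
    }

    public static void main(String[] args) {
        // Hypothetical value format "customerId:item"; the customer id is the secondary key.
        Function<String, String> customerOf = v -> v.split(":")[0];
        SecondaryIndexStore store = new SecondaryIndexStore(customerOf);
        store.put("order-1", "c1:book");
        store.put("order-2", "c1:pen");
        store.put("order-3", "c2:cup");
        store.put("order-2", "c2:pen"); // order-2 moves from c1 to c2

        SecondaryIndexStore restored = restore(store.changelog, customerOf);
        System.out.println(restored.valuesForSecondaryKey("c1")); // [c1:book]
        System.out.println(restored.valuesForSecondaryKey("c2")); // [c2:pen, c2:cup]
    }
}
```

Logging only the primary data keeps the changelog format independent of how the store chooses to index it.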

We recognize that this use case throws away some of what Kafka Streams
(especially the DSL) is good at - easy parallelizability by partitioning
all processing by key - and that our business logic would completely fall
apart if we were consuming from multi-partition topics with multiple
consumers.  But we have found that using the low level processor API is
good for the very simple stream processing primitives it provides: handling
the plumbing of consuming from multiple kafka topics and potentially
updating persistent local state in a reliable way.  That in itself has
proven to be a worthwhile programming model.

Since I got off track a bit, let me summarize: I don't particularly care
about the record context being available to state store implementations,
and I think this KIP is headed in the right direction in that regard.  But
more generally, I wanted to express the importance of maintaining a
powerful and flexible StateStore interface.

Thanks!
Paul

On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman 
wrote:

> Aha, I did misinterpret the example in your previous response regarding the
> range query after all. I thought you just meant a time-range query inside a
> punctuator. It genuinely did not occur to me that users might be looking up
> and/or updating records of other keys from within a Processor. Sorry for
> being closed-minded.
>
> I won't drag out this discussion any further by asking whether that might be
> a valid use case or just a lurking bug in itself :)
>
> Thanks for humoring me. The current proposal for KIP-478 sounds good to me.
>
> On Thu, Sep 10, 2020 at 3:43 PM John Roesler  wrote:
>
> > Ah, thanks Sophie,
> >
> > I'm sorry for misinterpreting your response. Yes, we
> > absolutely can and should clear the context before
> > punctuating.
> >
> > My secondary concern is maybe more far-fetched. I was
> > thinking that inside process(key,value), a Processor might
> > do a get/put of a _different_ key. Consider, for example,
> > the way that Suppress processors work. When they get a
> > record, they add it to the store and then do a range scan
> > and possibly forward a _different_ record. Of course, this
> > is an operation that is deeply coupled to the internals, and
> > the Suppress processor accordingly actually does get access
> > to the internal context so that it can set the context
> > before forwarding.
> >
> > Still, it seems like I've had a handful of conversations
> > with people over the years in which they tell me they are
> > using state stores in a way that transcends the "get and put
> > the currently processing record" access pattern. I doubt
> > that those folks would even have considered the possibility
> > that the currently processing record's _context_ could
> > pollute their state store operations, as I myself never gave
> > it a second thought until the current conversation began. In
> > cases like that, we have actually set a trap for these
> > people, and it seems better to dismantle the trap.
> >
> > As you noted, really the only people who would be negatively
> > impacted are people who implement their own state stores.
> > These folks will get the deprecation warning and try to
> > 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread John Roesler
Thanks for the conversation, Sophie! Sorry for the ambiguity
I introduced into it, though.

Thanks,
-John

On Thu, 2020-09-10 at 16:10 -0700, Sophie Blee-Goldman
wrote:
> Aha, I did misinterpret the example in your previous response regarding the
> range query after all. I thought you just meant a time-range query inside a
> punctuator. It genuinely did not occur to me that users might be looking up
> and/or updating records of other keys from within a Processor. Sorry for
> being closed-minded.
> 
> I won't drag out this discussion any further by asking whether that might be
> a valid use case or just a lurking bug in itself :)
> 
> Thanks for humoring me. The current proposal for KIP-478 sounds good to me.
> 
> On Thu, Sep 10, 2020 at 3:43 PM John Roesler  wrote:
> 
> > Ah, thanks Sophie,
> > 
> > I'm sorry for misinterpreting your response. Yes, we
> > absolutely can and should clear the context before
> > punctuating.
> > 
> > My secondary concern is maybe more far-fetched. I was
> > thinking that inside process(key,value), a Processor might
> > do a get/put of a _different_ key. Consider, for example,
> > the way that Suppress processors work. When they get a
> > record, they add it to the store and then do a range scan
> > and possibly forward a _different_ record. Of course, this
> > is an operation that is deeply coupled to the internals, and
> > the Suppress processor accordingly actually does get access
> > to the internal context so that it can set the context
> > before forwarding.
> > 
> > Still, it seems like I've had a handful of conversations
> > with people over the years in which they tell me they are
> > using state stores in a way that transcends the "get and put
> > the currently processing record" access pattern. I doubt
> > that those folks would even have considered the possibility
> > that the currently processing record's _context_ could
> > pollute their state store operations, as I myself never gave
> > it a second thought until the current conversation began. In
> > cases like that, we have actually set a trap for these
> > people, and it seems better to dismantle the trap.
> > 
> > As you noted, really the only people who would be negatively
> > impacted are people who implement their own state stores.
> > These folks will get the deprecation warning and try to
> > adapt their stores to the new interface. If they needed
> > access to the record context, they would find it's now
> > missing. They'd ask us about it, and we'd have the ability
> > to explain the lurking bug that they have had in their
> > stores all along, as well as the new recommended pattern
> > (just pass everything you need in the value). If that's
> > unsatisfying, _then_ we should consider amending the API.
> > 
> > Thanks,
> > -John
> > 
> > On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman
> > wrote:
> > > > Regarding your first sentence, "...the processor would null
> > > > out the record context...", this is not possible, since the
> > > > processor doesn't have write access to the context. We could
> > > > add it,
> > > > 
> > > 
> > > Sorry, this was poorly phrased; I definitely did not mean to imply that we
> > > should make the context modifiable by the Processors themselves. I meant
> > > this should be handled by the internal processing framework that deals with
> > > passing records from one Processor to the next, setting the record context
> > > when a new record is picked up, invoking the punctuators, etc. I believe
> > > this all currently happens in the StreamTask? It already can and does
> > > overwrite the record context as new records are processed, and is also
> > > responsible for calling the punctuators, so it doesn't seem like a huge
> > > leap to just say "null out the current record before punctuating".
> > >
> > > To clarify, I was never advocating or even considering giving the
> > > Processors write access to the record context. Sorry if my last message
> > > (or all of them) was misleading. I just wanted to point out that the
> > > punctuator concern is orthogonal to the question of whether we should
> > > include the record context in the StateStoreContext. It's definitely a
> > > real problem, but it's a problem that exists at the Processor level and
> > > not just the StateStore.
> > >
> > > So, I don't think there is any reason we *can't* retain the record context
> > > in the StateStoreContext, and if any users came along with a clear use case
> > > I'd find that convincing. In the absence of any examples, the conservative
> > > approach sounds good to me.
> > >
> > > If it turns out that someone did need the record context in their custom
> > > state store, I'm sure they'll submit a politely worded bug report alerting
> > > us that we broke their application.
> > > 
> > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler
> > > wrote:
> > > > Thanks, Sophie,
> > > > 
> > > > Yes, now that you point it 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread Sophie Blee-Goldman
Aha, I did misinterpret the example in your previous response regarding the
range query after all. I thought you just meant a time-range query inside a
punctuator. It genuinely did not occur to me that users might be looking up
and/or updating records of other keys from within a Processor. Sorry for
being closed-minded.

I won't drag out this discussion any further by asking whether that might be
a valid use case or just a lurking bug in itself :)

Thanks for humoring me. The current proposal for KIP-478 sounds good to me.

On Thu, Sep 10, 2020 at 3:43 PM John Roesler  wrote:

> Ah, thanks Sophie,
>
> I'm sorry for misinterpreting your response. Yes, we
> absolutely can and should clear the context before
> punctuating.
>
> My secondary concern is maybe more far-fetched. I was
> thinking that inside process(key,value), a Processor might
> do a get/put of a _different_ key. Consider, for example,
> the way that Suppress processors work. When they get a
> record, they add it to the store and then do a range scan
> and possibly forward a _different_ record. Of course, this
> is an operation that is deeply coupled to the internals, and
> the Suppress processor accordingly actually does get access
> to the internal context so that it can set the context
> before forwarding.
>
> Still, it seems like I've had a handful of conversations
> with people over the years in which they tell me they are
> using state stores in a way that transcends the "get and put
> the currently processing record" access pattern. I doubt
> that those folks would even have considered the possibility
> that the currently processing record's _context_ could
> pollute their state store operations, as I myself never gave
> it a second thought until the current conversation began. In
> cases like that, we have actually set a trap for these
> people, and it seems better to dismantle the trap.
>
> As you noted, really the only people who would be negatively
> impacted are people who implement their own state stores.
> These folks will get the deprecation warning and try to
> adapt their stores to the new interface. If they needed
> access to the record context, they would find it's now
> missing. They'd ask us about it, and we'd have the ability
> to explain the lurking bug that they have had in their
> stores all along, as well as the new recommended pattern
> (just pass everything you need in the value). If that's
> unsatisfying, _then_ we should consider amending the API.
>
> Thanks,
> -John
>
> On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman
> wrote:
> > > Regarding your first sentence, "...the processor would null
> > > out the record context...", this is not possible, since the
> > > processor doesn't have write access to the context. We could
> > > add it,
> > >
> >
> > Sorry, this was poorly phrased; I definitely did not mean to imply that we
> > should make the context modifiable by the Processors themselves. I meant
> > this should be handled by the internal processing framework that deals with
> > passing records from one Processor to the next, setting the record context
> > when a new record is picked up, invoking the punctuators, etc. I believe
> > this all currently happens in the StreamTask? It already can and does
> > overwrite the record context as new records are processed, and is also
> > responsible for calling the punctuators, so it doesn't seem like a huge
> > leap to just say "null out the current record before punctuating".
> >
> > To clarify, I was never advocating or even considering giving the
> > Processors write access to the record context. Sorry if my last message
> > (or all of them) was misleading. I just wanted to point out that the
> > punctuator concern is orthogonal to the question of whether we should
> > include the record context in the StateStoreContext. It's definitely a
> > real problem, but it's a problem that exists at the Processor level and
> > not just the StateStore.
> >
> > So, I don't think there is any reason we *can't* retain the record context
> > in the StateStoreContext, and if any users came along with a clear use case
> > I'd find that convincing. In the absence of any examples, the conservative
> > approach sounds good to me.
> >
> > If it turns out that someone did need the record context in their custom
> > state store, I'm sure they'll submit a politely worded bug report alerting
> > us that we broke their application.
> >
> > On Thu, Sep 10, 2020 at 3:05 PM John Roesler
> > wrote:
> >
> > > Thanks, Sophie,
> > >
> > > Yes, now that you point it out, I can see that the record
> > > context itself should be nulled out by Streams before
> > > invoking punctuators. From that perspective, we don't need
> > > to think about the second-order problem of what's in the
> > > context for the state store when called from a punctuator.
> > >
> > > Regarding your first sentence, "...the processor would null
> > > out the record context...", this is 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread John Roesler
Ah, thanks Sophie,

I'm sorry for misinterpreting your response. Yes, we
absolutely can and should clear the context before
punctuating.

My secondary concern is maybe more far-fetched. I was
thinking that inside process(key,value), a Processor might
do a get/put of a _different_ key. Consider, for example,
the way that Suppress processors work. When they get a
record, they add it to the store and then do a range scan
and possibly forward a _different_ record. Of course, this
is an operation that is deeply coupled to the internals, and
the Suppress processor accordingly actually does get access
to the internal context so that it can set the context
before forwarding.
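As a rough illustration of the access pattern described above, here is a toy suppress-like processor. It is not the real Suppress implementation. `ToySuppressProcessor`, the fixed `delayMs`, and the `forwardContextTimestamp` field (a stand-in for the context the framework would attach to a forwarded record) are assumptions made for this sketch. Each call stores the incoming record, range-scans the buffer for records old enough to emit, and forwards those. The forwarded records are usually different from the one just received.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy suppress-like processor; hypothetical names, not the real Suppress implementation.
class ToySuppressProcessor {
    static final class BufferedRecord {
        final String key;
        final String value;
        final long timestamp;

        BufferedRecord(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return key + "=" + value + "@" + timestamp;
        }
    }

    // The "store": buffered records ordered by timestamp, so a range scan finds the oldest first.
    private final TreeMap<Long, List<BufferedRecord>> buffer = new TreeMap<>();
    private final long delayMs;
    private long streamTime = Long.MIN_VALUE;
    // Records sent downstream, plus a stand-in for the context attached to the last one.
    final List<String> forwarded = new ArrayList<>();
    long forwardContextTimestamp = -1L;

    ToySuppressProcessor(long delayMs) {
        this.delayMs = delayMs;
    }

    void process(String key, String value, long timestamp) {
        // 1. Add the incoming record to the store.
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>())
              .add(new BufferedRecord(key, value, timestamp));
        streamTime = Math.max(streamTime, timestamp);

        // 2. Range scan: everything at least delayMs older than stream time.
        Iterator<Map.Entry<Long, List<BufferedRecord>>> it =
            buffer.headMap(streamTime - delayMs, true).entrySet().iterator();
        while (it.hasNext()) {
            for (BufferedRecord rec : it.next().getValue()) {
                // 3. Forward a (usually) different record with its *own* metadata,
                //    which is why the processor must set the context before forwarding.
                forwardContextTimestamp = rec.timestamp;
                forwarded.add(rec.toString());
            }
            it.remove(); // removes the entry from the backing TreeMap as well
        }
    }

    public static void main(String[] args) {
        ToySuppressProcessor processor = new ToySuppressProcessor(100L);
        processor.process("a", "1", 0L);
        processor.process("b", "2", 50L);
        processor.process("c", "3", 150L);
        System.out.println(processor.forwarded); // [a=1@0, b=2@50]
    }
}
```

Processing `c` emits `a` and `b` but not `c` itself. That is why the forwarded records need their own metadata rather than the metadata of the record currently being processed.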

Still, it seems like I've had a handful of conversations
with people over the years in which they tell me they are
using state stores in a way that transcends the "get and put
the currently processing record" access pattern. I doubt
that those folks would even have considered the possibility
that the currently processing record's _context_ could
pollute their state store operations, as I myself never gave
it a second thought until the current conversation began. In
cases like that, we have actually set a trap for these
people, and it seems better to dismantle the trap.
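The trap can be shown with another toy model that uses hypothetical class and method names, not the Kafka Streams API. The store stamps every write with the timestamp of whatever record the framework is currently processing. When the processor for `order-7` also updates `order-3`, the changelog entry for `order-3` silently borrows `order-7`'s timestamp. This is a simplification and does not claim that any built-in store records exactly this metadata.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the "trap"; hypothetical names, not the Kafka Streams API.
class ContextPollutionDemo {
    // Set by the (toy) framework before each process() call; user code never sets it.
    private long currentRecordTimestamp;
    final List<String> changelog = new ArrayList<>();

    // A context-aware store write: the timestamp is taken from the current record.
    private void put(String key, String value) {
        changelog.add(key + "=" + value + "@" + currentRecordTimestamp);
    }

    // User processor logic that occasionally touches a *different* key.
    private void userProcess(String key, String value) {
        put(key, value);
        if (key.equals("order-7")) {
            // Closing order-3 while handling order-7: the write borrows order-7's timestamp.
            put("order-3", "closed");
        }
    }

    // Framework entry point: set the context, then invoke the processor.
    void process(String key, String value, long timestamp) {
        currentRecordTimestamp = timestamp;
        userProcess(key, value);
    }

    public static void main(String[] args) {
        ContextPollutionDemo demo = new ContextPollutionDemo();
        demo.process("order-3", "open", 100L);
        demo.process("order-7", "open", 900L);
        demo.changelog.forEach(System.out::println);
        // order-3=open@100
        // order-7=open@900
        // order-3=closed@900
    }
}
```

Nothing in `userProcess` mentions a timestamp, which is what makes the borrowed metadata easy to overlook.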

As you noted, really the only people who would be negatively
impacted are people who implement their own state stores.
These folks will get the deprecation warning and try to
adapt their stores to the new interface. If they needed
access to the record context, they would find it's now
missing. They'd ask us about it, and we'd have the ability
to explain the lurking bug that they have had in their
stores all along, as well as the new recommended pattern
(just pass everything you need in the value). If that's
unsatisfying, _then_ we should consider amending the API.
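A minimal sketch of the recommended pattern follows. It assumes hypothetical names (`ExplicitTimestampDemo`, `TimestampedValue`). The store never consults a record context. Every piece of metadata it needs arrives through `put()` as part of the value, so the same store works unchanged when called from a punctuator, where there is no current record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of "pass everything you need in the value"; hypothetical names, not the Kafka Streams API.
class ExplicitTimestampDemo {
    // All metadata the store needs travels with the value.
    static final class TimestampedValue {
        final String value;
        final long timestamp;

        TimestampedValue(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, TimestampedValue> store = new HashMap<>();
    final List<String> changelog = new ArrayList<>();

    // The store has no hidden inputs: its behavior depends only on its arguments.
    void put(String key, TimestampedValue tv) {
        store.put(key, tv);
        changelog.add(key + "=" + tv.value + "@" + tv.timestamp);
    }

    long timestampOf(String key) {
        return store.get(key).timestamp;
    }

    // The processor chooses every timestamp explicitly, including for other keys.
    void process(String key, String value, long recordTimestamp) {
        put(key, new TimestampedValue(value, recordTimestamp));
        if (key.equals("order-7")) {
            // Using order-7's timestamp for order-3 is now a visible, deliberate choice.
            put("order-3", new TimestampedValue("closed", recordTimestamp));
        }
    }

    // No current record here, yet the store call is still well defined.
    void punctuate(long punctuationTime) {
        put("heartbeat", new TimestampedValue("tick", punctuationTime));
    }

    public static void main(String[] args) {
        ExplicitTimestampDemo demo = new ExplicitTimestampDemo();
        demo.process("order-3", "open", 100L);
        demo.process("order-7", "open", 900L);
        demo.punctuate(5_000L);
        demo.changelog.forEach(System.out::println);
        // order-3=open@100
        // order-7=open@900
        // order-3=closed@900
        // heartbeat=tick@5000
    }
}
```

Kafka Streams' own timestamped key-value stores take a similar approach by storing a `ValueAndTimestamp` wrapper. The class above is only a stand-in for that idea.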

Thanks,
-John

On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman
wrote:
> > Regarding your first sentence, "...the processor would null
> > out the record context...", this is not possible, since the
> > processor doesn't have write access to the context. We could
> > add it,
> > 
> 
> Sorry, this was poorly phrased; I definitely did not mean to imply that we
> should make the context modifiable by the Processors themselves. I meant
> this should be handled by the internal processing framework that deals with
> passing records from one Processor to the next, setting the record context
> when a new record is picked up, invoking the punctuators, etc. I believe
> this all currently happens in the StreamTask? It already can and does
> overwrite the record context as new records are processed, and is also
> responsible for calling the punctuators, so it doesn't seem like a huge
> leap to just say "null out the current record before punctuating".
>
> To clarify, I was never advocating or even considering giving the
> Processors write access to the record context. Sorry if my last message
> (or all of them) was misleading. I just wanted to point out that the
> punctuator concern is orthogonal to the question of whether we should
> include the record context in the StateStoreContext. It's definitely a
> real problem, but it's a problem that exists at the Processor level and
> not just the StateStore.
>
> So, I don't think there is any reason we *can't* retain the record context
> in the StateStoreContext, and if any users came along with a clear use case
> I'd find that convincing. In the absence of any examples, the conservative
> approach sounds good to me.
>
> If it turns out that someone did need the record context in their custom
> state store, I'm sure they'll submit a politely worded bug report alerting
> us that we broke their application.
> 
> On Thu, Sep 10, 2020 at 3:05 PM John Roesler  wrote:
> 
> > Thanks, Sophie,
> > 
> > Yes, now that you point it out, I can see that the record
> > context itself should be nulled out by Streams before
> > invoking punctuators. From that perspective, we don't need
> > to think about the second-order problem of what's in the
> > context for the state store when called from a punctuator.
> > 
> > Regarding your first sentence, "...the processor would null
> > out the record context...", this is not possible, since the
> > processor doesn't have write access to the context. We could
> > add it, but then all kinds of strange effects would ensue
> > when downstream processors execute but the context is empty,
> > etc. Better to just let the framework manage the record
> > context and keep it read-only for Processors.
> > 
> > Reading between the lines of your last reply, it sounds like
> > the disconnect may just have been a mutual misunderstanding
> > about whether or not Processors currently have access to set
> > the record context. Since they do not, if we wanted to add
> > the record context to StateStoreContext in a well-defined
> > way, we'd also have to add the ability for Processors to
> > manipulate it. But then, we're just creating a side-channel
> > 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread Sophie Blee-Goldman
>
> Regarding your first sentence, "...the processor would null
> out the record context...", this is not possible, since the
> processor doesn't have write access to the context. We could
> add it,
>

Sorry, this was poorly phrased; I definitely did not mean to imply that we
should make the context modifiable by the Processors themselves. I meant
this should be handled by the internal processing framework that deals with
passing records from one Processor to the next, setting the record context
when a new record is picked up, invoking the punctuators, etc. I believe
this all currently happens in the StreamTask? It already can and does
overwrite the record context as new records are processed, and is also
responsible for calling the punctuators, so it doesn't seem like a huge
leap to just say
"null out the current record before punctuating"
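To make the proposal concrete, here is a toy task loop. The names are hypothetical and this is not the real `StreamTask`. The framework sets the current record's timestamp before processing and, when `clearBeforePunctuate` is enabled, nulls it out before invoking a punctuator. With clearing disabled, the punctuator observes the stale timestamp of the last processed record, which is the kind of subtle bug discussed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy task loop; hypothetical names, not the real StreamTask.
class ToyTask {
    private final boolean clearBeforePunctuate;
    // Framework-owned metadata; null means "no record is currently being processed".
    private Long currentRecordTimestamp = null;
    final List<String> log = new ArrayList<>();

    ToyTask(boolean clearBeforePunctuate) {
        this.clearBeforePunctuate = clearBeforePunctuate;
    }

    // Read-only accessor: the only thing processors and punctuators can see.
    Long readCurrentTimestamp() {
        return currentRecordTimestamp;
    }

    void processRecord(String key, long timestamp) {
        currentRecordTimestamp = timestamp; // the framework sets the context...
        log.add("process " + key + " sees " + readCurrentTimestamp()); // ...the processor reads it
    }

    void punctuate(long time) {
        if (clearBeforePunctuate) {
            currentRecordTimestamp = null; // "null out the current record before punctuating"
        }
        log.add("punctuate@" + time + " sees " + readCurrentTimestamp());
    }

    public static void main(String[] args) {
        for (boolean clear : new boolean[] {false, true}) {
            ToyTask task = new ToyTask(clear);
            task.processRecord("a", 200L);
            task.punctuate(1_000L);
            System.out.println(task.log);
        }
        // [process a sees 200, punctuate@1000 sees 200]
        // [process a sees 200, punctuate@1000 sees null]
    }
}
```

Only the framework code in `processRecord` and `punctuate` writes the field, which matches the idea that the task, not the Processors, owns the record context.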

To clarify, I was never advocating or even considering giving the
Processors write access to the record context. Sorry if my last message
(or all of them) was misleading. I just wanted to point out that the
punctuator concern is orthogonal to the question of whether we should
include the record context in the StateStoreContext. It's definitely a
real problem, but it's a problem that exists at the Processor level and
not just the StateStore.

So, I don't think there is any reason we *can't* retain the record context
in the StateStoreContext, and if any users came along with a clear use case
I'd find that convincing. In the absence of any examples, the conservative
approach sounds good to me.

If it turns out that someone did need the record context in their custom
state store, I'm sure they'll submit a politely worded bug report alerting
us that we broke their application.

On Thu, Sep 10, 2020 at 3:05 PM John Roesler  wrote:

> Thanks, Sophie,
>
> Yes, now that you point it out, I can see that the record
> context itself should be nulled out by Streams before
> invoking punctuators. From that perspective, we don't need
> to think about the second-order problem of what's in the
> context for the state store when called from a punctuator.
>
> Regarding your first sentence, "...the processor would null
> out the record context...", this is not possible, since the
> processor doesn't have write access to the context. We could
> add it, but then all kinds of strange effects would ensue
> when downstream processors execute but the context is empty,
> etc. Better to just let the framework manage the record
> context and keep it read-only for Processors.
>
> Reading between the lines of your last reply, it sounds like
> the disconnect may just have been a mutual misunderstanding
> about whether or not Processors currently have access to set
> the record context. Since they do not, if we wanted to add
> the record context to StateStoreContext in a well-defined
> way, we'd also have to add the ability for Processors to
> manipulate it. But then, we're just creating a side-channel
> for Processors to pass some information in arguments to
> "put()" and other information implicitly through the
> context. It seems better just to go for a single channel for
> now.
>
> It sounds like you're basically in favor of the conservative
> approach, and you just wanted to understand the blockers
> that I implied. Does my clarification make sense?
>
> Thanks,
> -John
>
> On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman
> wrote:
> > I was just thinking that the processor would null out the record context
> > after it finished processing the record, so I'm not sure I follow why this
> > would not be possible? AFAIK we never call a punctuator in the middle of
> > processing a record through the topology, and even if we did, we still
> > know when it is about to be called and could set it to null beforehand.
> >
> > I'm not trying to advocate for it here; I'm in agreement that anything you
> > want to access within the store can and should be accessed within the
> > calling Processor/Punctuator before reaching the store. The "we can always
> > add it later if necessary" argument is also pretty convincing. Just trying
> > to understand why this wouldn't be possible.
> >
> > FWIW, the question of "what is the current record in the context of a
> > Punctuator" exists independently of whether we want to add this to the
> > StateStoreContext or not. The full ProcessorContext, including the current
> > record context, is already available within a Punctuator, so removing the
> > current record context from the StateStoreContext does not solve the
> > problem. Users can -- and have (see KAFKA-9584 ;) -- hit such subtle bugs
> > without ever invoking a StateStore from their punctuator.
> >
> > Again, I think I do agree that we should leave the current record context
> > off of the StateStoreContext, but I don't think the Punctuator argument
> > against it is very convincing. It sounds to me like

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread John Roesler
Thanks, Sophie,

Yes, now that you point it out, I can see that the record
context itself should be nulled out by Streams before
invoking punctuators. From that perspective, we don't need
to think about the second-order problem of what's in the
context for the state store when called from a punctuator.

Regarding your first sentence, "...the processor would null
out the record context...", this is not possible, since the
processor doesn't have write access to the context. We could
add it, but then all kinds of strange effects would ensue
when downstream processors execute but the context is empty,
etc. Better to just let the framework manage the record
context and keep it read-only for Processors.
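One way to picture "framework-managed, read-only for Processors" is the sketch below, which uses hypothetical names throughout (`RecordContextView`, `MutableRecordContext`, `ToyProcessor`). The framework owns a mutable context and hands processors a separate view object that exposes only getters. Because the view is a distinct object, a processor has no setter to reach, even by casting.

```java
// Toy sketch; all names are hypothetical, not the Kafka Streams API.
class ReadOnlyContextDemo {
    // What a Processor is handed: getters only.
    interface RecordContextView {
        String topic();
        long timestamp();
    }

    // Framework-internal context; only the framework calls update().
    static final class MutableRecordContext {
        private String topic;
        private long timestamp;

        void update(String topic, long timestamp) {
            this.topic = topic;
            this.timestamp = timestamp;
        }

        // A separate view object, so processors cannot downcast to the mutable type.
        RecordContextView readOnlyView() {
            return new RecordContextView() {
                @Override
                public String topic() {
                    return MutableRecordContext.this.topic;
                }

                @Override
                public long timestamp() {
                    return MutableRecordContext.this.timestamp;
                }
            };
        }
    }

    interface ToyProcessor {
        String process(String key, RecordContextView context);
    }

    public static void main(String[] args) {
        MutableRecordContext internal = new MutableRecordContext();
        RecordContextView view = internal.readOnlyView();
        ToyProcessor processor = (key, ctx) -> key + " from " + ctx.topic() + "@" + ctx.timestamp();

        internal.update("orders", 42L); // the framework picks up a new record
        System.out.println(processor.process("k1", view)); // k1 from orders@42
    }
}
```

The view always reflects the framework's latest update, so processors see the current record's metadata without being able to change it.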

Reading between the lines of your last reply, it sounds like
the disconnect may just have been a mutual misunderstanding
about whether or not Processors currently have access to set
the record context. Since they do not, if we wanted to add
the record context to StateStoreContext in a well-defined
way, we'd also have to add the ability for Processors to
manipulate it. But then, we're just creating a side-channel
for Processors to pass some information in arguments to
"put()" and other information implicitly through the
context. It seems better just to go for a single channel for
now.

It sounds like you're basically in favor of the conservative
approach, and you just wanted to understand the blockers
that I implied. Does my clarification make sense?

Thanks,
-John

On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman wrote:
> I was just thinking that the processor would null out the record context
> after it finished processing the record, so I'm not sure I follow why this
> would not be possible? AFAIK we never call a punctuator in the middle of
> processing a record through the topology, and even if we did, we still know
> when it is about to be called and could set it to null beforehand.
> 
> I'm not trying to advocate for it here; I'm in agreement that anything you
> want to access within the store can and should be accessed within the
> calling Processor/Punctuator before reaching the store. The "we can always
> add it later if necessary" argument is also pretty convincing. Just trying
> to understand why this wouldn't be possible.
> 
> FWIW, the question of "what is the current record in the context of a
> Punctuator" exists independently of whether we want to add this to the
> StateStoreContext or not. The full ProcessorContext, including the current
> record context, is already available within a Punctuator, so removing the
> current record context from the StateStoreContext does not solve the
> problem. Users can -- and have (see KAFKA-9584 ;) -- hit such subtle bugs
> without ever invoking a StateStore from their punctuator.
> 
> Again, I think I do agree that we should leave the current record context
> off of the StateStoreContext, but I don't think the Punctuator argument
> against it is very convincing. It sounds to me like we need to disallow
> access to the current record context from within the Punctuator,
> independent of anything to do with state stores.
> 
> On Thu, Sep 10, 2020 at 7:12 AM John Roesler  wrote:
> 
> > Thanks for the thoughts, Sophie.
> > 
> > I agree that the extra information could be useful. My only concern is
> > that it doesn’t seem like we can actually supply that extra information
> > correctly. So, then we have a situation where the system offers useful API
> > calls that are only correct in a narrow range of use cases. Outside of
> > those use cases, you get incorrect behavior.
> > 
> > If it were possible to null out the context before you put a document to
> > which the context doesn’t apply, then the concern would be mitigated. But
> > it would still be pretty weird from the perspective of the store that
> > sometimes the context is populated and other times, it’s null.
> > 
> > But that seems moot, since it doesn’t seem possible to null out the
> > context. Only the Processor could know whether it’s about to put a document
> > different from the context or not. And it would be inappropriate to offer a
> > public ProcessorContext api to manage the record context.
> > 
> > Ultimately, it still seems like if you want to store headers, you can
> > store them explicitly, right? That doesn’t seem onerous to me, and it kind
> > of seems better than relying on undefined or asymmetrical behavior in the
> > store itself.
> > 
> > Anyway, I’m not saying that we couldn’t solve these problems. Just that it
> > seems like we can be conservative and avoid them for now. If it
> > turns out we really need to solve them, we can always do it later.
> > 
> > Thanks,
> > John
> > 
> > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > If you were to call "put" from a punctuator, or do a
> > > > `range()` query and then update one of those records with
> > > > `put()`, you'd have a very 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread Sophie Blee-Goldman
I was just thinking that the processor would null out the record context
after it finished processing the record, so I'm not sure I follow why this
would not be possible? AFAIK we never call a punctuator in the middle of
processing a record through the topology, and even if we did, we still know
when it is about to be called and could set it to null beforehand.

I'm not trying to advocate for it here; I'm in agreement that anything you
want to access within the store can and should be accessed within the
calling Processor/Punctuator before reaching the store. The "we can always
add it later if necessary" argument is also pretty convincing. Just trying
to understand why this wouldn't be possible.

FWIW, the question of "what is the current record in the context of a
Punctuator" exists independently of whether we want to add this to the
StateStoreContext or not. The full ProcessorContext, including the current
record context, is already available within a Punctuator, so removing the
current record context from the StateStoreContext does not solve the
problem. Users can -- and have (see KAFKA-9584) -- hit such subtle bugs
without ever invoking a StateStore from their punctuator.

Again, I think I do agree that we should leave the current record context
off of the StateStoreContext, but I don't think the Punctuator argument
against it is very convincing. It sounds to me like we need to disallow
access to the current record context from within the Punctuator,
independent of anything to do with state stores.

On Thu, Sep 10, 2020 at 7:12 AM John Roesler  wrote:

> Thanks for the thoughts, Sophie.
>
> I agree that the extra information could be useful. My only concern is
> that it doesn’t seem like we can actually supply that extra information
> correctly. So, then we have a situation where the system offers useful API
> calls that are only correct in a narrow range of use cases. Outside of
> those use cases, you get incorrect behavior.
>
> If it were possible to null out the context before you put a document to
> which the context doesn’t apply, then the concern would be mitigated. But
> it would still be pretty weird from the perspective of the store that
> sometimes the context is populated and other times, it’s null.
>
> But that seems moot, since it doesn’t seem possible to null out the
> context. Only the Processor could know whether it’s about to put a document
> different from the context or not. And it would be inappropriate to offer a
> public ProcessorContext api to manage the record context.
>
> Ultimately, it still seems like if you want to store headers, you can
> store them explicitly, right? That doesn’t seem onerous to me, and it kind
> of seems better than relying on undefined or asymmetrical behavior in the
> store itself.
>
> Anyway, I’m not saying that we couldn’t solve these problems. Just that it
> seems like we can be conservative and avoid them for now. If it
> turns out we really need to solve them, we can always do it later.
>
> Thanks,
> John
>
> On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > >
> > > If you were to call "put" from a punctuator, or do a
> > > `range()` query and then update one of those records with
> > > `put()`, you'd have a very subtle bug on your hands.
> >
> >
> > Can you elaborate on this a bit? I agree that the punctuator case is an
> > obvious exception to the assumption that store invocations always
> > have a corresponding "current record", but I don't understand the
> > second example. Are you envisioning a scenario where the #process
> > method performs a range query and then updates records? Or were
> > you just giving another example of the punctuator case?
> >
> > I only bring it up because I agree that the current record information
> > could still be useful within the context of the store. As a non-user, my
> > input on this definitely has limited value, but it just isn't striking me
> > as obvious that we should remove access to the current record context
> > from the state stores. If there is no current record, as in the
> > punctuator case, we should just set the record context to null (or
> > Optional.empty, etc).
> >
> > That said, the put() always has to come from somewhere, and that
> > somewhere is always going to be either a Processor or a Punctuator, both
> > of which will still have access to the full context. So additional info
> > such as the timestamp can and should probably be supplied to the store
> > before calling put(), rather than looked up by the store. But I can see
> > some other things being useful, for example the current record's
> > headers. Maybe if/when we add better (or any) support for headers in
> > state stores this will be less true.
> >
> > Of course as John has made clear, it's pretty hard to judge without
> > examples and more insight as to what actually goes on within a custom
> > state store.
> >
> > On Wed, Sep 9, 2020 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread John Roesler
Thanks for the thoughts, Sophie.

I agree that the extra information could be useful. My only concern is that it 
doesn’t seem like we can actually supply that extra information correctly. So, 
then we have a situation where the system offers useful API calls that are only 
correct in a narrow range of use cases. Outside of those use cases, you get 
incorrect behavior.

If it were possible to null out the context before you put a document to which 
the context doesn’t apply, then the concern would be mitigated. But it would 
still be pretty weird from the perspective of the store that sometimes the 
context is populated and other times, it’s null.

But that seems moot, since it doesn’t seem possible to null out the context. 
Only the Processor could know whether it’s about to put a document different 
from the context or not. And it would be inappropriate to offer a public 
ProcessorContext api to manage the record context.

Ultimately, it still seems like if you want to store headers, you can store 
them explicitly, right? That doesn’t seem onerous to me, and it kind of seems 
better than relying on undefined or asymmetrical behavior in the store itself.
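A minimal sketch of "store them explicitly", assuming hypothetical ValueWithHeaders and MapStore types (not Kafka Streams classes): the processor copies the headers it needs into the value it puts, so the store has nothing to look up.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical value wrapper: the processor copies the headers it cares about
// into the stored value, so the store never needs a "current record".
final class ValueWithHeaders<V> {
    final V value;
    final Map<String, byte[]> headers;

    ValueWithHeaders(V value, Map<String, byte[]> headers) {
        this.value = value;
        // Copy the map so later changes to the caller's map are not visible here.
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }
}

// Plain map-backed store: it knows exactly what it is given, nothing more.
final class MapStore<K, V> {
    private final Map<K, V> data = new HashMap<>();

    void put(K key, V value) {
        data.put(key, value);
    }

    V get(K key) {
        return data.get(key);
    }
}
```

The cost is a slightly larger stored value; the benefit is that reads and writes behave the same no matter whether they come from process(), a punctuator, or a range-scan update.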

Anyway, I’m not saying that we couldn’t solve these problems. Just that it 
seems like we can be conservative and avoid them for now. If it turns 
out we really need to solve them, we can always do it later. 

Thanks,
John

On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> >
> > If you were to call "put" from a punctuator, or do a
> > `range()` query and then update one of those records with
> > `put()`, you'd have a very subtle bug on your hands.
> 
> 
> Can you elaborate on this a bit? I agree that the punctuator case is an
> obvious exception to the assumption that store invocations always
> have a corresponding "current record", but I don't understand the
> second example. Are you envisioning a scenario where the #process
> method performs a range query and then updates records? Or were
> you just giving another example of the punctuator case?
> 
> I only bring it up because I agree that the current record information could
> still be useful within the context of the store. As a non-user, my input on
> this definitely has limited value, but it just isn't striking me as obvious
> that we should remove access to the current record context from the state
> stores. If there is no current record, as in the punctuator case, we should
> just set the record context to null (or Optional.empty, etc).
> 
> That said, the put() always has to come from somewhere, and that
> somewhere is always going to be either a Processor or a Punctuator, both
> of which will still have access to the full context. So additional info
> such as the timestamp can and should probably be supplied to the store
> before calling put(), rather than looked up by the store. But I can see
> some other things being useful, for example the current record's headers.
> Maybe if/when we add better (or any) support for headers in state stores
> this will be less true.
> 
> Of course as John has made clear, it's pretty hard to judge without
> examples and more insight as to what actually goes on within a custom
> state store.
> 
> On Wed, Sep 9, 2020 at 8:07 PM John Roesler  wrote:
> 
> > Hi Paul,
> >
> > It's good to hear from you!
> >
> > I'm glad you're in favor of the direction. Especially when
> > it comes to public API and usability concerns, I tend to
> > think that "the folks who matter" are actually the folks who
> > have to use the APIs to accomplish real tasks. It can be
> > hard for me to be sure I'm thinking clearly from that
> > perspective.
> >
> > Funny story, I also started down this road a couple of times
> > already and backed them out before the KIP because I was
> > afraid of the scope of the proposal. Unfortunately, needing
> > to make a new ProcessorContext kind of forced my hand.
> >
> > I see you've called me out about the ChangeLogging stores :)
> > In fact, I think these are the main/only reason that stores
> > might really need to invoke "forward()". My secret plan was
> > to cheat and either accomplish change-logging by a different
> > mechanism than implementing the store interface, or by just
> > breaking encapsulation to sneak the "real" ProcessorContext
> > into the ChangeLogging stores. But those are all
> > implementation details. I think the key question is whether
> > anyone else has a store implementation that needs to call
> > "forward()". It's not what you mentioned, but since you
> > spoke up, I'll just ask: if you have a use case for calling
> > "forward()" in a store, please share it.
> >
> > Regarding the other record-specific context methods, I think
> > you have a good point, but I also can't quite wrap my head
> > around how we can actually guarantee it to work in general.
> > For example, the case you cited, where the implementation of
> > `KeyValueStore#put(key, value)` uses the context to augment
> > the record with 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-09 Thread Sophie Blee-Goldman
>
> If you were to call "put" from a punctuator, or do a
> `range()` query and then update one of those records with
> `put()`, you'd have a very subtle bug on your hands.


Can you elaborate on this a bit? I agree that the punctuator case is an
obvious exception to the assumption that store invocations always
have a corresponding "current record", but I don't understand the
second example. Are you envisioning a scenario where the #process
method performs a range query and then updates records? Or were
you just giving another example of the punctuator case?

I only bring it up because I agree that the current record information could
still be useful within the context of the store. As a non-user, my input on
this definitely has limited value, but it just isn't striking me as obvious
that we should remove access to the current record context from the state
stores. If there is no current record, as in the punctuator case, we should
just set the record context to null (or Optional.empty, etc).

That said, the put() always has to come from somewhere, and that
somewhere is always going to be either a Processor or a Punctuator, both
of which will still have access to the full context. So additional info
such as the timestamp can and should probably be supplied to the store
before calling put(), rather than looked up by the store. But I can see some
other things being useful, for example the current record's headers. Maybe
if/when we add better (or any) support for headers in state stores this will
be less true.

Of course as John has made clear, it's pretty hard to judge without
examples and more insight as to what actually goes on within a custom
state store.

On Wed, Sep 9, 2020 at 8:07 PM John Roesler  wrote:

> Hi Paul,
>
> It's good to hear from you!
>
> I'm glad you're in favor of the direction. Especially when
> it comes to public API and usability concerns, I tend to
> think that "the folks who matter" are actually the folks who
> have to use the APIs to accomplish real tasks. It can be
> hard for me to be sure I'm thinking clearly from that
> perspective.
>
> Funny story, I also started down this road a couple of times
> already and backed them out before the KIP because I was
> afraid of the scope of the proposal. Unfortunately, needing
> to make a new ProcessorContext kind of forced my hand.
>
> I see you've called me out about the ChangeLogging stores :)
> In fact, I think these are the main/only reason that stores
> might really need to invoke "forward()". My secret plan was
> to cheat and either accomplish change-logging by a different
> mechanism than implementing the store interface, or by just
> breaking encapsulation to sneak the "real" ProcessorContext
> into the ChangeLogging stores. But those are all
> implementation details. I think the key question is whether
> anyone else has a store implementation that needs to call
> "forward()". It's not what you mentioned, but since you
> spoke up, I'll just ask: if you have a use case for calling
> "forward()" in a store, please share it.
>
> Regarding the other record-specific context methods, I think
> you have a good point, but I also can't quite wrap my head
> around how we can actually guarantee it to work in general.
> For example, the case you cited, where the implementation of
> `KeyValueStore#put(key, value)` uses the context to augment
> the record with timestamp information. This relies on the
> assumption that you would only call "put()" from inside a
> `Processor#process(key, value)` call in which the record
> being processed is the same record that you're trying to put
> into the store.
>
> If you were to call "put" from a punctuator, or do a
> `range()` query and then update one of those records with
> `put()`, you'd have a very subtle bug on your hands. Right
> now, the Streams component that actually calls the Processor
> takes care to set the right record context before invoking
> the method, and in the case of caching, etc., it also takes
> care to swap out the old context and keep it somewhere safe.
> But when it comes to public API Processors calling methods
> on StateStores, there's no opportunity for any component to
> make sure the context is always correct.
>
> In the face of that situation, it seemed better to just move
> in the direction of a "normal" data store. I.e., when you
> use a HashMap or RocksDB or other "state stores", you don't
> expect them to automatically know extra stuff about the
> record you're storing. If you need them to know something,
> you just put it in the value.
>
> All of that said, I'm just reasoning from first principles
> here. To really know if this is a mistake or not, I need to
> be in your place. So please push back if you think what I
> said is nonsense. My personal plan was to keep an eye out
> during the period where the old API was still present, but
> deprecated, to see if people were struggling to use the new
> API. If so, then we'd have a chance to address it before
> dropping the old 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-09 Thread John Roesler
Hi Paul,

It's good to hear from you!

I'm glad you're in favor of the direction. Especially when
it comes to public API and usability concerns, I tend to
think that "the folks who matter" are actually the folks who
have to use the APIs to accomplish real tasks. It can be
hard for me to be sure I'm thinking clearly from that
perspective.

Funny story, I also started down this road a couple of times
already and backed them out before the KIP because I was
afraid of the scope of the proposal. Unfortunately, needing
to make a new ProcessorContext kind of forced my hand.

I see you've called me out about the ChangeLogging stores :)
In fact, I think these are the main/only reason that stores
might really need to invoke "forward()". My secret plan was
to cheat and either accomplish change-logging by a different
mechanism than implementing the store interface, or by just
breaking encapsulation to sneak the "real" ProcessorContext
into the ChangeLogging stores. But those are all
implementation details. I think the key question is whether
anyone else has a store implementation that needs to call
"forward()". It's not what you mentioned, but since you
spoke up, I'll just ask: if you have a use case for calling
"forward()" in a store, please share it.

Regarding the other record-specific context methods, I think
you have a good point, but I also can't quite wrap my head
around how we can actually guarantee it to work in general.
For example, the case you cited, where the implementation of
`KeyValueStore#put(key, value)` uses the context to augment
the record with timestamp information. This relies on the
assumption that you would only call "put()" from inside a
`Processor#process(key, value)` call in which the record
being processed is the same record that you're trying to put
into the store.

If you were to call "put" from a punctuator, or do a
`range()` query and then update one of those records with
`put()`, you'd have a very subtle bug on your hands. Right
now, the Streams component that actually calls the Processor
takes care to set the right record context before invoking
the method, and in the case of caching, etc., it also takes
care to swap out the old context and keep it somewhere safe.
But when it comes to public API Processors calling methods
on StateStores, there's no opportunity for any component to
make sure the context is always correct.
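The pitfall can be reproduced with a small, self-contained Java model. CurrentRecordInfo, ContextStampingStore and RangeUpdatingProcessor are hypothetical names, not Streams classes. The store stamps each put() with the timestamp from a shared context, which is only right when the key being written belongs to the record currently being processed.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for framework-managed record metadata.
final class CurrentRecordInfo {
    long timestamp; // set by the "framework" before each process() call
}

// Hypothetical store that silently stamps every put() with the timestamp of
// whatever record the framework happens to be processing right now.
final class ContextStampingStore {
    private final CurrentRecordInfo context;
    // key -> {payload, timestamp the store attributes to that key}
    final Map<String, long[]> data = new TreeMap<>();

    ContextStampingStore(CurrentRecordInfo context) {
        this.context = context;
    }

    void put(String key, long payload) {
        // Implicit lookup: only correct if 'key' belongs to the current record.
        data.put(key, new long[] {payload, context.timestamp});
    }
}

// Hypothetical processor: writes its own record, then range-scans and
// updates every other key.
final class RangeUpdatingProcessor {
    private final ContextStampingStore store;

    RangeUpdatingProcessor(ContextStampingStore store) {
        this.store = store;
    }

    void process(String key, long payload) {
        store.put(key, payload);
        // Iterate over a snapshot so put() does not disturb the iteration.
        for (Map.Entry<String, long[]> e : new TreeMap<>(store.data).entrySet()) {
            if (!e.getKey().equals(key)) {
                // Stamped with the current record's timestamp, not this key's.
                store.put(e.getKey(), e.getValue()[0] + 1);
            }
        }
    }
}
```

Processing "b" at timestamp 50 and then "a" at timestamp 100 leaves "b" stamped with 100, because the range update ran while "a" was the current record. Nothing fails; the store just records misleading metadata.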

In the face of that situation, it seemed better to just move
in the direction of a "normal" data store. I.e., when you
use a HashMap or RocksDB or other "state stores", you don't
expect them to automatically know extra stuff about the
record you're storing. If you need them to know something,
you just put it in the value.
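For contrast, a minimal sketch of the "normal" data store approach, with hypothetical names (StampedValue, NormalStore, ExplicitTimestampProcessor): the timestamp travels inside the value, and the processor decides explicitly which timestamp each write should carry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type: the caller states which timestamp belongs to the value.
final class StampedValue {
    final long payload;
    final long timestamp;

    StampedValue(long payload, long timestamp) {
        this.payload = payload;
        this.timestamp = timestamp;
    }
}

// A "normal" store in the HashMap sense: no hidden context, just key -> value.
final class NormalStore {
    final Map<String, StampedValue> data = new HashMap<>();

    void put(String key, StampedValue value) {
        data.put(key, value);
    }

    StampedValue get(String key) {
        return data.get(key);
    }
}

// Range-updating processor that passes every timestamp explicitly.
final class ExplicitTimestampProcessor {
    private final NormalStore store;

    ExplicitTimestampProcessor(NormalStore store) {
        this.store = store;
    }

    void process(String key, long payload, long recordTimestamp) {
        store.put(key, new StampedValue(payload, recordTimestamp));
        // Iterate over a snapshot so put() does not disturb the iteration.
        for (Map.Entry<String, StampedValue> e : new HashMap<>(store.data).entrySet()) {
            if (!e.getKey().equals(key)) {
                StampedValue old = e.getValue();
                // Deliberate, visible choice: keep the key's own timestamp.
                store.put(e.getKey(), new StampedValue(old.payload + 1, old.timestamp));
            }
        }
    }
}
```

Stamping the updated keys with the current record's timestamp instead would be equally valid, but it would be an explicit decision in the processor rather than an accident of the store's implementation.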

All of that said, I'm just reasoning from first principles
here. To really know if this is a mistake or not, I need to
be in your place. So please push back if you think what I
said is nonsense. My personal plan was to keep an eye out
during the period where the old API was still present, but
deprecated, to see if people were struggling to use the new
API. If so, then we'd have a chance to address it before
dropping the old API. But it's even better if you can help
think it through now.

It did also cross my mind to _not_ add the
StateStoreContext, but just to continue to punt on the
question by just dropping in the new ProcessorContext to the
new init method. If StateStoreContext seems too bold, we can
go that direction. But if we actually add some methods to
StateStoreContext, I'd like to be able to ensure they would
be well defined. I think the current situation was more of
an oversight than a choice.

Thanks again for your reply,
-John


On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> John,
> 
> It's exciting to see this KIP head in this direction!  In the last year or
> so I've tried to sketch out some usability improvements for custom state
> stores, and I also ended up splitting out the StateStoreContext from the
> ProcessorContext in an attempt to facilitate what I was doing.  I sort of
> abandoned it when I realized how large the ideal change might have to be,
> but it's great to see that there is other interest in moving in this
> direction (from the folks that matter :) ).
> 
> Having taken a stab at it myself, I have a comment/question on this bullet
> about StateStoreContext:
> 
> It does *not* include anything processor- or record-specific, like
> > `forward()` or any information about the "current" record, which is only
> > well-defined in the context of the Processor. Processors process one record
> > at a time, but state stores may be used to store and fetch many records, so
> > there is no "current record".
> > 
> 
> I totally agree that record-specific or processor-specific context in a
> state store is often not well-defined and it would be good to separate that
> out, but sometimes it (at least record-specific context) is actually
> useful, for example, passing the record's timestamp through to the
> underlying 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-09 Thread Paul Whalen
John,

It's exciting to see this KIP head in this direction!  In the last year or
so I've tried to sketch out some usability improvements for custom state
stores, and I also ended up splitting out the StateStoreContext from the
ProcessorContext in an attempt to facilitate what I was doing.  I sort of
abandoned it when I realized how large the ideal change might have to be,
but it's great to see that there is other interest in moving in this
direction (from the folks that matter :) ).

Having taken a stab at it myself, I have a comment/question on this bullet
about StateStoreContext:

It does *not* include anything processor- or record-specific, like
> `forward()` or any information about the "current" record, which is only
> well-defined in the context of the Processor. Processors process one record
> at a time, but state stores may be used to store and fetch many records, so
> there is no "current record".
>

I totally agree that record-specific or processor-specific context in a
state store is often not well-defined and it would be good to separate that
out, but sometimes it (at least record-specific context) is actually
useful, for example, passing the record's timestamp through to the
underlying storage (or changelog topic):
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121

You could have the writer client of the state store pass this through, but
it would be nice to be able to write state stores where the client did not
have this responsibility.  I'm not sure if the solution is to add some
things back to StateStoreContext, or make yet another context that
represents record-specific context while inside a state store.
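A minimal sketch of the first option mentioned above, where the writer passes the timestamp through, using a hypothetical decorator (ChangeLoggingStore, with a List standing in for the changelog topic). This is not the real ChangeLoggingKeyValueBytesStore linked above; it only illustrates a put() signature that takes the timestamp as an argument instead of reading it from a context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical changelog entry; a List stands in for the changelog topic.
final class ChangelogEntry<K, V> {
    final K key;
    final V value;
    final long timestamp;

    ChangelogEntry(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical decorator: writes to an inner map and records a changelog entry.
// The timestamp is an explicit argument rather than a lookup of a "current record".
final class ChangeLoggingStore<K, V> {
    private final Map<K, V> inner = new HashMap<>();
    final List<ChangelogEntry<K, V>> changelog = new ArrayList<>();

    void put(K key, V value, long timestamp) {
        inner.put(key, value);
        changelog.add(new ChangelogEntry<>(key, value, timestamp));
    }

    V get(K key) {
        return inner.get(key);
    }
}
```

The trade-off is the one described above: every caller must supply the timestamp, but the store's behavior no longer depends on where put() was called from.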

Best,
Paul

On Wed, Sep 9, 2020 at 5:43 PM John Roesler  wrote:

> Hello all,
>
> I've been slowly pushing KIP-478 forward over the last year,
> and I'm happy to say that we're making good progress now.
> However, several issues with the original design have come
> to light.
>
> The major changes:
>
> We discovered that the original plan of just adding generic
> parameters to ProcessorContext was too disruptive, so we are
> now adding a new api.ProcessorContext.
>
> That choice forces us to add a new StateStore.init method
> for the new context, but ProcessorContext really isn't ideal
> for state stores to begin with, so I'm proposing a new
> StateStoreContext for this purpose. In a nutshell, there are
> quite a few methods in ProcessorContext that actually should
> never be called from inside a StateStore.
>
> Also, since there is a new ProcessorContext interface, we
> need a new MockProcessorContext implementation in the
> test-utils module.
>
>
>
> The changeset for the KIP document is here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=14=10
>
> And the KIP itself is here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
>
>
> If you have any concerns, please let me know!
>
> Thanks,
> -John
>
>


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-09 Thread John Roesler
Hello all,

I've been slowly pushing KIP-478 forward over the last year,
and I'm happy to say that we're making good progress now.
However, several issues with the original design have come
to light.

The major changes:

We discovered that the original plan of just adding generic
parameters to ProcessorContext was too disruptive, so we are
now adding a new api.ProcessorContext.
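As an illustration of what generic input and output parameters buy, here is a minimal sketch with hypothetical TypedProcessor and TypedContext interfaces. They are simplified stand-ins, not the real org.apache.kafka.streams.processor.api types, and their method signatures are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified shapes in the spirit of the proposal; not the real
// org.apache.kafka.streams.processor.api interfaces.
interface TypedContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

interface TypedProcessor<KIn, VIn, KOut, VOut> {
    void init(TypedContext<KOut, VOut> context);

    void process(KIn key, VIn value);
}

// Emits (word, length). Forwarding anything other than (String, Integer) is a
// compile-time error, e.g. context.forward(key, "oops") does not compile.
final class WordLengthProcessor implements TypedProcessor<String, String, String, Integer> {
    private TypedContext<String, Integer> context;

    @Override
    public void init(TypedContext<String, Integer> context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.forward(value, value.length());
    }
}
```

Because the context is typed on the output, a forward() with the wrong key or value type is rejected by the compiler instead of failing at runtime further downstream.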

That choice forces us to add a new StateStore.init method
for the new context, but ProcessorContext really isn't ideal
for state stores to begin with, so I'm proposing a new
StateStoreContext for this purpose. In a nutshell, there are
quite a few methods in ProcessorContext that actually should
never be called from inside a StateStore.
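One way to express that restriction in the type system, sketched with hypothetical interfaces (StoreInitContext, FullProcessorContext, FileBackedStore): the store's init() accepts only the narrow interface, so processor-only methods such as forward() are simply not reachable from store code. The methods shown are assumptions for illustration, not the actual StateStoreContext contents.

```java
import java.io.File;

// Hypothetical narrow context offered to stores: only store-relevant facilities.
interface StoreInitContext {
    String applicationId();

    File stateDir();
}

// Hypothetical wider processor context: everything a store gets, plus
// processor-only operations that a store should never call.
interface FullProcessorContext<KOut, VOut> extends StoreInitContext {
    void forward(KOut key, VOut value);
}

// A store written against StoreInitContext cannot call forward() at all.
final class FileBackedStore {
    private File dir;

    void init(StoreInitContext context) {
        // context.forward(...) would not compile here: it is not on this interface.
        this.dir = new File(context.stateDir(), context.applicationId() + "-store");
    }

    File directory() {
        return dir;
    }
}
```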

Also, since there is a new ProcessorContext interface, we
need a new MockProcessorContext implementation in the
test-utils module.
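For the testing side, a minimal sketch of what a capturing mock context could look like, with hypothetical ForwardingContext and CapturingContext types (not the real test-utils MockProcessorContext): it records every forward() call so that a unit test can assert on the output.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical minimal context interface (not the Kafka Streams one).
interface ForwardingContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

// Hypothetical test double: records every forward() so a test can assert on it.
final class CapturingContext<KOut, VOut> implements ForwardingContext<KOut, VOut> {
    static final class Forwarded<K, V> {
        final K key;
        final V value;

        Forwarded(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Forwarded<KOut, VOut>> forwarded = new ArrayList<>();

    @Override
    public void forward(KOut key, VOut value) {
        forwarded.add(new Forwarded<>(key, value));
    }

    // Read-only view of everything forwarded so far, in call order.
    List<Forwarded<KOut, VOut>> forwarded() {
        return Collections.unmodifiableList(forwarded);
    }
}
```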



The changeset for the KIP document is here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121=14=10

And the KIP itself is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API


If you have any concerns, please let me know!

Thanks,
-John



Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-30 Thread John Roesler
Thanks, everyone, for the really good discussion.

The vote has been open for 6 days, and has three binding votes (Guozhang,
Bill, Matthias), in addition to my own non-binding +1, so the KIP vote
passes!

Next, I'll close my POC PR and put together an actual change set for review.

Thanks again, all,
-John

On Mon, Jul 29, 2019 at 4:58 PM Matthias J. Sax wrote:

> +1 (binding)
>
> On 7/29/19 11:59 AM, Bill Bejeck wrote:
> > Thanks for the KIP.
> >
> > +1 (binding)
> >
> > -Bill
> >
> > On Wed, Jul 24, 2019 at 12:12 PM Guozhang Wang wrote:
> >
> >> Yeah I think I agree with you.
> >>
> >> +1 (binding) from me.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, Jul 24, 2019 at 7:43 AM John Roesler  wrote:
> >>
> >>> Hi Guozhang,
> >>>
> >>> Thanks! I just replied in the discuss thread. I agree with what you're
> >>> proposing, but would like to consider it outside the scope of this KIP,
> >>> if that's ok with you.
> >>>
> >>> -John
> >>>
> >>> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang wrote:
> >>>
>  Hi John,
> 
>  I left another question regarding Transformer in the DISCUSS thread.
>  Other than that I think this KIP is ready. Thanks!
> 
> 
>  Guozhang
> 
> 
>  On Tue, Jul 16, 2019 at 9:01 AM John Roesler wrote:
> 
> > Hi Dev,
> >
> > After a good discussion, I'd like to start the vote for KIP-478
> > (https://cwiki.apache.org/confluence/x/2SkLBw).
> >
> > The proposal is to deprecate the existing interface
> > org.apache.kafka.streams.processor.Processor in favor of a
> > new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> > KOut, VOut> that parameterizes both the input and output types.
> >
> > This change enables both the Streams DSL internal code and external
> > Processor API code to improve their type safety and protect
> > themselves from type-level bugs.
> >
> > Thanks,
> > -John
> >
> 
> 
>  --
>  -- Guozhang
> 
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-29 Thread Matthias J. Sax
+1 (binding)

On 7/29/19 11:59 AM, Bill Bejeck wrote:
> Thanks for the KIP.
> 
> +1 (binding)
> 
> -Bill
> 
> On Wed, Jul 24, 2019 at 12:12 PM Guozhang Wang  wrote:
> 
>> Yeah I think I agree with you.
>>
>> +1 (binding) from me.
>>
>>
>> Guozhang
>>
>>
>> On Wed, Jul 24, 2019 at 7:43 AM John Roesler  wrote:
>>
>>> Hi Guozhang,
>>>
>>> Thanks! I just replied in the discuss thread. I agree with what you're
>>> proposing, but would like to consider it outside the scope of this KIP,
>>> if that's ok with you.
>>>
>>> -John
>>>
>>> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang wrote:
>>>
 Hi John,

 I left another question regarding Transformer in the DISCUSS thread.
 Other than that I think this KIP is ready. Thanks!


 Guozhang


 On Tue, Jul 16, 2019 at 9:01 AM John Roesler wrote:

> Hi Dev,
>
> After a good discussion, I'd like to start the vote for KIP-478
> (https://cwiki.apache.org/confluence/x/2SkLBw).
>
> The proposal is to deprecate the existing interface
> org.apache.kafka.streams.processor.Processor in favor of a
> new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> KOut, VOut> that parameterizes both the input and output types.
>
> This change enables both the Streams DSL internal code and external
> Processor API code to improve their type safety and protect
> themselves from type-level bugs.
>
> Thanks,
> -John
>


 --
 -- Guozhang

>>>
>>
>>
>> --
>> -- Guozhang
>>
> 





Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-29 Thread Bill Bejeck
Thanks for the KIP.

+1 (binding)

-Bill

On Wed, Jul 24, 2019 at 12:12 PM Guozhang Wang  wrote:

> Yeah I think I agree with you.
>
> +1 (binding) from me.
>
>
> Guozhang
>
>
> On Wed, Jul 24, 2019 at 7:43 AM John Roesler  wrote:
>
> > Hi Guozhang,
> >
> > Thanks! I just replied in the discuss thread. I agree with what you're
> > proposing, but would like to consider it outside the scope of this KIP, if
> > that's ok with you.
> >
> > -John
> >
> > On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang  wrote:
> >
> > > Hi John,
> > >
> > > I left another question regarding Transformer in the DISCUSS thread. Other
> > > than that I think this KIP is ready. Thanks!
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jul 16, 2019 at 9:01 AM John Roesler  wrote:
> > >
> > > > Hi Dev,
> > > >
> > > > After a good discussion, I'd like to start the vote for KIP-478
> > > > (https://cwiki.apache.org/confluence/x/2SkLBw).
> > > >
> > > > The proposal is to deprecate the existing interface
> > > > org.apache.kafka.streams.processor.Processor in favor of a
> > > > new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> > > > KOut, VOut> that parameterizes both the input and output types.
> > > >
> > > > This change enables both the Streams DSL internal code and external
> > > > Processor API code to improve their type safety and protect themselves
> > > > from type-level bugs.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-24 Thread Guozhang Wang
Yeah I think I agree with you.

+1 (binding) from me.


Guozhang


On Wed, Jul 24, 2019 at 7:43 AM John Roesler  wrote:

> Hi Guozhang,
>
> Thanks! I just replied in the discuss thread. I agree with what you're
> proposing, but would like to consider it outside the scope of this KIP, if
> that's ok with you.
>
> -John
>
> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang  wrote:
>
> > Hi John,
> >
> > I left another question regarding Transformer in the DISCUSS thread. Other
> > than that I think this KIP is ready. Thanks!
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jul 16, 2019 at 9:01 AM John Roesler  wrote:
> >
> > > Hi Dev,
> > >
> > > After a good discussion, I'd like to start the vote for KIP-478
> > > (https://cwiki.apache.org/confluence/x/2SkLBw).
> > >
> > > The proposal is to deprecate the existing interface
> > > org.apache.kafka.streams.processor.Processor in favor of a
> > > new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> > > KOut, VOut> that parameterizes both the input and output types.
> > >
> > > This change enables both the Streams DSL internal code and external
> > > Processor API code to improve their type safety and protect themselves
> > > from type-level bugs.
> > >
> > > Thanks,
> > > -John
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-24 Thread John Roesler
Hi Guozhang,

Thanks! I just replied in the discuss thread. I agree with what you're
proposing, but would like to consider it outside the scope of this KIP, if
that's ok with you.

-John

On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang  wrote:

> Hi John,
>
> I left another question regarding Transformer in the DISCUSS thread. Other
> than that I think this KIP is ready. Thanks!
>
>
> Guozhang
>
>
> On Tue, Jul 16, 2019 at 9:01 AM John Roesler  wrote:
>
> > Hi Dev,
> >
> > After a good discussion, I'd like to start the vote for KIP-478
> > (https://cwiki.apache.org/confluence/x/2SkLBw).
> >
> > The proposal is to deprecate the existing interface
> > org.apache.kafka.streams.processor.Processor in favor of a
> > new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> > KOut, VOut> that parameterizes both the input and output types.
> >
> > This change enables both the Streams DSL internal code and external
> > Processor API code to improve their type safety and protect themselves
> > from type-level bugs.
> >
> > Thanks,
> > -John
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-23 Thread Guozhang Wang
Hi John,

I left another question regarding Transformer in the DISCUSS thread. Other
than that I think this KIP is ready. Thanks!


Guozhang


On Tue, Jul 16, 2019 at 9:01 AM John Roesler  wrote:

> Hi Dev,
>
> After a good discussion, I'd like to start the vote for KIP-478
> (https://cwiki.apache.org/confluence/x/2SkLBw).
>
> The proposal is to deprecate the existing interface
> org.apache.kafka.streams.processor.Processor in favor of a
> new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> KOut, VOut> that parameterizes both the input and output types.
>
> This change enables both the Streams DSL internal code and external
> Processor API code to improve their type safety and protect themselves
> from type-level bugs.
>
> Thanks,
> -John
>


-- 
-- Guozhang
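The vote message also says the change helps the Streams DSL's own internal code. One way to picture that is a sketch where code wiring two processors together can demand, at compile time, that the upstream output types equal the downstream input types. `SketchProcessor`, `SketchContext`, `chain`, `Parse`, and `Doubler` are hypothetical names invented for this illustration; this is not how the Streams DSL actually builds its topology, only a demonstration of the type-level check that the four type parameters make possible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; not the real Kafka interfaces.
interface SketchContext<K, V> {
    void forward(K key, V value);
}

interface SketchProcessor<KIn, VIn, KOut, VOut> {
    void init(SketchContext<KOut, VOut> context);
    void process(KIn key, VIn value);
}

public class ChainSketch {
    // Hypothetical wiring helper: compiles only if the upstream output types
    // (K2, V2) are exactly the downstream input types.
    static <K1, V1, K2, V2, K3, V3> SketchProcessor<K1, V1, K3, V3> chain(
            SketchProcessor<K1, V1, K2, V2> upstream,
            SketchProcessor<K2, V2, K3, V3> downstream) {
        return new SketchProcessor<K1, V1, K3, V3>() {
            @Override
            public void init(SketchContext<K3, V3> context) {
                downstream.init(context);           // downstream emits to the final sink
                upstream.init(downstream::process); // upstream emits into downstream
            }

            @Override
            public void process(K1 key, V1 value) {
                upstream.process(key, value);
            }
        };
    }

    // Parses String values into Integer values.
    static final class Parse implements SketchProcessor<String, String, String, Integer> {
        private SketchContext<String, Integer> ctx;
        public void init(SketchContext<String, Integer> context) { ctx = context; }
        public void process(String key, String value) { ctx.forward(key, Integer.parseInt(value.trim())); }
    }

    // Doubles Integer values.
    static final class Doubler implements SketchProcessor<String, Integer, String, Integer> {
        private SketchContext<String, Integer> ctx;
        public void init(SketchContext<String, Integer> context) { ctx = context; }
        public void process(String key, Integer value) { ctx.forward(key, value * 2); }
    }

    static List<Integer> run(String... inputs) {
        List<Integer> results = new ArrayList<>();
        SketchProcessor<String, String, String, Integer> pipeline = chain(new Parse(), new Doubler());
        // chain(new Doubler(), new Parse()) is rejected by the compiler:
        // Doubler emits Integer values, but Parse expects String values.
        pipeline.init((k, v) -> results.add(v));
        for (String in : inputs) {
            pipeline.process("k", in);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run("1", " 21 "));  // prints [2, 42]
    }
}
```

Because Java generics are invariant, the shared type variables `K2` and `V2` in `chain` force an exact match; with the old interface, which carries no output types, a helper like this could not express that constraint at all.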