Re: Kafka Streams: Dynamic Topic Routing & Nonexistent Topics

2020-07-04 Thread Guozhang Wang
Hello,

Thanks for reaching out to the community about this. I think (as you may
have also suggested) this is really an observation about the producer
client rather than the streams client. Generally speaking, we want to know
whether we can fail fast when the metadata cannot be found in the
producer.send() call. Here are my thoughts:

1) Caching the metadata outside the producer, e.g. in an admin client,
would not be a perfect solution, since either way the metadata cache --
whether inside the producer or inside the admin client -- is not guaranteed
to always be up to date: e.g. you may decide to fail a record because its
topic was not in the cache, only for the metadata to be refreshed one
second later and contain that topic.

2) Letting the send() call fail with an UnknownTopicOrPartitionException
and pushing the burden onto the caller to decide what to do (either wait
and retry, or give up and stop the world, etc.) may work, but that requires
modifying the interface semantics, or at least adding an overloaded
"send()" function. Maybe worth discussing in a KIP.

3) For your specific case, if you believe the metadata should be static and
never change (i.e. you assume all topics are pre-created and none will be
created later), then I think setting max.block.ms to a smaller value and
just catching the TimeoutException is fine, since for send() itself,
max.block.ms is only used for metadata refresh and for buffer allocation
when the buffer is insufficient, and the latter should be a rare case
assuming you set buffer.memory reasonably large. But note that since
max.block.ms is a global config, it may also affect other blocking calls,
such as transaction-related ones.
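As a rough illustration of option 3, the pattern is to bound the blocking
with a small max.block.ms on the producer and treat the resulting
TimeoutException as the signal to log and drop the record. The sketch below
keeps itself self-contained by hiding the send call behind a Callable; the
wrapper class and method names are hypothetical, not part of the Kafka API.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

public class FailFastSend {
    // Hypothetical wrapper: runs a send-like action and maps a
    // TimeoutException (e.g. from a metadata lookup bounded by a small
    // max.block.ms) into an empty result, i.e. "log and drop".
    public static <T> Optional<T> sendOrDrop(Callable<T> send) {
        try {
            return Optional.of(send.call());
        } catch (TimeoutException e) {
            System.err.println("Dropping record, metadata not available: " + e.getMessage());
            return Optional.empty();
        } catch (Exception e) {
            throw new RuntimeException(e); // other failures still propagate
        }
    }

    public static void main(String[] args) {
        Optional<String> ok = sendOrDrop(() -> "record-metadata");
        Optional<String> dropped =
            sendOrDrop(() -> { throw new TimeoutException("topic missing"); });
        System.out.println(ok.isPresent() + " " + dropped.isPresent()); // prints "true false"
    }
}
```

In a real application the Callable would wrap producer.send(...).get(...),
and, as noted above, shrinking max.block.ms also shortens the wait in other
blocking calls such as the transaction-related ones.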


On Wed, Jul 1, 2020 at 6:10 PM Rhys Anthony McCaig  wrote:

> Hi All,
>
> I have been recently working on a streams application that uses a
> TopicNameExtractor to dynamically route records based on the payload. This
> streams application is used by various other applications, and occasionally
> these other applications request that a record be sent to a non-existent
> topic - rather than have the topic created, the message should be logged
> and dropped.
>
> Unfortunately, I don't seem to have found a reliable way to implement this
> behaviour. I originally hoped to catch these scenarios in a
> ProductionExceptionHandler by catching an UnknownTopicOrPartitionException;
> however, the current producer behaviour is to wait up to max.block.ms in
> waitOnMetadata() for partitions to be returned for the topic before
> throwing a TimeoutException. If, after refreshing metadata, there are still
> no partitions for the requested topic, it will keep requesting an update
> until the timeout is reached: (
>
> https://github.com/apache/kafka/blob/b8a99be7847c61d7792689b71fda5b283f8340a8/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1051
> )
> For my use case, there are two challenges here:
> 1. The ProductionExceptionHandler must catch the TimeoutException and
> inspect its message to determine that the exception was caused by the
> topic missing from the metadata.
> 2. The streams task blocks (as expected) while the producer is fetching
> metadata, holding up processing of other records until the timeout
> exception is thrown.
>
> Rather than accept the stream blocking in this scenario, my current
> thinking is to use AdminClient to keep a periodically updated cache of
> existing/non-existent topics and filter based on it - however I can't help
> thinking that this feels clunky, given that the producer maintains its own
> cache of recently accessed topics/partitions.
>
> Would it make sense to enhance KafkaProducer to:
> - Optionally fail fast when the first metadata refresh does not return the
> requested topic or partition count? (And maybe even optionally cache
> this?)
> - Differentiate between a TimeoutException and an
> UnknownTopicOrPartitionException?
>
> My understanding of the internals isn't great - I'm not clear on the
> reason for continuing to request metadata updates after getting a new
> version - is there a possible issue with getting stale metadata from
> brokers?
>
> Looking forward to your thoughts!
>
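For what it's worth, the periodically refreshed topic cache described in
the question above can be sketched without the Kafka classes by hiding the
topic listing behind a Supplier; in a real application the supplier would
wrap an AdminClient listTopics() call and refresh() would run on a
scheduler. The class and method names here are hypothetical.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;

public class TopicExistenceCache {
    private final Supplier<Set<String>> listTopics; // e.g. wraps AdminClient in practice
    private volatile Set<String> known = Collections.emptySet();

    public TopicExistenceCache(Supplier<Set<String>> listTopics) {
        this.listTopics = listTopics;
        refresh();
    }

    // Call periodically, e.g. from a ScheduledExecutorService, to pick up
    // newly created topics; volatile publication keeps reads cheap.
    public void refresh() {
        known = Set.copyOf(listTopics.get());
    }

    // Filter step before the sink: route only to topics known to exist,
    // log-and-drop otherwise.
    public boolean exists(String topic) {
        return known.contains(topic);
    }
}
```

As Guozhang notes in the reply, such a cache can still be stale in either
direction; it only avoids the per-record metadata wait, it does not give a
definitive answer about topic existence.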


-- 
-- Guozhang


[GitHub] [kafka-site] huxihx commented on pull request #272: KAFKA-10222:Incorrect methods show up in 0.10 Kafka Streams docs

2020-07-04 Thread GitBox


huxihx commented on pull request #272:
URL: https://github.com/apache/kafka-site/pull/272#issuecomment-653833022


   @mjsax Thanks for the reminder. I will close this PR and open a new PR to track 
this issue.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka-site] huxihx closed pull request #272: KAFKA-10222:Incorrect methods show up in 0.10 Kafka Streams docs

2020-07-04 Thread GitBox


huxihx closed pull request #272:
URL: https://github.com/apache/kafka-site/pull/272


   







[DISCUSS] KIP-639 Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics

2020-07-04 Thread Mohamed Chebbi



https://cwiki.apache.org/confluence/display/KAFKA/KIP-639%3A+Move+nodeLevelSensor+and+storeLevelSensor+methods+from+StreamsMetricsImpl+to+StreamsMetrics



[GitHub] [kafka-site] mjsax commented on pull request #272: KAFKA-10222:Incorrect methods show up in 0.10 Kafka Streams docs

2020-07-04 Thread GitBox


mjsax commented on pull request #272:
URL: https://github.com/apache/kafka-site/pull/272#issuecomment-653828716


   Thanks for the PR @huxihx -- note that these pages are generated from the 
JavaDocs. Thus, if we fix it, we should fix it in the source code?







Build failed in Jenkins: kafka-trunk-jdk11 #1619

2020-07-04 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Document that max.block.ms affects some transaction methods


--
[...truncated 3.19 MB...]


Build failed in Jenkins: kafka-trunk-jdk8 #4693

2020-07-04 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Document that max.block.ms affects some transaction methods


--
[...truncated 3.17 MB...]


Build failed in Jenkins: kafka-trunk-jdk14 #268

2020-07-04 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Document that max.block.ms affects some transaction methods


--
[...truncated 3.19 MB...]

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-04 Thread John Roesler
Thanks Sagar,

That looks good to me! The only minor comment I’d make is that I think the 
method declaration should have a default implementation that throws an 
UnsupportedOperationException, for source compatibility with existing state 
stores.

Otherwise, as far as I’m concerned, I’m ready to vote. 

Thanks,
John
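John's suggestion -- a default implementation that throws
UnsupportedOperationException so existing stores keep compiling -- looks
roughly like this. The interface below is a simplified stand-in for the
real store interface, and the exact prefixScan signature (the KIP's version
also takes a prefix serializer) is still under discussion:

```java
import java.util.Iterator;
import java.util.Map;

interface SimplifiedKeyValueStore<K, V> {
    V get(K key);

    // New optional feature: giving the method a default body means existing
    // implementations compile unchanged, and stores that cannot support a
    // prefix scan simply never override it.
    default <P> Iterator<Map.Entry<K, V>> prefixScan(P prefix) {
        throw new UnsupportedOperationException(
            "prefixScan is not supported by this store");
    }
}
```

A store that does support the feature overrides prefixScan; callers on an
older store get a clear runtime error rather than a compile break across
the ecosystem.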

On Sat, Jul 4, 2020, at 12:19, Sagar wrote:
> Hi John,
> 
> I have updated the KIP with all the new changes we discussed in this
> discussion thread.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> 
> Request you to go through the same.
> 
> Thanks!
> Sagar.
> 
> On Tue, Jun 30, 2020 at 8:09 AM John Roesler  wrote:
> 
> > Hi Sagar,
> >
> > That’s a good observation; yes, it should go in the ReadOnlyKeyValueStore
> > interface.
> >
> > Thanks again for the great work,
> > John
> >
> > On Sun, Jun 28, 2020, at 23:54, Sagar wrote:
> > > Hi John,
> > >
> > > Thank you for the positive feedback! The meaningful discussions we had on
> > > the mailing list helped me understand what needed to be done.
> > >
> > > I am definitely open to any further suggestions on this.
> > >
> > > Before I updated the KIP, I just had one question, is it fine to have it
> > > for KeyValueStore or should I move it to ReadOnlyKeyValueStore where even
> > > the range query resides?
> > >
> > > Regarding the 2 notes on UnsupportedOperationException and changing the
> > > name to prefixScan, I will incorporate both of them into the KIP.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Sun, Jun 28, 2020 at 11:55 PM John Roesler 
> > wrote:
> > >
> > > > Woah, this is great, Sagar!
> > > >
> > > > I think this API looks really good. I'm curious if anyone else has
> > > > any concern. For my part, I think this will work just fine. People
> > > > might face tricky bugs getting their key serde and their prefix
> > > > serde "aligned", but I think the API makes it pretty obvious what
> > > > has to happen to make this work. As long as the API isn't going
> > > > to "trick" anyone by trying to abstract away things that can't be
> > > > abstracted, this is the best we can do. In other words, I think
> > > > your approach is ideal here.
> > > >
> > > > I also really appreciate that you took the time to do a full POC
> > > > with end-to-end tests to show that the proposal is actually
> > > > going to work.
> > > >
> > > > A couple of notes as you update the KIP:
> > > >
> > > > 1. I think that for "optional" state store features like this, we
> > > > should add a default implementation to the interface that
> > > > throws UnsupportedOperationException. That way,
> > > > any existing store implementations won't fail to compile
> > > > on the new version. And any store that just can't support
> > > > a prefix scan would simply not override the method.
> > > >
> > > > 2. I think you meant "prefixScan", not "prefixSeek", since
> > > > we're actually getting an iterator that only returns prefix-
> > > > matching keys, as opposed to just seeking to that prefix.
> > > >
> > > > Thanks again for the work you've put into this. I look
> > > > forward to reviewing the updated KIP.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > >
> > > > On Sun, Jun 28, 2020, at 12:17, Sagar wrote:
> > > > > Hi John,
> > > > >
> > > > > I took some time out and as we discussed, looked to implement these
> > > > > changes. Most of these changes are for demonstrative purposes but I
> > > > thought
> > > > > I will share.
> > > > >
> > > > > I added the new prefixSeek method at the KeyValueStore interface
> > level:
> > > > >
> > > > >
> > > >
> > https://github.com/confluentinc/kafka/pull/242/files#diff-5e92747b506c868db3948323478e1b07R74-R83
> > > > >
> > > > > As you had pointed out, the prefix type can be different from the key
> > > > type.
> > > > That's why this method takes 2 parameters: the key type and its
> > > > serializer.
> > > > >
> > > > > Then I added the implementation of this method in a couple of Stores.
> > > > > RocksDBStore:
> > > > >
> > > > >
> > > >
> > https://github.com/confluentinc/kafka/pull/242/commits#diff-046ca566243518c88e007b7499ec9f51R308-R320
> > > > > and
> > > > > InMemoryKVStore:
> > > > >
> > > > >
> > > >
> > https://github.com/confluentinc/kafka/pull/242/commits#diff-4c685a32e765eab60bcb60097768104eR108-R120
> > > > >
> > > > > I modified the older test cases for RocksDBStore. You can find them
> > here:
> > > > >
> > > > >
> > > >
> > https://github.com/confluentinc/kafka/pull/242/commits#diff-051439f56f0d6a12334d7e8cc4f66bf8R304-R415
> > > > >
> > > > >
> > > > > I have added a test case where the keys are of type UUID while the
> > prefix
> > > > > is of type string. This seems to be working because the code is able
> > to
> > > > > pull in UUIDs with the provided prefix, even though their types are
> > > > > different.
> > > > >
> > > > > To address one of the gaps from my previous implementation, I have
> > also
> > > > > added a test case for the en

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-04 Thread Sagar
Hi John,

I have updated the KIP with all the new changes we discussed in this
discussion thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores

Request you to go through the same.

Thanks!
Sagar.

On Tue, Jun 30, 2020 at 8:09 AM John Roesler  wrote:

> Hi Sagar,
>
> That’s a good observation; yes, it should go in the ReadOnlyKeyValueStore
> interface.
>
> Thanks again for the great work,
> John
>
> On Sun, Jun 28, 2020, at 23:54, Sagar wrote:
> > Hi John,
> >
> > Thank you for the positive feedback! The meaningful discussions we had on
> > the mailing list helped me understand what needed to be done.
> >
> > I am definitely open to any further suggestions on this.
> >
> > Before I updated the KIP, I just had one question, is it fine to have it
> > for KeyValueStore or should I move it to ReadOnlyKeyValueStore where even
> > the range query resides?
> >
> > Regarding the 2 notes on UnsupportedOperationException and changing the
> > name to prefixScan, I will incorporate both of them into the KIP.
> >
> > Thanks!
> > Sagar.
> >
> > On Sun, Jun 28, 2020 at 11:55 PM John Roesler 
> wrote:
> >
> > > Woah, this is great, Sagar!
> > >
> > > I think this API looks really good. I'm curious if anyone else has
> > > any concern. For my part, I think this will work just fine. People
> > > might face tricky bugs getting their key serde and their prefix
> > > serde "aligned", but I think the API makes it pretty obvious what
> > > has to happen to make this work. As long as the API isn't going
> > > to "trick" anyone by trying to abstract away things that can't be
> > > abstracted, this is the best we can do. In other words, I think
> > > your approach is ideal here.
> > >
> > > I also really appreciate that you took the time to do a full POC
> > > with end-to-end tests to show that the proposal is actually
> > > going to work.
> > >
> > > A couple of notes as you update the KIP:
> > >
> > > 1. I think that for "optional" state store features like this, we
> > > should add a default implementation to the interface that
> > > throws UnsupportedOperationException. That way,
> > > any existing store implementations won't fail to compile
> > > on the new version. And any store that just can't support
> > > a prefix scan would simply not override the method.
> > >
> > > 2. I think you meant "prefixScan", not "prefixSeek", since
> > > we're actually getting an iterator that only returns prefix-
> > > matching keys, as opposed to just seeking to that prefix.
> > >
> > > Thanks again for the work you've put into this. I look
> > > forward to reviewing the updated KIP.
> > >
> > > Thanks,
> > > -John
> > >
> > >
> > > On Sun, Jun 28, 2020, at 12:17, Sagar wrote:
> > > > Hi John,
> > > >
> > > > I took some time out and as we discussed, looked to implement these
> > > > changes. Most of these changes are for demonstrative purposes but I
> > > thought
> > > > I will share.
> > > >
> > > > I added the new prefixSeek method at the KeyValueStore interface
> level:
> > > >
> > > >
> > >
> https://github.com/confluentinc/kafka/pull/242/files#diff-5e92747b506c868db3948323478e1b07R74-R83
> > > >
> > > > As you had pointed out, the prefix type can be different from the key
> > > type.
> > > > That's why this method takes 2 parameters: the key type and its
> > > serializer.
> > > >
> > > > Then I added the implementation of this method in a couple of Stores.
> > > > RocksDBStore:
> > > >
> > > >
> > >
> https://github.com/confluentinc/kafka/pull/242/commits#diff-046ca566243518c88e007b7499ec9f51R308-R320
> > > > and
> > > > InMemoryKVStore:
> > > >
> > > >
> > >
> https://github.com/confluentinc/kafka/pull/242/commits#diff-4c685a32e765eab60bcb60097768104eR108-R120
> > > >
> > > > I modified the older test cases for RocksDBStore. You can find them
> here:
> > > >
> > > >
> > >
> https://github.com/confluentinc/kafka/pull/242/commits#diff-051439f56f0d6a12334d7e8cc4f66bf8R304-R415
> > > >
> > > >
> > > > I have added a test case where the keys are of type UUID while the
> prefix
> > > > is of type string. This seems to be working because the code is able
> to
> > > > pull in UUIDs with the provided prefix, even though their types are
> > > > different.
> > > >
> > > > To address one of the gaps from my previous implementation, I have
> also
> > > > added a test case for the end to end flow using the state store
> supplier.
> > > > you can find it here:
> > > >
> > > >
> > >
> https://github.com/confluentinc/kafka/pull/242/commits#diff-a94de5b2ec72d09ebac7183c31d7c906R269-R305
> > > >
> > > > Note that for this to work, i needed to update MeteredKVstore and
> > > > ChangeLoggingKVStore.
> > > >
> > > > Lastly, barring the 4 stores mentioned above, rest of the
> implementers of
> > > > KVStore have the prefixSeek override as null. As I said, this is
> mainly
> > > for
> > > > demonstrative purposes and hence done this way.
> > > > If you get the chance, it would be great if you can provid

Re: [VOTE] KIP-617: Allow Kafka Streams State Stores to be iterated backwards

2020-07-04 Thread John Roesler
Thanks Jorge,

I’m +1 (binding)

-John

On Fri, Jul 3, 2020, at 10:26, Jorge Esteban Quilcate Otoya wrote:
> Hola everyone,
> 
> I'd like to start a new thread to vote for KIP-617 as there have been
> significant changes since the previous vote started.
> 
> KIP wiki page:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-617%3A+Allow+Kafka+Streams+State+Stores+to+be+iterated+backwards
> 
> Many thanks!
> 
> Jorge.
>


Re: [VOTE] KIP-418: A method-chaining way to branch KStream

2020-07-04 Thread John Roesler
Hi Ivan,

Congratulations! It looks like you have 3 binding and 2 non-binding votes, so 
you can announce this KIP as accepted and follow up with a PR. 

Thanks,
John

On Mon, Jun 29, 2020, at 20:46, Bill Bejeck wrote:
> Thanks for the KIP Ivan, +1 (binding).
> 
> -Bill
> 
> On Mon, Jun 29, 2020 at 7:22 PM Guozhang Wang  wrote:
> 
> > +1 (binding). Thanks Ivan!
> >
> >
> > Guozhang
> >
> > On Mon, Jun 29, 2020 at 3:55 AM Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > This will be a great addition. Thanks Ivan!
> > >
> > > +1 (non-binding)
> > >
> > > On Fri, Jun 26, 2020 at 7:07 PM John Roesler 
> > wrote:
> > >
> > > > Thanks, Ivan!
> > > >
> > > > I’m +1 (binding)
> > > >
> > > > -John
> > > >
> > > > On Thu, May 28, 2020, at 17:24, Ivan Ponomarev wrote:
> > > > > Hello all!
> > > > >
> > > > > I'd like to start the vote for KIP-418 which proposes deprecation of
> > > > > current `branch` method and provides a method-chaining based API for
> > > > > branching.
> > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> > > > >
> > > > > Regards,
> > > > >
> > > > > Ivan
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
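[Editor's note] For readers skimming the archive: the `branch` method being deprecated takes varargs predicates and returns an array of streams, while KIP-418 proposes building branches through method chaining. A self-contained sketch of the chaining pattern in plain Java (the class and method names here are illustrative, not the final Kafka Streams API):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Chaining branch builder: each branch() call returns the builder itself,
// instead of one varargs call returning an array of streams.
class Brancher<T> {
    private final List<Map.Entry<Predicate<T>, Consumer<T>>> branches = new ArrayList<>();

    Brancher<T> branch(Predicate<T> predicate, Consumer<T> action) {
        branches.add(new SimpleEntry<>(predicate, action));
        return this;
    }

    // Route a record to the first branch whose predicate matches.
    void route(T record) {
        for (Map.Entry<Predicate<T>, Consumer<T>> b : branches) {
            if (b.getKey().test(record)) {
                b.getValue().accept(record);
                return;
            }
        }
    }
}

public class BranchSketch {
    public static void main(String[] args) {
        List<Integer> evens = new ArrayList<>();
        List<Integer> odds = new ArrayList<>();
        Brancher<Integer> brancher = new Brancher<Integer>()
                .branch(n -> n % 2 == 0, evens::add)
                .branch(n -> true, odds::add); // catch-all, like a default branch
        List.of(1, 2, 3, 4).forEach(brancher::route);
        System.out.println(evens + " " + odds); // prints [2, 4] [1, 3]
    }
}
```

The chaining style keeps each predicate next to the code that consumes its branch, instead of forcing the caller to index into a returned array.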


Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

2020-07-04 Thread John Roesler
Hi Richard,

It’s good to hear from you!

Thanks for bringing up the wall-clock suppression feature. IIRC, someone 
actually started a KIP discussion for it already, but I don’t think it went to 
a vote. I don’t recall any technical impediment, just the lack of availability 
to finish it up. Although there is some association, it would be good to keep 
the KIPs separate.

Thanks,
John

On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
> Hi all,
> 
> This reminds me of a previous issue I think that we were discussing.
> @John Roesler  I think you should remember this 
> one.
> 
> A while back, we were talking about having suppress operator emit 
> records by wall-clock time instead of stream time.
> If we are adding this, wouldn't that make it more feasible for us to 
> implement that feature for suppression?
> 
> If I recall correctly, there actually had been quite a bit of user 
> demand for such a feature.
> Might be good to include it in this KIP (or maybe use one of the prior 
> KIPs for this feature).
> 
> Best,
> Richard
> 
> On Sat, Jul 4, 2020 at 6:58 AM John Roesler  wrote:
> > Hi all,
> > 
> >  1. Thanks, Boyang, it is nice to see usage examples in KIPs like this. It 
> > helps during the discussion, and it’s also good documentation later on. 
> > 
> >  2. Yeah, this is a subtle point. The motivation mentions being able to 
> > control the time during tests, but to be able to make it work, the 
> > processor implementation needs a public method on ProcessorContext to get 
> > the time. Otherwise, processors would have to check the type of the context 
> > and cast, depending on whether they’re running inside a test or not. In 
> > retrospect, if we’d had a usage example, this probably would have been 
> > clear. 
> > 
> >  3. I don’t think we expect people to have their own implementations of 
> > ProcessorContext. Since all implementations are internal, it’s really an 
> > implementation detail whether we use a default method, abstract methods, or 
> > concrete methods. I can’t think of an implementation that really wants to 
> > just look up the system time. In the production code path, we cache the 
> > time for performance, and in testing, we use a mock time. 
> > 
> >  Thanks,
> >  John
> > 
> > 
> >  On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> >  > 1. Makes sense; let me propose something
> >  > 
> >  > 2. That's not testing-only. The goal is to use the same API to access 
> >  > the time
> >  > in deployment and testing environments. The major driver is 
> >  > System.currentTimeMillis(),
> >  > which a) cannot be used in tests, b) could in specific cases go
> >  > backwards, and c) is not compatible with punctuator calls. The idea is
> >  > that we could access the clock using a uniform API.
> >  > For completeness we should have the same API for system and stream time.
> >  > 
> >  > 3. There aren't that many subclasses. Two important ones are 
> >  > ProcessorContextImpl and 
> >  > MockProcessorContext (and third one: 
> >  > ForwardingDisabledProcessorContext). If a given
> >  > implementation does not support the schedule() call, there is no reason
> >  > to support clock access.
> >  > The default implementation should just throw
> >  > UnsupportedOperationException, just to prevent
> >  > compilation errors in possible subclasses.
> >  > 
> >  > On 2020/07/01 02:24:43, Boyang Chen  wrote: 
> >  > > Thanks Will for the KIP. A couple questions and suggestions:
> >  > > 
> >  > > 1. I think for new APIs to make most sense, we should add a minimal 
> > example
> >  > > demonstrating how it could be useful to structure unit tests w/o the 
> > new
> >  > > APIs.
> >  > > 2. If this is a testing-only feature, could we only add it
> >  > > to MockProcessorContext?
> >  > > 3. Regarding the API, since this will be added to the ProcessorContext 
> > with
> >  > > many subclasses, does it make sense to provide default implementations 
> > as
> >  > > well?
> >  > > 
> >  > > Boyang
> >  > > 
> >  > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell 
> >  > > wrote:
> >  > > 
> >  > > > Thanks, John! I made the change. How much longer should I let there 
> > be
> >  > > > discussion before starting a VOTE?
> >  > > >
> >  > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler  
> > wrote:
> >  > > >
> >  > > > > Thanks, Will,
> >  > > > >
> >  > > > > That looks good to me. I would only add "cached" or something
> >  > > > > to indicate that it wouldn't just transparently look up the current
> >  > > > > System.currentTimeMillis every time.
> >  > > > >
> >  > > > > For example:
> >  > > > > /**
> >  > > > > * Returns current cached wall-clock system timestamp in 
> > milliseconds.
> >  > > > > *
> >  > > > > * @return the current cached wall-clock system timestamp in 
> > milliseconds
> >  > > > > */
> >  > > > > long currentSystemTimeMs();
> >  > > > >
> >  > > > > I don't want to give specific information about _when_ exactly the
> >  > > > > timestamp cache will be updated, so that we can adjust it in the
> >  > > > > future, but it does seem important to make people aware that they
> >  > > > > won't see the timestamp advance during the execution of
> >  > > > > Processor.process(), for example.

Re: [DISCUSS] KIP-616: Rename implicit Serdes instances in kafka-streams-scala

2020-07-04 Thread John Roesler
Hi Yuriy,

I agree, we can keep them separate. I just wanted to make you aware of it.

Thanks for the PR, it looks the way I expected. 

I just read over the KIP document again. I think it needs to be updated to the 
current proposal, and then we’ll be able to start the vote. 

Thanks,
John

On Tue, Jun 30, 2020, at 04:58, Yuriy Badalyantc wrote:
> Hi everybody!
> 
> Looks like a discussion about KIP-513 could take a while. I think we should
> move forward with KIP-616 without waiting for KIP-513.
> 
> I created a new pull request for KIP-616:
> https://github.com/apache/kafka/pull/8955. It contains a new
> `org.apache.kafka.streams.scala.serialization.Serdes` object without name
> clash. An old one was marked as deprecated. This change is backward
> compatible and it could be merged in any further release.
> 
> On Wed, Jun 3, 2020 at 12:41 PM Yuriy Badalyantc  wrote:
> 
> > Hi, John
> >
> > Thanks for pointing that out. I expressed my thoughts about KIP-513 and
> > its connection to KIP-616 in the KIP-513 mail list.
> >
> > - Yuriy
> >
> > On Sun, May 31, 2020 at 1:26 AM John Roesler  wrote:
> >
> >> Hi Yuriy,
> >>
> >> I was just looking back at KIP-513, and I’m wondering if there’s any
> >> overlap we should consider here, or if they are just orthogonal.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Thu, May 28, 2020, at 21:36, Yuriy Badalyantc wrote:
> >> > At the current moment, I think John's plan is better than the original
> >> plan
> >> > described in the KIP. I think we should create a new `Serdes` in another
> >> > package. The old one will be deprecated.
> >> >
> >> > - Yuriy
> >> >
> >> > On Fri, May 29, 2020 at 8:58 AM John Roesler 
> >> wrote:
> >> >
> >> > > Thanks, Matthias,
> >> > >
> >> > > If we go with the approach Yuriy and I agreed on, to deprecate and
> >> replace
> >> > > the whole class and not just a few of the methods, then the timeline
> >> is
> >> > > less of a concern. Under that plan, Yuriy can just write the new class
> >> > > exactly the way he wants and people can cleanly swap over to the new
> >> > > pattern when they are ready.
> >> > >
> >> > > The timeline was more significant if we were just going to deprecate
> >> some
> >> > > methods and add new methods to the existing class. That plan requires
> >> two
> >> > > implementation phases, where we first deprecate the existing methods
> >> and
> >> > > later swap the implicits at the same time we remove the deprecated
> >> members.
> >> > > Aside from the complexity of that approach, it’s not a breakage free
> >> path,
> >> > > as some users would be forced to continue using the deprecated members
> >> > > until a future release drops them, breaking their source code, and
> >> only
> >> > > then can they update their code.
> >> > >
> >> > > That wouldn’t be the end of the world, and we’ve had to do the same
> >> thing
> >> > > in the past with the implicit conversions, but this is a much wider
> >> > > scope, since it’s all the serdes. I’m happy with the new plan, since
> >> it’s
> >> > > not only one step, but also it provides everyone a breakage-free path.
> >> > >
> >> > > We can still consider dropping the deprecated class in 3.0; I just
> >> wanted
> >> > > to clarify how the timeline issue has changed.
> >> > >
> >> > > Thanks,
> >> > > John
> >> > >
> >> > > On Thu, May 28, 2020, at 20:34, Matthias J. Sax wrote:
> >> > > > I am not a Scala person, so I cannot really contribute much.
> >> However for
> >> > > > the deprecation period, if we get the change into 2.7, it might be
> >> ok to
> >> > > > remove the deprecated classed in 3.0.
> >> > > >
> >> > > > It would only be one minor release in between what is a little bit
> >> short
> >> > > > (we usually prefer at least two minor releases, better three), but
> >> if we
> >> > > > have a good reason for it, it might be ok.
> >> > > >
> >> > > > If we cannot remove it in 3.0, it seems there would be a 4.0 in
> >> about a
> >> > > > year(?) when ZK removal is finished and we can remove the deprecated
> >> > > > code then.
> >> > > >
> >> > > >
> >> > > > -Matthias
> >> > > >
> >> > > > On 5/28/20 7:39 AM, John Roesler wrote:
> >> > > > > Hi Yuriy,
> >> > > > >
> >> > > > > Sounds good to me! I had a feeling we were bringing different
> >> context
> >> > > > > to the discussion; thanks for sticking with the conversation
> >> until we
> >> > > got
> >> > > > > it hashed out.
> >> > > > >
> >> > > > > I'm glad you prefer Serde*s*, since having multiple different
> >> classes
> >> > > with
> >> > > > > the same name leads to all kinds of trouble. "Serdes" seems
> >> relatively
> >> > > > > safe because people in the Scala lib won't be using the Java
> >> Serdes
> >> > > class,
> >> > > > > and they won't be using the deprecated and non-deprecated one at
> >> the
> >> > > > > same time.
> >> > > > >
> >> > > > > Thank again,
> >> > > > > -John
> >> > > > >
> >> > > > > On Thu, May 28, 2020, at 02:21, Yuriy Badalyantc wrote:
> >> > > > >> Ok, I understood you, John. I was

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

2020-07-04 Thread Richard Yu
Hi all,

This reminds me of a previous issue I think that we were discussing.
@John Roesler  I think you should remember this one.

A while back, we were talking about having suppress operator emit records
by wall-clock time instead of stream time.
If we are adding this, wouldn't that make it more feasible for us to
implement that feature for suppression?

If I recall correctly, there actually had been quite a bit of user demand
for such a feature.
Might be good to include it in this KIP (or maybe use one of the prior KIPs
for this feature).

Best,
Richard

On Sat, Jul 4, 2020 at 6:58 AM John Roesler  wrote:

> Hi all,
>
> 1. Thanks, Boyang, it is nice to see usage examples in KIPs like this. It
> helps during the discussion, and it’s also good documentation later on.
>
> 2. Yeah, this is a subtle point. The motivation mentions being able to
> control the  time during tests, but to be able to make it work, the
> processor implementation needs a public method on ProcessorContext to get
> the time. Otherwise, processors would have to check the type of the context
> and cast, depending on whether they’re running inside a test or not. In
> retrospect, if we’d had a usage example, this probably would have been
> clear.
>
> 3. I don’t think we expect people to have their own implementations of
> ProcessorContext. Since all implementations are internal, it’s really an
> implementation detail whether we use a default method, abstract methods, or
> concrete methods. I can’t think of an implementation that really wants to
> just look up the system time. In the production code path, we cache the
> time for performance, and in testing, we use a mock time.
>
> Thanks,
> John
>
>
> On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> > 1. Makes sense; let me propose something
> >
> > 2. That's not testing-only. The goal is to use the same API to access
> > the time
> > in deployment and testing environments. The major driver is
> > System.currentTimeMillis(),
> > which a) cannot be used in tests, b) could in specific cases go
> > backwards, and c) is not compatible with punctuator calls. The idea is
> > that we could access the clock using a uniform API.
> > For completeness we should have the same API for system and stream time.
> >
> > 3. There aren't that many subclasses. Two important ones are
> > ProcessorContextImpl and
> > MockProcessorContext (and third one:
> > ForwardingDisabledProcessorContext). If a given
> > implementation does not support the schedule() call, there is no reason
> > to support clock access.
> > The default implementation should just throw
> > UnsupportedOperationException, just to prevent
> > compilation errors in possible subclasses.
> >
> > On 2020/07/01 02:24:43, Boyang Chen  wrote:
> > > Thanks Will for the KIP. A couple questions and suggestions:
> > >
> > > 1. I think for new APIs to make most sense, we should add a minimal
> example
> > > demonstrating how it could be useful to structure unit tests w/o the
> new
> > > APIs.
> > > 2. If this is a testing-only feature, could we only add it
> > > to MockProcessorContext?
> > > 3. Regarding the API, since this will be added to the ProcessorContext
> with
> > > many subclasses, does it make sense to provide default implementations
> as
> > > well?
> > >
> > > Boyang
> > >
> > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell 
> > > wrote:
> > >
> > > > Thanks, John! I made the change. How much longer should I let there
> be
> > > > discussion before starting a VOTE?
> > > >
> > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler 
> wrote:
> > > >
> > > > > Thanks, Will,
> > > > >
> > > > > That looks good to me. I would only add "cached" or something
> > > > > to indicate that it wouldn't just transparently look up the current
> > > > > System.currentTimeMillis every time.
> > > > >
> > > > > For example:
> > > > > /**
> > > > >  * Returns current cached wall-clock system timestamp in
> milliseconds.
> > > > >  *
> > > > >  * @return the current cached wall-clock system timestamp in
> milliseconds
> > > > >  */
> > > > > long currentSystemTimeMs();
> > > > >
> > > > > I don't want to give specific information about _when_ exactly the
> > > > > timestamp cache will be updated, so that we can adjust it in the
> > > > > future, but it does seem important to make people aware that they
> > > > > won't see the timestamp advance during the execution of
> > > > > Processor.process(), for example.
> > > > >
> > > > > With that modification, I'll be +1 on this proposal.
> > > > >
> > > > > Thanks again for the KIP!
> > > > > -John
> > > > >
> > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > > > > > Thanks, John! I appreciate you adjusting my lingo. I made the
> change to
> > > > > the
> > > > > > KIP. I will add the note about system time to the javadoc.
> > > > > >
> > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <
> vvcep...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Will,
> > > > > > >
> > > > > > > This proposal looks good to me overall. Thanks for the contribution!
Re: [DISCUSS] KIP-617: Allow Kafka Streams State Stores to be iterated backwards

2020-07-04 Thread John Roesler
Thanks Jorge,

This KIP looks good to me!

-John

On Fri, Jul 3, 2020, at 03:19, Jorge Esteban Quilcate Otoya wrote:
> Hi John,
> 
> Thanks for the feedback.
> 
> I'd be happy to take the third option and consider moving methods to
> ReadOnlySessionStore as part of the KIP.
> Docs is updated to reflect these changes.
> 
> Cheers,
> Jorge.
> 
> On Thu, Jul 2, 2020 at 10:06 PM John Roesler  wrote:
> 
> > Hey Jorge,
> >
> > Thanks for the details. That sounds like a mistake to me on both counts.
> >
> > I don’t think you need to worry about those deprecations. If the
> > interface methods aren’t deprecated, then the methods are not deprecated.
> > We should remove the annotations, but it doesn’t need to be in the KIP.
> >
> > I think any query methods should have been in the ReadOnly interface. I
> > guess it’s up to you whether you want to:
> > 1. Add the reverse methods next to the existing methods (what you have in
> > the kip right now)
> > 2. Partially fix it by adding your new methods to the ReadOnly interface
> > 3. Fully fix the problem by moving the existing methods as well as your
> > new ones. Since  SessionStore extends ReadOnlySessionStore, it’s ok just to
> > move the definitions.
> >
> > I’m ok with whatever you prefer.
> >
> > Thanks,
> > John
> >
> > On Thu, Jul 2, 2020, at 11:29, Jorge Esteban Quilcate Otoya wrote:
> > > (sorry for the spam)
> > >
> > > Actually `findSessions` are only deprecated on `InMemorySessionStore`,
> > > which seems strange as RocksDB and interfaces haven't marked these
> > methods
> > > as deprecated.
> > >
> > > Any hint on how to handle this?
> > >
> > > Cheers,
> > >
> > > On Thu, Jul 2, 2020 at 4:57 PM Jorge Esteban Quilcate Otoya <
> > > quilcate.jo...@gmail.com> wrote:
> > >
> > > > @John: I can see there are some deprecations in there as well. Will
> > update
> > > > the KIP.
> > > >
> > > > Thanks,
> > > > Jorge
> > > >
> > > >
> > > > On Thu, Jul 2, 2020 at 3:29 PM Jorge Esteban Quilcate Otoya <
> > > > quilcate.jo...@gmail.com> wrote:
> > > >
> > > >> Thanks John.
> > > >>
> > > >> > it looks like there’s a revision error in which two methods are
> > > >> proposed for SessionStore, but seem like they should be in
> > > >> ReadOnlySessionStore. Do I read that right?
> > > >>
> > > >> Yes, I've opted to keep the new methods alongside the existing ones.
> > In
> > > >> the case of SessionStore, `findSessions` are in `SessionStore`, and
> > `fetch`
> > > >> are in `ReadOnlySessionStore`. If it makes more sense, I can move all
> > of
> > > >> them to ReadOnlySessionStore.
> > > >> Let me know what you think.
> > > >>
> > > >> Thanks,
> > > >> Jorge.
> > > >>
> > > >> On Thu, Jul 2, 2020 at 2:36 PM John Roesler 
> > wrote:
> > > >>
> > > >>> Hi Jorge,
> > > >>>
> > > >>> Thanks for the update. I think this is a good plan.
> > > >>>
> > > >>> I just took a look at the KIP again, and it looks like there’s a
> > > >>> revision error in which two methods are proposed for SessionStore,
> > but seem
> > > >>> like they should be in ReadOnlySessionStore. Do I read that right?
> > > >>>
> > > >>> Otherwise, I’m happy with your proposal.
> > > >>>
> > > >>> Thanks,
> > > >>> John
> > > >>>
> > > >>> On Wed, Jul 1, 2020, at 17:01, Jorge Esteban Quilcate Otoya wrote:
> > > >>> > Quick update: KIP is updated with latest changes now.
> > > >>> > Will leave it open this week while working on the PR.
> > > >>> >
> > > >>> > Hope to open a new vote thread over the next few days if no
> > additional
> > > >>> > feedback is provided.
> > > >>> >
> > > >>> > Cheers,
> > > >>> > Jorge.
> > > >>> >
> > > >>> > On Mon, Jun 29, 2020 at 11:30 AM Jorge Esteban Quilcate Otoya <
> > > >>> > quilcate.jo...@gmail.com> wrote:
> > > >>> >
> > > >>> > > Thanks, John!
> > > >>> > >
> > > >>> > > Makes sense to reconsider the current approach. I was heading in a
> > > >>> similar
> > > >>> > > direction while drafting the implementation. Metered, Caching,
> > and
> > > >>> other
> > > >>> > > layers will also have to get duplicated to build up new methods
> > in
> > > >>> `Stores`
> > > >>> > > factory, and class casting issues would appear on stores created
> > by
> > > >>> DSL.
> > > >>> > >
> > > >>> > > I will draft a proposal with new methods (move methods from
> > proposed
> > > >>> > > interfaces to existing ones) with default implementation in a KIP
> > > >>> update
> > > >>> > > and wait for Matthias to chime in to validate this approach.
> > > >>> > >
> > > >>> > > Jorge.
> > > >>> > >
> > > >>> > >
> > > >>> > > On Sat, Jun 27, 2020 at 4:01 PM John Roesler <
> > vvcep...@apache.org>
> > > >>> wrote:
> > > >>> > >
> > > >>> > >> Hi Jorge,
> > > >>> > >>
> > > >>> > >> Sorry for my silence, I've been absorbed with the 2.6 and 2.5.1
> > > >>> releases.
> > > >>> > >>
> > > >>> > >> The idea to separate the new methods into "mixin" interfaces
> > seems
> > > >>> > >> like a good one, but as we've discovered in KIP-614, it doesn't
> > work
> > > >>> > >> out that way in practice. The probl

Re: [DISCUSS] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

2020-07-04 Thread John Roesler
Hi all,

1. Thanks, Boyang, it is nice to see usage examples in KIPs like this. It helps 
during the discussion, and it’s also good documentation later on. 

2. Yeah, this is a subtle point. The motivation mentions being able to control 
the  time during tests, but to be able to make it work, the processor 
implementation needs a public method on ProcessorContext to get the time. 
Otherwise, processors would have to check the type of the context and cast, 
depending on whether they’re running inside a test or not. In retrospect, if 
we’d had a usage example, this probably would have been clear. 

3. I don’t think we expect people to have their own implementations of 
ProcessorContext. Since all implementations are internal, it’s really an 
implementation detail whether we use a default method, abstract methods, or 
concrete methods. I can’t think of an implementation that really wants to just 
look up the system time. In the production code path, we cache the time for 
performance, and in testing, we use a mock time. 

Thanks,
John


On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
> 1. Makes sense; let me propose something
> 
> 2. That's not testing-only. The goal is to use the same API to access 
> the time
> in deployment and testing environments. The major driver is 
> System.currentTimeMillis(),
> which a) cannot be used in tests, b) could in specific cases go
> backwards, and c) is not compatible with punctuator calls. The idea is
> that we could access the clock using a uniform API.
> For completeness we should have the same API for system and stream time.
> 
> 3. There aren't that many subclasses. Two important ones are 
> ProcessorContextImpl and 
> MockProcessorContext (and third one: 
> ForwardingDisabledProcessorContext). If a given
> implementation does not support the schedule() call, there is no reason
> to support clock access.
> The default implementation should just throw
> UnsupportedOperationException, just to prevent
> compilation errors in possible subclasses.
> 
> On 2020/07/01 02:24:43, Boyang Chen  wrote: 
> > Thanks Will for the KIP. A couple questions and suggestions:
> > 
> > 1. I think for new APIs to make most sense, we should add a minimal example
> > demonstrating how it could be useful to structure unit tests w/o the new
> > APIs.
> > 2. If this is a testing-only feature, could we only add it
> > to MockProcessorContext?
> > 3. Regarding the API, since this will be added to the ProcessorContext with
> > many subclasses, does it make sense to provide default implementations as
> > well?
> > 
> > Boyang
> > 
> > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell 
> > wrote:
> > 
> > > Thanks, John! I made the change. How much longer should I let there be
> > > discussion before starting a VOTE?
> > >
> > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler  wrote:
> > >
> > > > Thanks, Will,
> > > >
> > > > That looks good to me. I would only add "cached" or something
> > > > to indicate that it wouldn't just transparently look up the current
> > > > System.currentTimeMillis every time.
> > > >
> > > > For example:
> > > > /**
> > > >  * Returns current cached wall-clock system timestamp in milliseconds.
> > > >  *
> > > >  * @return the current cached wall-clock system timestamp in 
> > > > milliseconds
> > > >  */
> > > > long currentSystemTimeMs();
> > > >
> > > > I don't want to give specific information about _when_ exactly the
> > > > timestamp cache will be updated, so that we can adjust it in the
> > > > future, but it does seem important to make people aware that they
> > > > won't see the timestamp advance during the execution of
> > > > Processor.process(), for example.
> > > >
> > > > With that modification, I'll be +1 on this proposal.
> > > >
> > > > Thanks again for the KIP!
> > > > -John
> > > >
> > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
> > > > > Thanks, John! I appreciate you adjusting my lingo. I made the change 
> > > > > to
> > > > the
> > > > > KIP. I will add the note about system time to the javadoc.
> > > > >
> > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler 
> > > > wrote:
> > > > >
> > > > > > Hi Will,
> > > > > >
> > > > > > This proposal looks good to me overall. Thanks for the contribution!
> > > > > >
> > > > > > Just a couple of minor notes:
> > > > > >
> > > > > > The system time method would return a cached timestamp that Streams
> > > > looks
> > > > > > up once when it starts processing a record. This may be confusing, 
> > > > > > so
> > > > it
> > > > > > might be good to state it in the javadoc.
> > > > > >
> > > > > > I thought the javadoc for the stream time might be a bit confusing.
> > > We
> > > > > > normally talk about “Tasks” not “partition groups” in the public 
> > > > > > api.
> > > > Maybe
> > > > > > just saying that it’s “the maximum timestamp of any record yet
> > > > processed by
> > > > > > the task” would be both high level and accurate.
> > > > > >
> > > > > > Thanks again!
> > > > > > -John
> > > > > >
> > > > >
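[Editor's note] The caching behaviour discussed in this thread — the timestamp is looked up once per record, not on every call — is the crux of both the performance and the testability argument. A minimal sketch of the pattern (class and method names are illustrative, not the actual Streams internals):

```java
import java.util.function.LongSupplier;

// The "context" caches one wall-clock reading per record, so a processor
// sees a stable value throughout process(), and a test can inject a mock
// clock instead of relying on System.currentTimeMillis().
class SketchContext {
    private final LongSupplier clock;
    private long cachedMs;

    SketchContext(LongSupplier clock) {
        this.clock = clock;
    }

    void beginRecord() {
        cachedMs = clock.getAsLong(); // refreshed once, before processing a record
    }

    long currentSystemTimeMs() {
        return cachedMs; // stable while one record is being processed
    }
}

public class CachedClockSketch {
    public static void main(String[] args) {
        long[] mockNow = {1000L};
        SketchContext ctx = new SketchContext(() -> mockNow[0]);
        ctx.beginRecord();
        mockNow[0] = 2000L; // the wall clock moves on...
        // ...but the processor still sees the per-record cached timestamp
        System.out.println(ctx.currentSystemTimeMs()); // prints 1000
    }
}
```

This is exactly why the javadoc above says "cached": a processor should not expect the value to advance during a single process() call, and a test can make time fully deterministic by controlling the injected clock.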