[jira] [Created] (KAFKA-10478) advertised.listeners should allow duplicated ports

2020-09-11 Thread Andre Araujo (Jira)
Andre Araujo created KAFKA-10478:


 Summary: advertised.listeners should allow duplicated ports
 Key: KAFKA-10478
 URL: https://issues.apache.org/jira/browse/KAFKA-10478
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Andre Araujo


The same 
[validations|https://github.com/apache/kafka/blob/391ad90112fb2e9a85bf76250d57863bbf33b383/core/src/main/scala/kafka/utils/CoreUtils.scala#L259-L260]
 performed for {{listeners}} endpoints are also applied to 
[{{advertised.listeners}}|https://github.com/apache/kafka/blob/e8b2dcdee6f25e9344d52b84e86328ec616bf819/core/src/main/scala/kafka/server/KafkaConfig.scala#L1689-L1691].

It makes sense that neither parameter should allow duplicated listener names. 
The port number restriction is different, though.

For {{listeners}} it makes sense to only allow one listener per port, since two 
listeners cannot bind to the same port at the same time (considering a single 
network interface).

For {{advertised.listeners}}, though, this doesn't apply, since Kafka doesn't 
actually bind to the advertised listener ports. A practical application of 
relaxing this restriction for {{advertised.listeners}} is the following:

When configuring Kafka with Kerberos authentication and a load balancer, we 
need to have two SASL_SSL listeners: (A) one running with the 
{{kafka/hostname}} principal and (B) another using {{kafka/lb_name}}, which is 
necessary for proper authentication when using the LB FQDN. After bootstrap, 
though, the client receives the brokers' addresses with the actual host FQDNs 
advertised by the brokers. To connect to the brokers using the hostnames, the 
client must connect to listener A to be able to authenticate successfully 
with Kerberos.
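
To illustrate, a configuration along the following lines is currently rejected 
because the two advertised listeners share a port, even though the broker never 
binds to the advertised addresses (host names and ports are made up for this 
example; the LB would forward its port 9092 to the broker's port 9093):

{noformat}
listeners=HOST://0.0.0.0:9092,LB://0.0.0.0:9093
advertised.listeners=HOST://broker1.example.com:9092,LB://kafka-lb.example.com:9092
listener.security.protocol.map=HOST:SASL_SSL,LB:SASL_SSL
{noformat}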



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: How do I place a JIRA in status "Patch Available"? (For KAFKA-10473)

2020-09-11 Thread John Roesler
Thanks, James!

That change looks good to me.

-John

On Fri, 2020-09-11 at 15:54 -0700, James Cheng wrote:
> Thanks John! I can access and edit the wiki now.
> 
> And I improved the instructions for how to change the status of a JIRA to 
> "Patch Available". 
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=59689925=36=37
>  
> 
> 
> Thanks for the help,
> -James
> 
> > On Sep 11, 2020, at 12:53 PM, John Roesler  wrote:
> > 
> > Hi James,
> > 
> > Sorry, I overlooked your reply until now. I've granted you
> > access.
> > 
> > Thanks,
> > -John
> > 
> > On Wed, 2020-09-09 at 21:44 -0700, James Cheng wrote:
> > > Thanks John. My wiki user ID is wushujames 
> > > 
> > > -James
> > > 
> > > Sent from my iPhone
> > > 
> > > > On Sep 9, 2020, at 7:03 PM, John Roesler  wrote:
> > > > 
> > > > Hi James,
> > > > 
> > > > Good, I’m glad my incredibly vague response was helpful!
> > > > 
> > > > If you let me know your wiki user id, I can grant you edit permission. 
> > > > It’s a separate account from Jira. 
> > > > 
> > > > Thanks,
> > > > John
> > > > 
> > > > > On Wed, Sep 9, 2020, at 20:45, James Cheng wrote:
> > > > > Thanks John. That worked.
> > > > > 
> > > > > I clicked the button that says "Submit Patch", and a dialog box 
> > > > > popped 
> > > > > up. I didn't fill out anything additional in the dialog, and clicked 
> > > > > "Submit Patch" in the dialog.
> > > > > 
> > > > > The JIRA is now in status "Patch Available"
> > > > > 
> > > > > I would like to improve the docs at 
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
> > > > >  
> > > > > 
> > > > >  to make this step clearer. It looks like I don't have permissions to 
> > > > > edit the page.
> > > > > 
> > > > > Can someone grant me permissions to edit the page? 
> > > > > 
> > > > > Or, if that is too difficult, can someone edit the page as follows?
> > > > > 
> > > > > Change
> > > > > 
> > > > >   7. Change the status of the JIRA to "Patch Available" if it's ready 
> > > > > for review.
> > > > > to be
> > > > > 
> > > > >   7. Change the status of the JIRA to "Patch Available" if it's ready 
> > > > > for review. Do this by clicking the "Submit Patch" button in JIRA, 
> > > > > and 
> > > > > then in the resulting dialog, click "Submit Patch".
> > > > > 
> > > > > -James
> > > > > 
> > > > > > > On Sep 9, 2020, at 6:24 PM, John Roesler  
> > > > > > > wrote:
> > > > > > 
> > > > > > Hi James,
> > > > > > 
> > > > > > I think the button on Jira says “Add Patch” or something confusing 
> > > > > > like that. 
> > > > > > 
> > > > > > Thanks,
> > > > > > John
> > > > > > 
> > > > > > 
> > > > > > On Wed, Sep 9, 2020, at 17:34, James Cheng wrote:
> > > > > > > I have a JIRA that I am working on, and a pull request available 
> > > > > > > for it.
> > > > > > > 
> > > > > > > [KAFKA-10473] Website is missing docs on JMX metrics for 
> > > > > > > partition 
> > > > > > > size-on-disk (kafka.log:type=Log,name=*)
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-10473
> > > > > > > https://github.com/apache/kafka/pull/9276
> > > > > > > 
> > > > > > > The "Contributing Code Changes" instructions say to
> > > > > > >   7. Change the status of the JIRA to "Patch Available" if it's 
> > > > > > > ready for review.
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
> > > > > > > 
> > > > > > > How do I do that? 
> > > > > > > * The title of my pull request starts with KAFKA-10473, so the 
> > > > > > > JIRA 
> > > > > > > does have a link to the pull request
> > > > > > > * I *was* able to assign it to myself and then say "Start 
> > > > > > > progress" and 
> > > > > > > now the status says "In Progress".
> > > > > > > * But I can't find how to set it to "Patch Available". In the 
> > > > > > > JIRA 
> > > > > > > website, I can't find a field or menu item that lets me change 
> > > > > > > the 
> > > > > > > status to "Patch Available" . 
> > > > > > > 
> > > > > > > Thanks,
> > > > > > > -James
> > > > > > > 
> > > > > > > 
> > > > > > > 



Build failed in Jenkins: Kafka » kafka-trunk-jdk8 #56

2020-09-11 Thread Apache Jenkins Server
See 


Changes:

[github] Adding reverse iterator usage for sliding windows processing 
(extending KIP-450) (#9239)


--
[...truncated 6.52 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 

Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #58

2020-09-11 Thread Apache Jenkins Server
See 


Changes:

[github] Adding reverse iterator usage for sliding windows processing 
(extending KIP-450) (#9239)


--
[...truncated 6.57 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 

Build failed in Jenkins: Kafka » kafka-trunk-jdk11 #58

2020-09-11 Thread Apache Jenkins Server
See 


Changes:

[github] Adding reverse iterator usage for sliding windows processing 
(extending KIP-450) (#9239)


--
[...truncated 3.29 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithCaching STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithCaching PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithPersistentStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithPersistentStore PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 

Re: How do I place a JIRA in status "Patch Available"? (For KAFKA-10473)

2020-09-11 Thread James Cheng
Thanks John! I can access and edit the wiki now.

And I improved the instructions for how to change the status of a JIRA to 
"Patch Available". 
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=59689925=36=37
 


Thanks for the help,
-James

> On Sep 11, 2020, at 12:53 PM, John Roesler  wrote:
> 
> Hi James,
> 
> Sorry, I overlooked your reply until now. I've granted you
> access.
> 
> Thanks,
> -John
> 
> On Wed, 2020-09-09 at 21:44 -0700, James Cheng wrote:
>> Thanks John. My wiki user ID is wushujames 
>> 
>> -James
>> 
>> Sent from my iPhone
>> 
>>> On Sep 9, 2020, at 7:03 PM, John Roesler  wrote:
>>> 
>>> Hi James,
>>> 
>>> Good, I’m glad my incredibly vague response was helpful!
>>> 
>>> If you let me know your wiki user id, I can grant you edit permission. It’s 
>>> a separate account from Jira. 
>>> 
>>> Thanks,
>>> John
>>> 
 On Wed, Sep 9, 2020, at 20:45, James Cheng wrote:
 Thanks John. That worked.
 
 I clicked the button that says "Submit Patch", and a dialog box popped 
 up. I didn't fill out anything additional in the dialog, and clicked 
 "Submit Patch" in the dialog.
 
 The JIRA is now in status "Patch Available"
 
 I would like to improve the docs at 
 https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
  
 
  to make this step clearer. It looks like I don't have permissions to edit 
 the page.
 
 Can someone grant me permissions to edit the page? 
 
 Or, if that is too difficult, can someone edit the page as follows?
 
 Change
 
   7. Change the status of the JIRA to "Patch Available" if it's ready for 
 review.
 to be
 
   7. Change the status of the JIRA to "Patch Available" if it's ready 
 for review. Do this by clicking the "Submit Patch" button in JIRA, and 
 then in the resulting dialog, click "Submit Patch".
 
 -James
 
>> On Sep 9, 2020, at 6:24 PM, John Roesler  wrote:
> 
> Hi James,
> 
> I think the button on Jira says “Add Patch” or something confusing like 
> that. 
> 
> Thanks,
> John
> 
> 
> On Wed, Sep 9, 2020, at 17:34, James Cheng wrote:
>> I have a JIRA that I am working on, and a pull request available for it.
>> 
>> [KAFKA-10473] Website is missing docs on JMX metrics for partition 
>> size-on-disk (kafka.log:type=Log,name=*)
>> https://issues.apache.org/jira/browse/KAFKA-10473
>> https://github.com/apache/kafka/pull/9276
>> 
>> The "Contributing Code Changes" instructions say to
>>   7. Change the status of the JIRA to "Patch Available" if it's ready 
>> for review.
>> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
>> 
>> How do I do that? 
>> * The title of my pull request starts with KAFKA-10473, so the JIRA 
>> does have a link to the pull request
>> * I *was* able to assign it to myself and then say "Start progress" and 
>> now the status says "In Progress".
>> * But I can't find how to set it to "Patch Available". In the JIRA 
>> website, I can't find a field or menu item that lets me change the 
>> status to "Patch Available" . 
>> 
>> Thanks,
>> -James
>> 
>> 
>> 
> 



Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-11 Thread Sophie Blee-Goldman
Hey Walker,

The proposal makes sense to me, but while reading up on those old
tickets I started wondering if we should give users two options: one
that would shut down the entire application, as described in the current
KIP, and another that would only shut down an individual instance.

I think there are a number of reasons that shutting down only the
erroring instance and not the entire application would be useful, for
example if the local state is corrupted. Currently a user can achieve
this, but the mechanism is pretty awkward and has caused issues in
the past: calling close inside the uncaught exception handler. The uncaught
exception handler is called from the stream thread, and close(timeout) is
supposed to wait for all stream threads to shut down. So you end up waiting
for the full timeout to pass, and even once it does, you know for a fact that
not all threads have completed shutdown. Not to mention, if you happen
to call the close() overload with no timeout parameter, your application
will be stuck blocking forever.
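
For concreteness, the awkward workaround available today looks roughly like
this (just a sketch; the topology, props, and timeout are placeholders):

  // Sketch of the current workaround: close the instance from the uncaught
  // exception handler. setUncaughtExceptionHandler() and close(Duration) are
  // existing KafkaStreams APIs; everything else here is illustrative.
  final KafkaStreams streams = new KafkaStreams(topology, props);
  streams.setUncaughtExceptionHandler((thread, throwable) ->
      // runs on a stream thread, so close(timeout) can never see all threads stopped
      streams.close(Duration.ofSeconds(30)));
  streams.start();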

Giving users an exception to throw that means "shutdown the instance"
would make this common practice significantly smoother. It shouldn't
expand the scope of this KIP too much since we'll have a mechanism to
shut down the instance already in place.  WDYT?

On that note, it might be a good idea to choose a more explicit name for
the exception in the current proposal, e.g. `ShutdownApplicationException`,
so it's obvious what exactly will shut down. Then we could have the second
exception be named `ShutdownInstanceException`, or something along those lines.

Sophie


On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax  wrote:

> Thanks for the KIP.
>
> It seems that the new exception would need to be thrown by user code?
> However, in the motivation you mention the scenario of a missing source
> topic that a user cannot detect, but KafkaStreams runtime would be
> responsible to handle.
>
> How do both things go together?
>
>
> -Matthias
>
> On 9/11/20 10:31 AM, Walker Carlson wrote:
> > Hello all,
> >
> > I have created KIP-671 to give the option to shutdown a streams
> > application in response to an error.
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> >
> > This is because of the Jira ticket
> > 
> >
> > Please give it a look and let me know if you have any feedback.
> >
> > Thanks,
> > Walker
> >
>
>


Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

2020-09-11 Thread Matthias J. Sax
Jorge,

thanks a lot for this KIP. Being able to modify headers is a very
valuable feature.

However, before we actually expose them in the DSL, I am wondering if we
should improve how headers can be modified in the PAPI? Currently, it is
possible but very clumsy to work with headers in the Processor API,
for two reasons:

 (1) There is no default `Headers` implementation in the public API
 (2) There is no explicit way to set headers for output records

Currently, the input record headers are copied into the output records
when `forward()` is called; however, it's not really a deep copy, as we
just copy the reference. This implies that one needs to work with a
single mutable object that flows through multiple processors, making it
very error-prone.

Furthermore, if you want to emit multiple output records and, for
example, want to add different headers to each output record (based on
the same input headers), you would need to do something like this:

  Headers h = context.headers();
  h.add(...);
  context.forward(...);
  // remove the header you added for the first output record
  h.remove(...);
  h.add(...);
  context.forward(...);


Maybe we could extend `To` to allow passing in a new `Headers` object
(or an `Iterable<Header>`, similar to `ProducerRecord`)? We could either
add it to your KIP or do a new KIP just for the PAPI.
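
Just to make the idea concrete, usage could look roughly like the sketch
below -- `To#withHeaders()` is purely hypothetical here, and I am using the
internal `RecordHeaders` only because there is no public default
implementation yet (point (1) above):

  // Hypothetical sketch only -- To#withHeaders(Headers) does not exist today.
  Headers headersA = new RecordHeaders().add("attempt", new byte[] {1});
  Headers headersB = new RecordHeaders().add("attempt", new byte[] {2});
  context.forward(key, value, To.all().withHeaders(headersA));
  context.forward(key, value, To.all().withHeaders(headersB));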

Thoughts?


-Matthias

On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> Hi everyone,
> 
> Bumping this thread to check if there's any feedback.
> 
> Cheers,
> Jorge.
> 
> On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
> 
>> Hi everyone,
>>
>> I would like to start the discussion for 
>> KIP-634:https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>>
>> Looking forward to your feedback.
>>
>> Thanks!
>> Jorge.
>>
>>
>>
>>
> 





[jira] [Resolved] (KAFKA-7970) Missing topic causes service shutdown without exception

2020-09-11 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-7970.

Fix Version/s: 2.7.0
 Assignee: Bruno Cadonna
   Resolution: Fixed

> Missing topic causes service shutdown without exception
> ---
>
> Key: KAFKA-7970
> URL: https://issues.apache.org/jira/browse/KAFKA-7970
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonny Heavey
>Assignee: Bruno Cadonna
>Priority: Minor
> Fix For: 2.7.0
>
>
> When launching a KafkaStreams application that depends on a topic that 
> doesn't exist, the streams application correctly logs an error such as:
> " is unknown yet during rebalance, please make sure they have 
> been pre-created before starting the Streams application."
> The stream is then shutdown, however, no exception is thrown indicating that 
> an error has occurred.
> In our circumstances, we run our streams app inside a container. The streams 
> service is shutdown, but the process is not exited, meaning that the 
> container does not crash (reducing visibility of the issue).
> As no exception is thrown in the missing topic scenario described above, our 
> application code has no way to determine that something is wrong that would 
> then allow it to terminate the process.
>  
> Could the onPartitionsAssigned method in StreamThread.java throw an exception 
> when it decides to shutdown the stream (somewhere around line 264)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: KIP-658 Deprecate all setters of Headers

2020-09-11 Thread Matthias J. Sax
Well, I am not totally sure if immutability is actually desired for
`Headers`?

Consider the case where an application just wants to add, for example, a
tracing header field. It would just call `add()` on the consumer record's
headers and pass the modified `Headers` object to the `ProducerRecord`
constructor, without the need to deep-copy (to be fair, there is no real
deep copy, as we would only copy some references).
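
For example, a simple relay that adds a tracing header might look roughly like
this (sketch only; `consumer`, `producer`, and the topic/header names are
placeholders):

  // Sketch: mutate the consumed record's Headers and hand the same object to
  // the outgoing ProducerRecord -- no copy is made anywhere.
  byte[] traceId = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
  for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(100))) {
      rec.headers().add("trace-id", traceId);
      producer.send(new ProducerRecord<>("output-topic", null, rec.key(), rec.value(), rec.headers()));
  }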

Or an application with multiple components, where one component creates a
`ProducerRecord`, some other component may or may not add additional
headers, and a third component does the actual send?

I am wondering if this KIP might be a case of premature optimization? Is
there an actual issue you have observed with mutable headers? -- It
might also be worth going back to the original headers KIP to see whether
there was a discussion about immutability and what the reasoning behind the
current design was.


-Matthias

On 8/18/20 7:42 PM, Chia-Ping Tsai wrote:
>> If you create a new `ProducerRecord` you would not pass in a `Headers`
>> object but an `Iterable<Header>`?
>>
>> And for `ConsumerRecord` you just get a `Headers` object back and don't
>> need to worry about the concrete implementation?
> 
> You are right. I'd give a bad case :(
> 
> For another reason, could we avoid using a mutable object as the return
> type? ConsumerRecord and ProducerRecord have a good chance of being immutable
> if we can remove the setters from Headers.
> 
> On 2020/08/18 20:42:59, "Matthias J. Sax"  wrote: 
>> Thanks for the KIP. However, I am not sure for what use-case you would
>> need to implement the `Headers` interface?
>>
>> If you create a new `ProducerRecord` you would not pass in a `Headers`
>> object but an `Iterable<Header>`?
>>
>> And for `ConsumerRecord` you just get a `Headers` object back and don't
>> need to worry about the concrete implementation?
>>
>>
>> -Matthias
>>
>>
>> On 8/18/20 3:39 AM, Chia-Ping Tsai wrote:
>>> Hi everyone,
>>>
>>> I would like to start a discussion on KIP-658: 
>>> https://cwiki.apache.org/confluence/x/-DJ4CQ
>>>
>>> KIP-658 plans to deprecate all setters of Headers and provide default 
>>> implementation. The benefit of this KIP are shown below.
>>>
>>> 1. Users don't need to implement those setters which are not used by kafka.
>>> 2. open a room to refactor Headers in next major (for example, a pojo 
>>> Headers)
>>>
>>> feedback is welcome!!!
>>>
>>> 
>>> chia-ping
>>>
>>
>>





Re: KIP-669: Preserve Source Partition in Kafka Streams from context

2020-09-11 Thread Matthias J. Sax
With regard to KIP-478, there is the idea to introduce a `RecordContext`
class.

Thus, we could just change the `StreamPartitioner` to take this new
class as a parameter to `partition()`? This might actually kill two birds
with one stone, because I could imagine use cases in which users want to
partition based on header information that is currently not exposed either.

For this case, we don't even need to provide any default implementation
of `StreamPartitioner`; users can just implement it themselves. The
use case itself makes sense, but it does not seem to be generic enough
that we need to provide an out-of-the-box implementation for it.
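
For reference, the existing interface looks roughly like this (simplified) --
note that it only sees the topic name, key, value, and partition count, but
not the record context, which is why a change along the lines above would be
needed:

  // org.apache.kafka.streams.processor.StreamPartitioner (current shape, simplified)
  public interface StreamPartitioner<K, V> {
      Integer partition(String topic, K key, V value, int numPartitions);
  }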


-Matthias

On 9/10/20 3:59 PM, Sophie Blee-Goldman wrote:
> Hey Balan, thanks for the KIP!
> 
> The motivation here makes sense to me, but I have a few questions about the
> proposed API
> 
> I guess the main thing to point out is that if we just add new addSink()
> overloads to Topology,
> then only the lower level Processor API will benefit and users of the DSL
> won't be able to utilize
> this. This seems like a useful feature that we should make available to
> anyone.
> 
> We could follow a similar approach and add new toStream overloads to the
> KStream class, but
> that would expand the surface area of the API pretty significantly. The
> additional addSink()
> overloads alone would do this. The addSink() methods already have a pretty
> large number
> of optional parameters which means more and more overloads every time a new
> one is added.
> We should avoid making this problem worse wherever possible.
> 
> Existing StreamPartitioner  in SinkNode will be made null when context
>> partition is enabled
> 
> 
> This line from your KIP gave me some idea that it might be avoidable in
> this case. The implication
> of this quote is that the StreamPartitioner and useContextPartition
> parameter are inherently
> incompatible since they are two ways of specifying the same thing, the
> target partition. Well, if
> that's the case, then we should be able to just leverage the existing
> StreamPartitioner in some
> way to specify that we want records to end up in the source partition,
> without introducing a new
> parameter.
> 
> One option would be to just let users pass in a null StreamPartitioner to
> mean that it should
> use the source partition, but that seems a bit subtle. Maybe a better API
> would be to offer
> a new out-of-the-box StreamPartitioner called SourceContextPartitioner (or
> something),
> and then users just have to pass in an instance of this. WDYT?
> 
> On Thu, Sep 10, 2020 at 8:00 AM Balan k  wrote:
> 
>>
>> Forgot to add the link
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-669%3A+Preserve+Source+Partition+in+Kafka+Streams+from+context
>>
>>
>>
>> On 2020/09/10 13:40:02, satyanarayan komandur 
>> wrote:
>>> Hi,
>>>
>>> I have submitted a new KIP for preserving processor record context
>> partition from source. I am looking for suggestions/comments.
>>>
>>> In most use cases where source message is getting transformed and sent
>> to a target topic, where
>>> 1. number of partitions on source and sink topic are same
>>> 2. there is no change to the key
>>> 3. more importantly if we would like to preserve the partition as is
>> without re-deriving using partition from context would help.
>>>
>>> I am aware of one caveat where record processor context partition is not
>> known in stream punctuation.
>>>
>>> Please look over the KIP and chime in more ideas
>>>
>>> Thanks
>>> Balan
>>>
>>>
>>>
>>
> 





[jira] [Created] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record

2020-09-11 Thread Shaik Zakir Hussain (Jira)
Shaik Zakir Hussain created KAFKA-10477:
---

 Summary: Sink Connector fails with DataException when trying to 
convert Kafka record with empty key to Connect Record
 Key: KAFKA-10477
 URL: https://issues.apache.org/jira/browse/KAFKA-10477
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Shaik Zakir Hussain


The sink connector is facing a DataException when trying to convert a Kafka 
record with an empty key to the Connect data format. 

Kafka's trunk branch currently depends on *jackson v2.10.5* 

A short unit test (shared below) in the 
`org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue.
{code:java}
@Test
public void testToConnectDataEmptyKey() throws IOException {
    Map<String, Object> props = Collections.singletonMap("schemas.enable", false);
    converter.configure(props, true);
    String str = "";
    SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", str.getBytes());
    System.out.println(schemaAndValue);
}
{code}
This test code snippet fails with the following exception:
{noformat}
org.apache.kafka.connect.errors.DataException: Unknown schema type: null

at 
org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
at 
org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
at 
org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
{noformat}
 

This seems related to the issue 
[https://github.com/FasterXML/jackson-databind/issues/2211], where the jackson 
library started returning `MissingNode` for empty input in the 
`ObjectMapper.readTree(input)` method invocation. The precise code change can be 
observed here: 
[https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094]
 

 

This causes an exception to be thrown in our JsonConverter class: 
[https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764]
 

 

In my opinion, when `jsonValue.getNodeType()` is `MISSING` 
([https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754]), 
we need to fall back to the behaviour of the `NULL` case (i.e. return null), 
although I'm not sure of any further repercussions this might bring.
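
A minimal sketch of that suggestion (illustrative only, not the actual 
{{convertToConnect}} source):
{code:java}
switch (jsonValue.getNodeType()) {
    case MISSING:   // suggested: fall through, so empty input (MissingNode) behaves like an explicit null
    case NULL:
        return null;
    // ... all other node types handled exactly as today ...
}
{code}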

 

Things were working fine when the dependency on the *jackson* lib was on version 
*v2.9.10.3* or earlier, as `ObjectMapper` returned null in that case.

 

Thanks,

Zakir



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Re: How do I place a JIRA in status "Patch Available"? (For KAFKA-10473)

2020-09-11 Thread John Roesler
Hi James,

Sorry, I overlooked your reply until now. I've granted you
access.

Thanks,
-John

On Wed, 2020-09-09 at 21:44 -0700, James Cheng wrote:
> Thanks John. My wiki user ID is wushujames 
> 
> -James
> 
> Sent from my iPhone
> 
> > On Sep 9, 2020, at 7:03 PM, John Roesler  wrote:
> > 
> > Hi James,
> > 
> > Good, I’m glad my incredibly vague response was helpful!
> > 
> > If you let me know your wiki user id, I can grant you edit permission. It’s 
> > a separate account from Jira. 
> > 
> > Thanks,
> > John
> > 
> > > On Wed, Sep 9, 2020, at 20:45, James Cheng wrote:
> > > Thanks John. That worked.
> > > 
> > > I clicked the button that says "Submit Patch", and a dialog box popped 
> > > up. I didn't fill out anything additional in the dialog, and clicked 
> > > "Submit Patch" in the dialog.
> > > 
> > > The JIRA is now in status "Patch Available"
> > > 
> > > I would like to improve the docs at 
> > > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
> > >  
> > > 
> > >  to make this step clearer. It looks like I don't have permissions to 
> > > edit the page.
> > > 
> > > Can someone grant me permissions to edit the page? 
> > > 
> > > Or, if that is too difficult, can someone edit the page as follows?
> > > 
> > > Change
> > > 
> > >7. Change the status of the JIRA to "Patch Available" if it's ready 
> > > for review.
> > > to be
> > > 
> > >7. Change the status of the JIRA to "Patch Available" if it's ready 
> > > for review. Do this by clicking the "Submit Patch" button in JIRA, and 
> > > then in the resulting dialog, click "Submit Patch".
> > > 
> > > -James
> > > 
> > > > > On Sep 9, 2020, at 6:24 PM, John Roesler  wrote:
> > > > 
> > > > Hi James,
> > > > 
> > > > I think the button on Jira says “Add Patch” or something confusing like 
> > > > that. 
> > > > 
> > > > Thanks,
> > > > John
> > > > 
> > > > 
> > > > On Wed, Sep 9, 2020, at 17:34, James Cheng wrote:
> > > > > I have a JIRA that I am working on, and a pull request available for 
> > > > > it.
> > > > > 
> > > > > [KAFKA-10473] Website is missing docs on JMX metrics for partition 
> > > > > size-on-disk (kafka.log:type=Log,name=*)
> > > > > https://issues.apache.org/jira/browse/KAFKA-10473
> > > > > https://github.com/apache/kafka/pull/9276
> > > > > 
> > > > > The "Contributing Code Changes" instructions say to
> > > > >7. Change the status of the JIRA to "Patch Available" if it's 
> > > > > ready for review.
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
> > > > > 
> > > > > How do I do that? 
> > > > > * The title of my pull request starts with KAFKA-10473, so the JIRA 
> > > > > does have a link to the pull request
> > > > > * I *was* able to assign it to myself and then say "Start progress" 
> > > > > and 
> > > > > now the status says "In Progress".
> > > > > * But I can't find how to set it to "Patch Available". In the JIRA 
> > > > > website, I can't find a field or menu item that lets me change the 
> > > > > status to "Patch Available" . 
> > > > > 
> > > > > Thanks,
> > > > > -James
> > > > > 
> > > > > 
> > > > > 



Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-11 Thread Matthias J. Sax
I think separating the different contexts make sense.

In fact, we could even go one step further and remove the record context
from the processor context completely, adding a third parameter instead,
i.e., `process(key, value, recordContext)`. This would make it clear that the
context is for the input record only, and that it's not possible to pass it to
a `punctuate` callback.

For the stores and changelogging: I think there are two cases. (1) You
use a plain key-value store. For this case, it seems you do not care
about the timestamp and thus do not care what timestamp is set in the
changelog records. (We can set anything we want, as it's not relevant at
all -- the timestamp is ignored on read anyway.) (2) The other case is
that one does care about timestamps, and for this case one should use
TimestampedKeyValueStore. The passed timestamp will be set on the
changelog records in this case.

Thus, for both cases, accessing the record context does not seem to be
a requirement. And providing access to the processor context to, e.g.,
`forward()` or similar seems safe.


-Matthias

On 9/10/20 7:25 PM, John Roesler wrote:
> Thanks for the reply, Paul!
> 
> I certainly intend to make sure that the changelogging layer
> continues to work the way it does now, by hook or by crook.
> I think the easiest path for me is to just "cheat" and get
> the real ProcessorContext into the ChangeLoggingStore
> implementation somehow. I'll tag you on the PR when I create
> it, so you have an opportunity to express a preference about
> the implementation choice, and maybe even compile/test
> against it to make sure your stuff still works.
> 
> Regarding this:
> 
>> we have an interest in making a state store with a richer
>> way of querying its data (like perhaps getting all values
>> associated with a secondary key), while still ultimately
>> writing to the changelog topic for later restoration.
> 
> This is very intriguing to me. On the side, I've been
> preparing a couple of ideas related to this topic. I don't
> think I have a coherent enough thought to even express it in
> a Jira right now, but when I do, I'll tag you on it also to
> see what you think.
> 
> Whenever you're ready to share the usability improvement
> ideas, I'm very interested to see what you've come up with.
> 
> Thanks,
> -John
> 
> On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
>>> when you use a HashMap or RocksDB or other "state stores", you don't
>>> expect them to automatically know extra stuff about the record you're
>>> storing.
>>
>> So, I don't think there is any reason we *can't* retain the record context
>>> in the StateStoreContext, and if any users came along with a clear use case
>>> I'd find that convincing.
>>>
>>
>> I agree with the principle of being conservative with the StateStoreContext
>> API.  Regarding user expectations or a clear use case, the only
>> counterpoint I would offer is that we sort of have that use case already,
>> which is the example I gave of the change logging store using the
>> timestamp.  I am curious if this functionality will be retained when using
>> built in state stores, or will a low-level processor get a KeyValueStore
>> that no longer writes to the changelog topic with the record's timestamp.
>> While I personally don't care much about that functionality specifically, I
>> have a general desire for custom state stores to easily do the things that
>> built in state stores do.
>>
>> It genuinely did not occur to me that users might be looking up and/or
>>> updating records of other keys from within a Processor.
>>>
>>
>> I'm glad you said this Sophie, because it gives me an opportunity to say
>> that this is actually a *huge* use case for my team.  The state store
>> usability improvements I was referring to in my previous message were about
>> enabling the user to write custom stores while still easily hooking into
>> the ability to write to a changelog topic.  I think that is technically
>> possible now, but I don't think it's trivial.  Specifically, we have an
>> interest in making a state store with a richer way of querying its data
>> (like perhaps getting all values associated with a secondary key), while
>> still ultimately writing to the changelog topic for later restoration.
>>
>> We recognize that this use case throws away some of what kafka streams
>> (especially the DSL) is good at - easy parallelizability by partitioning
>> all processing by key - and that our business logic would completely fall
>> apart if we were consuming from multi-partition topics with multiple
>> consumers.  But we have found that using the low level processor API is
>> good for the very simple stream processing primitives it provides: handling
>> the plumbing of consuming from multiple kafka topics and potentially
>> updating persistent local state in a reliable way.  That in itself has
>> proven to be a worthwhile programming model.
>>
>> Since I got off track a bit, let me summarize: I don't particularly care
>> about the 

Re: [DISCUSS] KIP-508: Make Suppression State Queriable - rebooted.

2020-09-11 Thread Matthias J. Sax
Thanks for updating the KIP.

I think there is still one open question. `suppress()` can be used on a
non-windowed KTable for rate control, as well as on a windowed KTable
(also for rate control, but mainly) for emitting only the final
result of a windowed aggregation. For the non-windowed case, we use a
KeyValueStore, while for the windowed case, we use a WindowStore.

Because there is no final result for non-windowed KTables, it seems that
this new feature only makes sense for the windowed-aggregation case?
Thus, the signature of `Materialized` should take a `WindowStore`
instead of a `KeyValueStore`?

If that's correct, I am wondering:

 - Can we guard against misuse of the API if the upstream KTable is
not windowed (or maybe it's not necessary to guard)?
 - Can we actually implement it? We had issues with regard to KIP-300 when
materializing windowed KTables.

It would be worth clarifying this upfront. Maybe we even need a POC
implementation to verify that it works?
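
For what it's worth, here is a rough sketch of how the proposed overload might
be used -- the two-argument `suppress()` is the hypothetical API under
discussion, not something that exists today, and `input` is an assumed
`KStream<String, String>`:

  // Hypothetical usage sketch: suppress(Suppressed, Materialized) is the proposed overload.
  KTable<Windowed<String>, Long> finalCounts = input
      .groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
      .count()
      .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()),
                Materialized.as("final-counts-store"));   // hypothetical second argument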


-Matthias


On 9/11/20 12:26 AM, Dongjin Lee wrote:
> Hi All,
>
> Here is the voting thread:
>
https://lists.apache.org/thread.html/r5653bf2dafbb27b247bf20dbe6f070c151b3823d96c9c9ca94183e20%40%3Cdev.kafka.apache.org%3E
>
> Thanks,
> Dongjin
>
> On Fri, Sep 11, 2020 at 4:23 PM Dongjin Lee  wrote:
>
>> Hi John,
>>
>> Thanks for the feedback. I will open the Vote thread now.
>>
>> Best,
>> Dongjin
>>
>> On Fri, Sep 11, 2020 at 2:00 AM John Roesler  wrote:
>>
>>> Hi Dongjin,
>>>
>>> Sorry for the delay. I'm glad you're still pushing this
>>> forward. It would be nice to get this in to the 2.7 release.
>>>
>>> I just took another look at the KIP, and it looks good to
>>> me!
>>>
>>> I think this is ready for a vote.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Wed, 2020-08-05 at 22:04 +0900, Dongjin Lee wrote:
 Hi All,

 I updated the KIP
 <
>>>
https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable

 and the implementation, following the discussion here.

 You must be working hard preparing the release of 2.6.0, so please have
>>> a
 look after your work is done.

 Thanks,
 Dongjin

 On Sun, Mar 8, 2020 at 12:20 PM John Roesler 
>>> wrote:

> Thanks Matthias,
>
> Good idea. I've changed the ticket name and added a note
> clarifying that this ticket is not the same as
> https://issues.apache.org/jira/browse/KAFKA-7224
>
> Incidentally, I learned that I never documented my reasons
> for abandoning my work on KAFKA-7224 ! I've now updated
> that ticket, too, so your question had an unexpected side-benefit.
>
> Thanks,
> -John
>
> On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> Thanks for clarification.
> 
> Can you maybe update the Jira ticket? Do we have a ticket for
> spill-to-disk? Maybe link to it and explain that it's two different
> things? Maybe even rename the ticket to something more clear, ie,
> "make suppress result queryable" or simliar?
> 
> 
> -Matthias
> 
> On 3/7/20 1:58 PM, John Roesler wrote:
 Hey Matthias,

 I’m sorry if the ticket was poorly stated. The ticket is to add a
> DSL overload to pass a Materialized argument to suppress. As a
 result,
> the result of the suppression would be queriable.
 This is unrelated to “persistent buffer” aka “spill-to-disk”.

 There was some confusion before about whether this ticket could be
> implemented as “query the buffer”. Maybe it can, but not trivially.
> The obvious way is just to add a new state store which we write the
> results into just before we forward. I.e., it’s exactly like the
> materialized variant of any stateless KTable operation.
 Thanks, John

 On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote: Thanks for
 the KIP Dongjin,

 I am still not sure if I can follow, what might also be caused by
 the backing JIRA ticket (maybe John can clarify the intent of the
 ticket as he created it):

 Currently, suppress() only uses an in-memory buffer and my
 understanding of the Jira is, to add the ability to use a
 persistent buffer (ie, spill to disk backed by RocksDB).

 Adding a persistent buffer is completely unrelated to allow
 querying the buffer. In fact, one could query an in-memory buffer,
 too. However, querying the buffer does not really seem to be
 useful
 as pointed out by John, as you can always query the upstream
 KTable
 store.

 Also note that for the emit-on-window-close case the result is
 deleted from the buffer when it is emitted, and thus cannot be
 queried any longer.


 Can you please clarify if you intend to allow spilling to disk or
 if you intent to enable IQ (even if I don't see why querying make
 sense, as the data is either 

Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-11 Thread Matthias J. Sax
Thanks for the KIP.

It seems that the new exception would need to be thrown by user code?
However, in the motivation you mention the scenario of a missing source
topic that a user cannot detect, but KafkaStreams runtime would be
responsible to handle.

How do both things go together?


-Matthias

On 9/11/20 10:31 AM, Walker Carlson wrote:
> Hello all,
> 
> I have created KIP-671 to give the option to shutdown a streams
> application in response to an error.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> 
> This is because of the Jira ticket
> 
> 
> Please give it a look and let me know if you have any feedback.
> 
> Thanks,
> Walker
> 





Re: Contributor Access to JIRA

2020-09-11 Thread Matthias J. Sax
There was a reply on 9/3.

> Please create an account (self-service) and share you account info here,
> so we can add you.

Did you miss it? (There were multiple emails with the same subject, and
GMail might have collapsed both threads into one.)


-Matthias

On 9/11/20 10:35 AM, Kp k wrote:
> Hi,
> 
> I haven't got any response from you all.
> Please give contributor access to Kafka JIRA.
> 
> On Thu, Sep 3, 2020 at 7:19 PM Kp k  wrote:
> 
>> Hi,
>>
>> Can you please provide me Contributor access to Kafka JIRA, as I am
>> interested in contributing.
>>
>> Thanks,
>> Kalpitha
>>
> 





Re: Contributor Access to JIRA

2020-09-11 Thread Kp k
Hi,

I haven't got any response from you all.
Please give contributor access to Kafka JIRA.

On Thu, Sep 3, 2020 at 7:19 PM Kp k  wrote:

> Hi,
>
> Can you please provide me Contributor access to Kafka JIRA, as I am
> interested in contributing.
>
> Thanks,
> Kalpitha
>


[DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-11 Thread Walker Carlson
Hello all,

I have created KIP-671 to give the option to shutdown a streams
application in response to an error.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown

This is because of the Jira ticket


Please give it a look and let me know if you have any feedback.

Thanks,
Walker


Re: [DISCUSS] KIP-516: Topic Identifiers

2020-09-11 Thread John Roesler
Thanks, Justine!

Your response seems compelling to me.

-John

On Fri, 2020-09-11 at 10:17 -0700, Justine Olshan wrote:
> Hello all,
> Thanks for continuing the discussion! I have a few responses to your points.
> 
> Tom: You are correct in that this KIP has not mentioned the
> DeleteTopicsRequest. I think that this would be out of scope for now, but
> may be something worth adding in the future.
> 
> John: We did consider sequence ids, but there are a few reasons to favor
> UUIDs. There are several cases where topics from different clusters may
> interact now and in the future. For example, Mirror Maker 2 may benefit
> from being able to detect when a cluster being mirrored is deleted and
> recreated and globally unique identifiers would make resolving issues
> easier than sequence IDs which may collide between clusters. KIP-405
> (tiered storage) will also benefit from globally unique IDs as shared
> buckets may be used between clusters.
> 
> Globally unique IDs would also make functionality like moving topics
> between disparate clusters easier in the future, simplify any future
> implementations of backups and restores, and more. In general, unique IDs
> would ensure that the source cluster topics do not conflict with the
> destination cluster topics.
> 
> If we were to use sequence ids, we would need sufficiently large cluster
> ids to be stored with the topic identifiers or we run the risk of
> collisions. This will give up any advantage in compactness that sequence
> numbers may bring. Given these advantages I think it makes sense to use
> UUIDs.
> 
> Gokul: This is an interesting idea, but this is a breaking change. Out of
> scope for now, but maybe worth discussing in the future.
> 
> Hope this explains some of the decisions,
> 
> Justine
> 
> 
> 
> On Fri, Sep 11, 2020 at 8:27 AM Gokul Ramanan Subramanian <
> gokul24...@gmail.com> wrote:
> 
> > Hi.
> > 
> > Thanks for the KIP.
> > 
> > Have you thought about whether it makes sense to support authorizing a
> > principal for a topic ID rather than a topic name to achieve tighter
> > security?
> > 
> > Or is the topic ID fundamentally an internal detail similar to epochs used
> > in a bunch of other places in Kafka?
> > 
> > Thanks.
> > 
> > On Fri, Sep 11, 2020 at 4:06 PM John Roesler  wrote:
> > 
> > > Hello Justine,
> > > 
> > > Thanks for the KIP!
> > > 
> > > I happen to have been confronted recently with the need to keep track of
> > a
> > > large number of topics as compactly as possible. I was going to come up
> > > with some way to dictionary encode the topic names as integers, but this
> > > seems much better!
> > > 
> > > Apologies if this has been raised before, but I’m wondering about the
> > > choice of UUID vs sequence number for the ids. Typically, I’ve seen UUIDs
> > > in two situations:
> > > 1. When processes need to generate non-colliding identifiers without
> > > coordination.
> > > 2. When the identifier needs to be “universally unique”; I.e., the
> > > identifier needs to distinguish the entity from all other entities that
> > > could ever exist. This is useful in cases where entities from all kinds
> > of
> > > systems get mixed together, such as when dumping logs from all processes
> > in
> > > a company into a common system.
> > > 
> > > Maybe I’m being short-sighted, but it doesn’t seem like either really
> > > applies here. It seems like the brokers could and would achieve consensus
> > > when creating a topic anyway, which is all that’s required to generate
> > > non-colliding sequence ids. For the second, as you mention, we could
> > assign
> > > a UUID to the cluster as a whole, which would render any resource scoped
> > to
> > > the broker universally unique as well.
> > > 
> > > The reason I mention this is that, although a UUID is way more compact
> > > than topic names, it’s still 16 bytes. In contrast, a 4-byte integer
> > > sequence id would give us 4 billion unique topics per cluster, which
> > seems
> > > like enough ;)
> > > 
> > > Considering the number of different times these topic identifiers are
> > sent
> > > over the wire or stored in memory, it seems like it might be worth the
> > > additional 4x space savings.
> > > 
> > > What do you think about this?
> > > 
> > > Thanks,
> > > John
> > > 
> > > On Fri, Sep 11, 2020, at 03:20, Tom Bentley wrote:
> > > > Hi Justine,
> > > > 
> > > > This looks like a very welcome improvement. Thanks!
> > > > 
> > > > Maybe I missed it, but the KIP doesn't seem to mention changing
> > > > DeleteTopicsRequest to identify the topic using an id. Maybe that's out
> > > of
> > > > scope, but DeleteTopicsRequest is not listed among the Future Work APIs
> > > > either.
> > > > 
> > > > Kind regards,
> > > > 
> > > > Tom
> > > > 
> > > > On Thu, Sep 10, 2020 at 3:59 PM Satish Duggana <
> > satish.dugg...@gmail.com
> > > > wrote:
> > > > 
> > > > > Thanks Lucas/Justine for the nice KIP.
> > > > > 
> > > > > It has several benefits which also include simplifying the topic
> > 

Re: [DISCUSS] KIP-516: Topic Identifiers

2020-09-11 Thread Justine Olshan
Hello all,
Thanks for continuing the discussion! I have a few responses to your points.

Tom: You are correct in that this KIP has not mentioned the
DeleteTopicsRequest. I think that this would be out of scope for now, but
may be something worth adding in the future.

John: We did consider sequence ids, but there are a few reasons to favor
UUIDs. There are several cases where topics from different clusters may
interact now and in the future. For example, Mirror Maker 2 may benefit
from being able to detect when a cluster being mirrored is deleted and
recreated and globally unique identifiers would make resolving issues
easier than sequence IDs which may collide between clusters. KIP-405
(tiered storage) will also benefit from globally unique IDs as shared
buckets may be used between clusters.

Globally unique IDs would also make functionality like moving topics
between disparate clusters easier in the future, simplify any future
implementations of backups and restores, and more. In general, unique IDs
would ensure that the source cluster topics do not conflict with the
destination cluster topics.

If we were to use sequence ids, we would need sufficiently large cluster
ids to be stored with the topic identifiers or we run the risk of
collisions. This will give up any advantage in compactness that sequence
numbers may bring. Given these advantages I think it makes sense to use
UUIDs.

Gokul: This is an interesting idea, but this is a breaking change. Out of
scope for now, but maybe worth discussing in the future.

Hope this explains some of the decisions,

Justine



On Fri, Sep 11, 2020 at 8:27 AM Gokul Ramanan Subramanian <
gokul24...@gmail.com> wrote:

> Hi.
>
> Thanks for the KIP.
>
> Have you thought about whether it makes sense to support authorizing a
> principal for a topic ID rather than a topic name to achieve tighter
> security?
>
> Or is the topic ID fundamentally an internal detail similar to epochs used
> in a bunch of other places in Kafka?
>
> Thanks.
>
> On Fri, Sep 11, 2020 at 4:06 PM John Roesler  wrote:
>
> > Hello Justine,
> >
> > Thanks for the KIP!
> >
> > I happen to have been confronted recently with the need to keep track of
> a
> > large number of topics as compactly as possible. I was going to come up
> > with some way to dictionary encode the topic names as integers, but this
> > seems much better!
> >
> > Apologies if this has been raised before, but I’m wondering about the
> > choice of UUID vs sequence number for the ids. Typically, I’ve seen UUIDs
> > in two situations:
> > 1. When processes need to generate non-colliding identifiers without
> > coordination.
> > 2. When the identifier needs to be “universally unique”; I.e., the
> > identifier needs to distinguish the entity from all other entities that
> > could ever exist. This is useful in cases where entities from all kinds
> of
> > systems get mixed together, such as when dumping logs from all processes
> in
> > a company into a common system.
> >
> > Maybe I’m being short-sighted, but it doesn’t seem like either really
> > applies here. It seems like the brokers could and would achieve consensus
> > when creating a topic anyway, which is all that’s required to generate
> > non-colliding sequence ids. For the second, as you mention, we could
> assign
> > a UUID to the cluster as a whole, which would render any resource scoped
> to
> > the broker universally unique as well.
> >
> > The reason I mention this is that, although a UUID is way more compact
> > than topic names, it’s still 16 bytes. In contrast, a 4-byte integer
> > sequence id would give us 4 billion unique topics per cluster, which
> seems
> > like enough ;)
> >
> > Considering the number of different times these topic identifiers are
> sent
> > over the wire or stored in memory, it seems like it might be worth the
> > additional 4x space savings.
> >
> > What do you think about this?
> >
> > Thanks,
> > John
> >
> > On Fri, Sep 11, 2020, at 03:20, Tom Bentley wrote:
> > > Hi Justine,
> > >
> > > This looks like a very welcome improvement. Thanks!
> > >
> > > Maybe I missed it, but the KIP doesn't seem to mention changing
> > > DeleteTopicsRequest to identify the topic using an id. Maybe that's out
> > of
> > > scope, but DeleteTopicsRequest is not listed among the Future Work APIs
> > > either.
> > >
> > > Kind regards,
> > >
> > > Tom
> > >
> > > On Thu, Sep 10, 2020 at 3:59 PM Satish Duggana <
> satish.dugg...@gmail.com
> > >
> > > wrote:
> > >
> > > > Thanks Lucas/Justine for the nice KIP.
> > > >
> > > > It has several benefits which also include simplifying the topic
> > > > deletion process by controller and logs cleanup by brokers in corner
> > > > cases.
> > > >
> > > > Best,
> > > > Satish.
> > > >
> > > > On Wed, Sep 9, 2020 at 10:07 PM Justine Olshan  >
> > > > wrote:
> > > > >
> > > > > Hello all, it's been almost a year! I've made some changes to this
> > KIP
> > > > and hope to continue the discussion.
> > > > >
> > > > > One of the main 

Re: [DISCUSS] KIP-660: Pluggable ReplicaAssignor

2020-09-11 Thread Robert Barrett
Thanks Mickael, I think adding the new Exception resolves my concerns.

On Thu, Sep 3, 2020 at 9:47 AM Mickael Maison 
wrote:

> Thanks Robert and Ryanne for the feedback.
>
> ReplicaAssignor implementations can throw an exception to indicate an
> assignment can't be computed. This is already what the current round-robin
> assignor does. Unfortunately, at the moment there are no generic
> error codes if it fails; it's either INVALID_PARTITIONS,
> INVALID_REPLICATION_FACTOR or, worse, UNKNOWN_SERVER_ERROR.
>
> So I think it would be nice to introduce a new Exception/Error code to
> cover any failures in the assignor and avoid UNKNOWN_SERVER_ERROR.
>
> I've updated the KIP accordingly, let me know if you have more questions.
>
> On Fri, Aug 28, 2020 at 4:49 PM Ryanne Dolan 
> wrote:
> >
> > Thanks Mickael, the KIP makes sense to me, esp for cases where an
> external
> > system (like cruise control or an operator) knows more about the target
> > cluster state than the broker does.
> >
> > Ryanne
> >
> > On Thu, Aug 20, 2020, 10:46 AM Mickael Maison 
> > wrote:
> >
> > > Hi,
> > >
> > > I've created KIP-660 to make the replica assignment logic pluggable.
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaAssignor
> > >
> > > Please take a look and let me know if you have any feedback.
> > >
> > > Thanks
> > >
>
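
For readers following along, a rough sketch of what a pluggable assignor could look
like is below. The interface name, method signature and exception are assumptions made
for illustration only; the actual API is defined in the KIP page linked above and may
well differ.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ReplicaAssignorSketch {

        // Hypothetical shape of the pluggable interface; the real one is defined by KIP-660.
        interface ReplicaAssignor {
            List<Integer> assignReplicasToBrokers(int partition, int replicationFactor, List<Integer> liveBrokers);
        }

        // Stand-in for the new exception/error code discussed in the thread.
        static class ReplicaAssignorException extends RuntimeException {
            ReplicaAssignorException(String message) { super(message); }
        }

        // Simple round-robin strategy, offset by partition so leaders spread across brokers.
        static class RoundRobinAssignor implements ReplicaAssignor {
            @Override
            public List<Integer> assignReplicasToBrokers(int partition, int replicationFactor, List<Integer> liveBrokers) {
                if (replicationFactor > liveBrokers.size()) {
                    // Surfaced to the client via the new error code discussed above.
                    throw new ReplicaAssignorException("Replication factor " + replicationFactor
                            + " exceeds live broker count " + liveBrokers.size());
                }
                List<Integer> replicas = new ArrayList<>(replicationFactor);
                for (int i = 0; i < replicationFactor; i++) {
                    replicas.add(liveBrokers.get((partition + i) % liveBrokers.size()));
                }
                return replicas;
            }
        }

        public static void main(String[] args) {
            ReplicaAssignor assignor = new RoundRobinAssignor();
            System.out.println(assignor.assignReplicasToBrokers(0, 3, Arrays.asList(1, 2, 3, 4, 5))); // [1, 2, 3]
        }
    }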


Re: [VOTE] KIP-664: Provide tooling to detect and abort hanging transactions

2020-09-11 Thread Robert Barrett
+1 (non-binding)

Thanks Jason!

On Tue, Sep 8, 2020 at 5:28 PM Guozhang Wang  wrote:

> +1. Thanks!
>
> Guozhang
>
> On Tue, Sep 8, 2020 at 3:04 PM Ron Dagostino  wrote:
>
> > +1 (non-binding) -- Thanks, Jason!
> >
> > Ron
> >
> > On Tue, Sep 8, 2020 at 2:04 PM Jason Gustafson 
> wrote:
> >
> > > Hi All,
> > >
> > > I'd like to start a vote on KIP-664:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions
> > > .
> > > Thanks for all the feedback!
> > >
> > > Best,
> > > Jason
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-11 Thread Colin McCabe
Hi Unmesh,

I think you're right that we should use a duration here rather than a time.  As 
you said, the clock on the controller will probably not match the one on the 
broker.  I have updated the KIP.

> > It's important to keep in mind that messages may be delayed in the
> > network, or arrive out of order.  When this happens, we will use the start
> > time specified in the request to determine if the request is stale.
> I am assuming that there will be a single TCP connection maintained between
> broker and active controller. So, there won't be any out of order requests?
> There will be a scenario of broker GC pause, which might cause connection
> timeout and broker might need to reestablish the connection. If the pause
> is too long, lease will expire and the heartbeat sent after the pause will
> be treated as a new registration (similar to restart case), and a new epoch
> number will be assigned to the broker.

I agree with the end of this paragraph, but not with the start :)

There can be out-of-order requests, since the broker will simply use a new TCP 
connection if the old one has problems.  This can happen for a variety of 
reasons.  I don't think GC pauses are the most common reason for this to 
happen.  It's more common to see issues issues in the network itself that 
result connections getting dropped from time to time.

So we have to assume that messages may arrive out of order, and possibly be 
delayed.  I added a note that heartbeat requests should be ignored if the 
metadata log offset they contain is smaller than that of a previous heartbeat.

> When the active controller fails, the new active controller needs to be
> sure that it considers all the known brokers as alive till the lease
> expiration interval.  Because registration.lease.timeout.ms is configured
> on the controller, the new active controller will extend all the leases by
> registration.lease.timeout.ms. I see that it won't need the last heartbeat
> time.

Agreed.

best,
Colin

> 
> Thanks,
> Unmesh
> 
> On Sat, Sep 5, 2020 at 1:28 AM Colin McCabe  wrote:
> 
> > > Colin wrote:
> > > > The reason for including LeaseStartTimeMs in the request is to ensure
> > > > that the time required to communicate with the controller gets
> > included in
> > > > the lease time.  Since requests can potentially be delayed in the
> > network
> > > > for a long time, this is important.
> >
> > On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote:
> > > The network time will be added anyway, because the lease timer on the
> > > active controller will start only after the heartbeat request reaches the
> > > server.
> >
> > Hi Unmesh,
> >
> > If the start time is not specified in the request, then the network time
> > is excluded from the heartbeat time.
> >
> > Here's an example:
> > Let's say broker A sends a heartbeat at time 100, and it arrives on the
> > controller at time 200, and the lease duration is 1000.
> >
> > The controller looks at the start time in the request, which is 100, and
> > adds 1000 to it, getting 1100.  On the other hand, if start time is not
> > specified in the request, then the expiration will be at time 1200.
> > That is what I mean by "the network time is included in the expiration
> > time."
> >
> > > And I think, some assumption about network round trip time is
> > > needed anyway to decide on the frequency of the heartbeat (
> > > registration.heartbeat.interval.ms), and lease timeout (
> > > registration.lease.timeout.ms). So I think just having a leaseTTL in the
> > > request is easier to understand and implement.
> >
> > It's important to keep in mind that messages may be delayed in the
> > network, or arrive out of order.  When this happens, we will use the start
> > time specified in the request to determine if the request is stale.
> >
> > > > Yes, I agree that the lease timeout on the controller side should be
> > > > reset in the case of controller failover.  The alternative would be to
> > > > track the lease as hard state rather than soft state, but I think that
> > > > is not really needed, and would result in more log entries.
> > > My interpretation of the mention of BrokerRecord in the KIP was that this
> > > record exists in the Raft log.
> >
> > BrokerRecord does exist in the Raft log, but does not include the last
> > heartbeat time.
> >
> > > By soft state, do you mean the broker
> > > records exist only on the active leader and will not be replicated in the
> > > raft log? If the live brokers list is maintained only on the active
> > > controller (raft leader), then, in case of leader failure, there will be
> > a
> > > window where the new leader does not know about the live brokers, till
> > the
> > > brokers establish the leases again.
> > > I think it will be safer to have leases as a hard state managed by
> > standard
> > > Raft replication.
> >
> > Leases are short, so the need to re-establish them after a controller
> > failover doesn't seem like a big problem.  But this is something we can
> > 
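
Restating the quoted timing example as code may help: with a start time (or, per the
updated KIP, a duration) carried in the request, the network delay counts against the
lease; with only the controller's receive time, the delay silently extends it. A
minimal sketch using the numbers from the example (broker sends at 100, controller
receives at 200, lease of 1000):

    public class LeaseExpirationExample {

        // Expiration when the heartbeat carries its send time: network delay counts against the lease.
        static long expirationWithStartTime(long requestStartTimeMs, long leaseDurationMs) {
            return requestStartTimeMs + leaseDurationMs;
        }

        // Expiration when only the controller's receive time is known: network delay extends the lease.
        static long expirationWithoutStartTime(long receiveTimeMs, long leaseDurationMs) {
            return receiveTimeMs + leaseDurationMs;
        }

        public static void main(String[] args) {
            long sentAt = 100;      // broker A sends the heartbeat
            long receivedAt = 200;  // it arrives on the controller
            long leaseMs = 1000;

            System.out.println(expirationWithStartTime(sentAt, leaseMs));        // 1100
            System.out.println(expirationWithoutStartTime(receivedAt, leaseMs)); // 1200
        }
    }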

Re: Kip creation permission

2020-09-11 Thread Matthias J. Sax
Done.

On 9/11/20 9:02 AM, Walker Carlson wrote:
> Can you give me permission to create a KIP? username: wcarlson
> 
> thanks,
> Walker
> 



signature.asc
Description: OpenPGP digital signature


Kip creation permission

2020-09-11 Thread Walker Carlson
Can you give me permission to create a KIP? username: wcarlson

thanks,
Walker


Re: [DISCUSS] KIP-516: Topic Identifiers

2020-09-11 Thread Gokul Ramanan Subramanian
Hi.

Thanks for the KIP.

Have you thought about whether it makes sense to support authorizing a
principal for a topic ID rather than a topic name to achieve tighter
security?

Or is the topic ID fundamentally an internal detail similar to epochs used
in a bunch of other places in Kafka?

Thanks.

On Fri, Sep 11, 2020 at 4:06 PM John Roesler  wrote:

> Hello Justine,
>
> Thanks for the KIP!
>
> I happen to have been confronted recently with the need to keep track of a
> large number of topics as compactly as possible. I was going to come up
> with some way to dictionary encode the topic names as integers, but this
> seems much better!
>
> Apologies if this has been raised before, but I’m wondering about the
> choice of UUID vs sequence number for the ids. Typically, I’ve seen UUIDs
> in two situations:
> 1. When processes need to generate non-colliding identifiers without
> coordination.
> 2. When the identifier needs to be “universally unique”; I.e., the
> identifier needs to distinguish the entity from all other entities that
> could ever exist. This is useful in cases where entities from all kinds of
> systems get mixed together, such as when dumping logs from all processes in
> a company into a common system.
>
> Maybe I’m being short-sighted, but it doesn’t seem like either really
> applies here. It seems like the brokers could and would achieve consensus
> when creating a topic anyway, which is all that’s required to generate
> non-colliding sequence ids. For the second, as you mention, we could assign
> a UUID to the cluster as a whole, which would render any resource scoped to
> the broker universally unique as well.
>
> The reason I mention this is that, although a UUID is way more compact
> than topic names, it’s still 16 bytes. In contrast, a 4-byte integer
> sequence id would give us 4 billion unique topics per cluster, which seems
> like enough ;)
>
> Considering the number of different times these topic identifiers are sent
> over the wire or stored in memory, it seems like it might be worth the
> additional 4x space savings.
>
> What do you think about this?
>
> Thanks,
> John
>
> On Fri, Sep 11, 2020, at 03:20, Tom Bentley wrote:
> > Hi Justine,
> >
> > This looks like a very welcome improvement. Thanks!
> >
> > Maybe I missed it, but the KIP doesn't seem to mention changing
> > DeleteTopicsRequest to identify the topic using an id. Maybe that's out
> of
> > scope, but DeleteTopicsRequest is not listed among the Future Work APIs
> > either.
> >
> > Kind regards,
> >
> > Tom
> >
> > On Thu, Sep 10, 2020 at 3:59 PM Satish Duggana  >
> > wrote:
> >
> > > Thanks Lucas/Justine for the nice KIP.
> > >
> > > It has several benefits which also include simplifying the topic
> > > deletion process by controller and logs cleanup by brokers in corner
> > > cases.
> > >
> > > Best,
> > > Satish.
> > >
> > > On Wed, Sep 9, 2020 at 10:07 PM Justine Olshan 
> > > wrote:
> > > >
> > > > Hello all, it's been almost a year! I've made some changes to this
> KIP
> > > and hope to continue the discussion.
> > > >
> > > > One of the main changes I've added is now the metadata response will
> > > include the topic ID (as Colin suggested). Clients can obtain the
> topicID
> > > of a given topic through a TopicDescription. The topicId will also be
> > > included with the UpdateMetadata request.
> > > >
> > > > Let me know what you all think.
> > > > Thank you,
> > > > Justine
> > > >
> > > > On 2019/09/13 16:38:26, "Colin McCabe"  wrote:
> > > > > Hi Lucas,
> > > > >
> > > > > Thanks for tackling this.  Topic IDs are a great idea, and this is
> a
> > > really good writeup.
> > > > >
> > > > > For /brokers/topics/[topic], the schema version should be bumped to
> > > version 3, rather than 2.  KIP-455 bumped the version of this znode to
> 2
> > > already :)
> > > > >
> > > > > Given that we're going to be seeing these things as strings a lot
> (in
> > > logs, in ZooKeeper, on the command-line, etc.), does it make sense to
> use
> > > base64 when converting them to strings?
> > > > >
> > > > > Here is an example of the hex representation:
> > > > > 6fcb514b-b878-4c9d-95b7-8dc3a7ce6fd8
> > > > >
> > > > > And here is an example in base64.
> > > > > b8tRS7h4TJ2Vt43Dp85v2A
> > > > >
> > > > > The base64 version saves 15 letters (to be fair, 4 of those were
> > > dashes that we could have elided in the hex representation.)
> > > > >
> > > > > Another thing to consider is that we should specify that the
> > > all-zeroes UUID is not a valid topic UUID.   We can't use null for this
> > > because we can't pass a null UUID over the RPC protocol (there is no
> > > special pattern for null, nor do we want to waste space reserving such
> a
> > > pattern.)
> > > > >
> > > > > Maybe I missed it, but did you describe "migration of... existing
> > > topic[s] without topic IDs" in detail in any section?  It seems like
> when
> > > the new controller becomes active, it should just generate random
> UUIDs for
> > > these, and write the 

Re: [DISCUSS] KIP-516: Topic Identifiers

2020-09-11 Thread John Roesler
Hello Justine,

Thanks for the KIP!

I happen to have been confronted recently with the need to keep track of a 
large number of topics as compactly as possible. I was going to come up with 
some way to dictionary encode the topic names as integers, but this seems much 
better!

Apologies if this has been raised before, but I’m wondering about the choice of 
UUID vs sequence number for the ids. Typically, I’ve seen UUIDs in two 
situations:
1. When processes need to generate non-colliding identifiers without 
coordination. 
2. When the identifier needs to be “universally unique”; I.e., the identifier 
needs to distinguish the entity from all other entities that could ever exist. 
This is useful in cases where entities from all kinds of systems get mixed 
together, such as when dumping logs from all processes in a company into a 
common system. 

Maybe I’m being short-sighted, but it doesn’t seem like either really applies 
here. It seems like the brokers could and would achieve consensus when creating 
a topic anyway, which is all that’s required to generate non-colliding sequence 
ids. For the second, as you mention, we could assign a UUID to the cluster as a 
whole, which would render any resource scoped to the broker universally unique 
as well. 

The reason I mention this is that, although a UUID is way more compact than 
topic names, it’s still 16 bytes. In contrast, a 4-byte integer sequence id 
would give us 4 billion unique topics per cluster, which seems like enough ;)

Considering the number of different times these topic identifiers are sent over 
the wire or stored in memory, it seems like it might be worth the additional 4x 
space savings. 

What do you think about this?

Thanks,
John

On Fri, Sep 11, 2020, at 03:20, Tom Bentley wrote:
> Hi Justine,
> 
> This looks like a very welcome improvement. Thanks!
> 
> Maybe I missed it, but the KIP doesn't seem to mention changing
> DeleteTopicsRequest to identify the topic using an id. Maybe that's out of
> scope, but DeleteTopicsRequest is not listed among the Future Work APIs
> either.
> 
> Kind regards,
> 
> Tom
> 
> On Thu, Sep 10, 2020 at 3:59 PM Satish Duggana 
> wrote:
> 
> > Thanks Lucas/Justine for the nice KIP.
> >
> > It has several benefits which also include simplifying the topic
> > deletion process by controller and logs cleanup by brokers in corner
> > cases.
> >
> > Best,
> > Satish.
> >
> > On Wed, Sep 9, 2020 at 10:07 PM Justine Olshan 
> > wrote:
> > >
> > > Hello all, it's been almost a year! I've made some changes to this KIP
> > and hope to continue the discussion.
> > >
> > > One of the main changes I've added is now the metadata response will
> > include the topic ID (as Colin suggested). Clients can obtain the topicID
> > of a given topic through a TopicDescription. The topicId will also be
> > included with the UpdateMetadata request.
> > >
> > > Let me know what you all think.
> > > Thank you,
> > > Justine
> > >
> > > On 2019/09/13 16:38:26, "Colin McCabe"  wrote:
> > > > Hi Lucas,
> > > >
> > > > Thanks for tackling this.  Topic IDs are a great idea, and this is a
> > really good writeup.
> > > >
> > > > For /brokers/topics/[topic], the schema version should be bumped to
> > version 3, rather than 2.  KIP-455 bumped the version of this znode to 2
> > already :)
> > > >
> > > > Given that we're going to be seeing these things as strings a lot (in
> > logs, in ZooKeeper, on the command-line, etc.), does it make sense to use
> > base64 when converting them to strings?
> > > >
> > > > Here is an example of the hex representation:
> > > > 6fcb514b-b878-4c9d-95b7-8dc3a7ce6fd8
> > > >
> > > > And here is an example in base64.
> > > > b8tRS7h4TJ2Vt43Dp85v2A
> > > >
> > > > The base64 version saves 15 letters (to be fair, 4 of those were
> > dashes that we could have elided in the hex representation.)
> > > >
> > > > Another thing to consider is that we should specify that the
> > all-zeroes UUID is not a valid topic UUID.   We can't use null for this
> > because we can't pass a null UUID over the RPC protocol (there is no
> > special pattern for null, nor do we want to waste space reserving such a
> > pattern.)
> > > >
> > > > Maybe I missed it, but did you describe "migration of... existing
> > topic[s] without topic IDs" in detail in any section?  It seems like when
> > the new controller becomes active, it should just generate random UUIDs for
> > these, and write the random UUIDs back to ZooKeeper.  It would be good to
> > spell that out.  We should make it clear that this happens regardless of
> > the inter-broker protocol version (it's a compatible change).
> > > >
> > > > "LeaderAndIsrRequests including an is_every_partition flag" seems a
> > bit wordy.  Can we just call these "full LeaderAndIsrRequests"?  Then the
> > RPC field could be named "full".  Also, it would probably be better for the
> > RPC field to be an enum of { UNSPECIFIED, INCREMENTAL, FULL }, so that we
> > can cleanly handle old versions (by treating 

[jira] [Created] (KAFKA-10476) Error when calling (kafka-configs.sh --add-config): org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient..... Caused by: javax.crypto.BadPadding

2020-09-11 Thread AbdulRahman Mahmoud (Jira)
AbdulRahman Mahmoud created KAFKA-10476:
---

 Summary: Error when calling (kafka-configs.sh --add-config): 
org.apache.kafka.common.KafkaException: Failed to create new 
KafkaAdminClient. Caused by: javax.crypto.BadPaddingException: Given final 
block 
 Key: KAFKA-10476
 URL: https://issues.apache.org/jira/browse/KAFKA-10476
 Project: Kafka
  Issue Type: Bug
  Components: config
Affects Versions: 2.2.1
Reporter: AbdulRahman Mahmoud
 Attachments: Error.txt, Script.txt

We created a cluster on AWS with the MSK service. We can connect to it
through TLS with our certificates, and we can create topics, produce and
consume with no problems.

However, when we use the same configuration for "kafka-configs.sh
--add-config", like the attached [^Script.txt] snippet for example, it fails.
I have attached a file ([^Error.txt]) with the log of the error.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : Kafka » kafka-trunk-jdk11 #57

2020-09-11 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-654 Aborting Transaction with non-flushed data should throw a non-fatal Exception

2020-09-11 Thread Gokul Srinivas

All,

As the vote has passed, I have raised a PR here:
https://github.com/apache/kafka/pull/9280

Please help review.

Thanks,
Gokul

On 04-09-2020 00:48, Gokul Srinivas wrote:

Appreciate the help!

On 04-09-2020 00:46, Sophie Blee-Goldman wrote:

Yep, you can go ahead and call for a vote on the KIP

On Thu, Sep 3, 2020 at 12:09 PM Gokul Srinivas  
wrote:


    Sophie,

    That sounds fair. I've updated the KIP to state the same message for
    backward compatibility with existing (albeit hacky) solutions.

    As this is my first ever contribution - is the next step to
    initiate the
    voting on this KIP?

    -Gokul

    On 04-09-2020 00:34, Sophie Blee-Goldman wrote:
    > I think the current proposal looks good to me. One minor
    suggestion I have
    > is to consider keeping the same error message:
    >
    > Failing batch since transaction was aborted
    >
    >
    > When we were running into this issue in Streams and accidentally
    rethrowing
    > the KafkaException as fatal, we ended up checking the specific
    error message
    > of the KafkaException and swallowing the exception if it was
    equivalent to
    > the
    > above. Obviously this was pretty hacky (hence the motivation for
    this KIP)
    > and
    > luckily we found a way around this, but it makes me wonder if any
    > applications
    > out there might be doing the same. So maybe we should reuse the
    old error
    > message just in case?
    >
    > Besides that, this KIP LGTM
    >
    > On Thu, Sep 3, 2020 at 5:23 AM Gokul Srinivas
     wrote:
    >
    >> All,
    >>
    >> Gentle reminder - any comments on the line of thinking I
    mentioned in
    >> the last email? I've updated the Exception to be named
    >> "TransactionAbortedException" on the KIP confluence page.
    >>
    >> -Gokul
    >>
    >> On 01-09-2020 18:34, Gokul Srinivas wrote:
    >>> Matthias, Sophie, Jason,
    >>>
    >>> Took another pass at understanding the internals and it seems
    to me
    >>> like we should be extending the `ApiException` rather than the
    >>> `RetriableException`.
    >>>
    >>> The check in question
    >>> =
    >>>
    >>> Do we abort any undrained batches that are present on this
    transaction
    >>> if the transaction is in an aborting state? And, if we do,
    what would
    >>> be the reason sent back to the user for aborting these batches?
    >>>
    >>> Logic for this
    >>> ==
    >>>
    >>> If the transaction `isAborting` and `hasAbortableError` and the
    >>> `lastError()` is not empty -> then there has been some error which
    >>> will cause / has caused the transaction to abort and this *is* a
    >>> runtime exception. This same exception should be sent back to
    the user.
    >>>
    >>> If the transaction `isAborting` and `lastError()` is empty ->
    then for
    >>> some unknown reason (maybe even user initiated, according to the
    >>> tests), the transaction manager has started to abort the
    transaction.
    >>> In this case, the newly proposed exception should be sent back
    to the
    >>> user.
    >>>
    >>> Reasoning
    >>> =
    >>>
    >>> Prima facie - I do not think this is a `RetriableException`.
    >>>
    >>> If the user has chosen to abort this transaction, then it
    would be up
    >>> to the user to choose whether to retry the exception, in which
    case it
    >>> is /*not*/ a `RetriableException`.
    >>>
    >>> If there is a case where the transaction manager has no error,
    but has
    >>> started to abort the transaction, we still do not retry the
    transaction,
    >>> rather we abort any undrained batches - in which case, it is
    /*still
    >>> not*/ a `RetriableException`.
    >>>
    >>> Does that sound right?
    >>>
    >>> -Gokul
    >>>
    >>> On 29-08-2020 01:17, Jason Gustafson wrote:
     Hi Gokul,
    
     Thanks, I think it makes sense to use a separate exception
    type. +1 on
     Sophie's suggestion for `TransactionAbortedException`.
    
     Extending from `RetriableException` seems reasonable as well.
    I guess
     the
     only question is whether it's safe to catch it as a
    `RetriableException`
     and apply common retry logic. For a transactional producer, my
     expectation
     is that the application would abort the transaction and retry it.
     However,
     if the transaction is already being aborted, maybe it would
    be better to
     skip the abort. It might be helpful to have an example which
    shows
     how we
     expect applications to handle this.
    
     Thanks,
     Jason
    
    
    
    
    
    
     On Thu, Aug 27, 2020 at 6:25 PM Sophie Blee-Goldman wrote:
    
    > Hey Gokul, thanks for taking up this KIP!
    >
    > I agree with Matthias that directly extending 
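
To illustrate Jason's question about how applications would handle this, here is a
minimal sketch of a transactional send whose abort is initiated by the application
itself. The name TransactionAbortedException comes from the proposal in this thread;
whether it extends RetriableException or ApiException is exactly what is being
discussed, so the sketch only shows the application swallowing the failure of the
un-flushed batch (today surfaced as a generic KafkaException) rather than catching a
specific type. The bootstrap address and topic name are placeholders.

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class AbortAwareProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "abort-aware-demo");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                Future<RecordMetadata> pending =
                        producer.send(new ProducerRecord<>("demo-topic", "key", "value"));

                // The application decides to abort before the batch is drained.
                producer.abortTransaction();

                try {
                    pending.get();
                } catch (ExecutionException e) {
                    // Today the cause is a generic KafkaException ("Failing batch since
                    // transaction was aborted"); under KIP-654 it would be the proposed
                    // TransactionAbortedException. Either way the abort was requested by the
                    // application, so this is expected and non-fatal: swallow it and, if
                    // desired, begin a new transaction to retry the work.
                    System.out.println("Batch failed because the transaction was aborted: " + e.getCause());
                }
            }
        }
    }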

Re: [DISCUSS] KIP idea: Support of multipart messages

2020-09-11 Thread Alexander Sibiryakov
Hello,

I don't have an answer for this. Ideally, the bigger the better. For our
use case: working with Web data, hundreds of megabytes should be enough.
But there could be other use cases, such as transferring of binary or media
files.

There is another dimension to consider: the proportion of large messages in the
stream. In our case it is less than 0.5%. If this number is high enough,
Kafka will be out of consideration, I assume.

A.

On Thu, Sep 10, 2020 at 7:02 PM Ismael Juma  wrote:

> Thanks for the KIP. I think the main question is what's the upper bound for
> message size you are looking to support. Is it hundreds of MBs, GBs, tens
> of GBs, something else? That would inform the options.
>
> Ismael
>
> On Thu, Sep 10, 2020 at 8:03 AM Gwen Shapira  wrote:
>
> > There is another option of doing the splitting on the server and hiding
> > this from the clients. My personal (and highly controversial) take is
> that
> > Kafka clients could use less complexity rather than more. They are
> > incredibly difficult to reason about as is.  But maybe this
> > splitting/merging won't be that bad - multi-part messages are well
> > understood in general.
> >
> > On Thu, Sep 10, 2020 at 7:51 AM Gwen Shapira  wrote:
> >
> > > There is also another approach (harder to design, but may be easier to
> > use
> > > and maintain), which is to make Kafka handle large messages better and
> > > allow users to set higher limits - for example, can Kafka provide
> really
> > > high throughput with 1GB messages? Some systems do it well.
> > >
> > > I don't know where the slowdowns happen, but perhaps it is one of
> these?
> > > 1. Java GC used to be a problem, but maybe we didn't try with newer GC
> > and
> > > simple tuning will solve it?
> > > 2. We have head-of-line blocking issue on the queue. There are
> approaches
> > > to solve that too.
> > >
> > > I'd love to see more exploration on what exactly is the problem we are
> > > facing (and is it still an issue? Becket's work is a few years old
> now.)
> > >
> > > On Thu, Sep 10, 2020 at 12:21 AM Alexander Sibiryakov <
> > > sibirya...@scrapinghub.com> wrote:
> > >
> > >> Hey Ben, thanks for the link. My proposal is partially based on
> Becket's
> > >> ideas, but I haven't reached out to him directly.
> > >>
> > >> +Becket
> > >> Hi Becket, would you mind to have a look at my proposal (link is in
> the
> > >> first message) ?
> > >>
> > >> A.
> > >>
> > >> On Tue, Sep 8, 2020 at 12:35 PM Ben Stopford 
> wrote:
> > >>
> > >> > LinkedIn had something like this. Becket did a talk on it a few
> years
> > >> ago.
> > >> > It would be interesting to know what became of it and if there were
> > >> lessons
> > >> > learned.
> > >> > https://www.youtube.com/watch?v=ZrbaXDYUZY8
> > >> >
> > >> > On Fri, 4 Sep 2020 at 08:17, Alexander Sibiryakov <
> > >> > sibirya...@scrapinghub.com> wrote:
> > >> >
> > >> > > Hello,
> > >> > >
> > >> > > I would like to get your opinions on this KIP idea.
> > >> > >
> > >> > > In short it will allow to transfer messages of bigger size than
> > >> allowed
> > >> > by
> > >> > > the broker.
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >>
> >
> https://docs.google.com/document/d/1cKBNxPkVVENly9YczYXsVDVWwrCdRq3G_cja5s2QDQg/edit?usp=sharing
> > >> > >
> > >> > > If all that makes sense, I'll create a full fledged KIP document
> and
> > >> > expand
> > >> > > the idea.
> > >> > >
> > >> > > Thanks,
> > >> > > A.
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> >
> > >> > Ben Stopford
> > >> >
> > >>
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Engineering Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> > >
> >
> >
> > --
> > Gwen Shapira
> > Engineering Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>


Re: [DISCUSS] KIP idea: Support of multipart messages

2020-09-11 Thread Alexander Sibiryakov
See my answers below,

On Thu, Sep 10, 2020 at 5:03 PM Gwen Shapira  wrote:

> There is another option of doing the splitting on the server and hiding
> this from the clients. My personal (and highly controversial) take is that
> Kafka clients could use less complexity rather than more. They are
> incredibly difficult to reason about as is.  But maybe this
> splitting/merging won't be that bad - multi-part messages are well
> understood in general.
>
Sure, let's decide about the approach first and then figure out where we
should put it. As Tim mentioned, the alternative is to have a separate file
per message on the broker side, which is read/written in a streaming way.


> On Thu, Sep 10, 2020 at 7:51 AM Gwen Shapira  wrote:
>
> > There is also another approach (harder to design, but may be easier to
> use
> > and maintain), which is to make Kafka handle large messages better and
> > allow users to set higher limits - for example, can Kafka provide really
> > high throughput with 1GB messages? Some systems do it well.
>
It is definitely possible, although it seems complex to build a system
that is efficient for both small (MB) and large (GB) messages.
What systems do you have in mind?

>
> > I don't know where the slowdowns happen, but perhaps it is one of these?
> > 1. Java GC used to be a problem, but maybe we didn't try with newer GC
> and
> > simple tuning will solve it?
>
I'm afraid it will affect optimizations made for normal messages in the page
and write caches. When a large chunk gets in there, it will inevitably push
out smaller ones.

> 2. We have head-of-line blocking issue on the queue. There are approaches
> > to solve that too.
>
One of the options is to set reasonable limits on multi-part messages, to
reduce the blocking time.

>
> > I'd love to see more exploration on what exactly is the problem we are
> > facing (and is it still an issue? Becket's work is a few years old now.)
>
It is definitely an issue, especially for the users of managed Kafka
services like Confluent Cloud. These are forced to use either
reference-based approaches or chunking.
For those who are deploying Kafka themselves there is the option to increase
the message size limit, at the price of slowing down the broker.

A.

> >
> > On Thu, Sep 10, 2020 at 12:21 AM Alexander Sibiryakov <
> > sibirya...@scrapinghub.com> wrote:
> >
> >> Hey Ben, thanks for the link. My proposal is partially based on Becket's
> >> ideas, but I haven't reached out to him directly.
> >>
> >> +Becket
> >> Hi Becket, would you mind to have a look at my proposal (link is in the
> >> first message) ?
> >>
> >> A.
> >>
> >> On Tue, Sep 8, 2020 at 12:35 PM Ben Stopford  wrote:
> >>
> >> > LinkedIn had something like this. Becket did a talk on it a few years
> >> ago.
> >> > It would be interesting to know what became of it and if there were
> >> lessons
> >> > learned.
> >> > https://www.youtube.com/watch?v=ZrbaXDYUZY8
> >> >
> >> > On Fri, 4 Sep 2020 at 08:17, Alexander Sibiryakov <
> >> > sibirya...@scrapinghub.com> wrote:
> >> >
> >> > > Hello,
> >> > >
> >> > > I would like to get your opinions on this KIP idea.
> >> > >
> >> > > In short it will allow to transfer messages of bigger size than
> >> allowed
> >> > by
> >> > > the broker.
> >> > >
> >> > >
> >> > >
> >> >
> >>
> https://docs.google.com/document/d/1cKBNxPkVVENly9YczYXsVDVWwrCdRq3G_cja5s2QDQg/edit?usp=sharing
> >> > >
> >> > > If all that makes sense, I'll create a full fledged KIP document and
> >> > expand
> >> > > the idea.
> >> > >
> >> > > Thanks,
> >> > > A.
> >> > >
> >> >
> >> >
> >> > --
> >> >
> >> > Ben Stopford
> >> >
> >>
> >
> >
> > --
> > Gwen Shapira
> > Engineering Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>
>
> --
> Gwen Shapira
> Engineering Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>
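
For readers unfamiliar with the chunking approach mentioned above, the sketch below
shows the general client-side idea: split a large payload into parts that fit under the
broker's limit, tag each part with an id, index and total count in headers, and send
all parts under the same key so they stay in one partition for reassembly. This is only
an illustration of the pattern; the header names and the exact scheme in the linked
design document may differ, and consumer-side reassembly (buffering parts until the set
is complete) is omitted.

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class MultipartSplitExample {

        // Split one large payload into parts that each fit under the broker's message size limit.
        static List<ProducerRecord<String, byte[]>> split(String topic, String key, byte[] payload, int maxPartBytes) {
            String messageId = UUID.randomUUID().toString();
            int totalParts = (payload.length + maxPartBytes - 1) / maxPartBytes;
            List<ProducerRecord<String, byte[]>> parts = new ArrayList<>(totalParts);
            for (int i = 0; i < totalParts; i++) {
                int from = i * maxPartBytes;
                int to = Math.min(from + maxPartBytes, payload.length);
                byte[] chunk = new byte[to - from];
                System.arraycopy(payload, from, chunk, 0, chunk.length);
                // Same key for every part, so all parts go to the same partition in order.
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, chunk);
                record.headers().add("multipart.id", messageId.getBytes(StandardCharsets.UTF_8));
                record.headers().add("multipart.index", Integer.toString(i).getBytes(StandardCharsets.UTF_8));
                record.headers().add("multipart.total", Integer.toString(totalParts).getBytes(StandardCharsets.UTF_8));
                parts.add(record);
            }
            return parts;
        }

        public static void main(String[] args) {
            byte[] bigPayload = new byte[3_500_000]; // e.g. a 3.5 MB web page body
            List<ProducerRecord<String, byte[]>> parts =
                    split("web-pages", "http://example.com", bigPayload, 1_000_000);
            System.out.println("Parts produced: " + parts.size()); // 4
        }
    }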


Re: [DISCUSS] KIP-516: Topic Identifiers

2020-09-11 Thread Tom Bentley
Hi Justine,

This looks like a very welcome improvement. Thanks!

Maybe I missed it, but the KIP doesn't seem to mention changing
DeleteTopicsRequest to identify the topic using an id. Maybe that's out of
scope, but DeleteTopicsRequest is not listed among the Future Work APIs
either.

Kind regards,

Tom

On Thu, Sep 10, 2020 at 3:59 PM Satish Duggana 
wrote:

> Thanks Lucas/Justine for the nice KIP.
>
> It has several benefits which also include simplifying the topic
> deletion process by controller and logs cleanup by brokers in corner
> cases.
>
> Best,
> Satish.
>
> On Wed, Sep 9, 2020 at 10:07 PM Justine Olshan 
> wrote:
> >
> > Hello all, it's been almost a year! I've made some changes to this KIP
> and hope to continue the discussion.
> >
> > One of the main changes I've added is now the metadata response will
> include the topic ID (as Colin suggested). Clients can obtain the topicID
> of a given topic through a TopicDescription. The topicId will also be
> included with the UpdateMetadata request.
> >
> > Let me know what you all think.
> > Thank you,
> > Justine
> >
> > On 2019/09/13 16:38:26, "Colin McCabe"  wrote:
> > > Hi Lucas,
> > >
> > > Thanks for tackling this.  Topic IDs are a great idea, and this is a
> really good writeup.
> > >
> > > For /brokers/topics/[topic], the schema version should be bumped to
> version 3, rather than 2.  KIP-455 bumped the version of this znode to 2
> already :)
> > >
> > > Given that we're going to be seeing these things as strings a lot (in
> logs, in ZooKeeper, on the command-line, etc.), does it make sense to use
> base64 when converting them to strings?
> > >
> > > Here is an example of the hex representation:
> > > 6fcb514b-b878-4c9d-95b7-8dc3a7ce6fd8
> > >
> > > And here is an example in base64.
> > > b8tRS7h4TJ2Vt43Dp85v2A
> > >
> > > The base64 version saves 15 letters (to be fair, 4 of those were
> dashes that we could have elided in the hex representation.)
> > >
> > > Another thing to consider is that we should specify that the
> all-zeroes UUID is not a valid topic UUID.   We can't use null for this
> because we can't pass a null UUID over the RPC protocol (there is no
> special pattern for null, nor do we want to waste space reserving such a
> pattern.)
> > >
> > > Maybe I missed it, but did you describe "migration of... existing
> topic[s] without topic IDs" in detail in any section?  It seems like when
> the new controller becomes active, it should just generate random UUIDs for
> these, and write the random UUIDs back to ZooKeeper.  It would be good to
> spell that out.  We should make it clear that this happens regardless of
> the inter-broker protocol version (it's a compatible change).
> > >
> > > "LeaderAndIsrRequests including an is_every_partition flag" seems a
> bit wordy.  Can we just call these "full LeaderAndIsrRequests"?  Then the
> RPC field could be named "full".  Also, it would probably be better for the
> RPC field to be an enum of { UNSPECIFIED, INCREMENTAL, FULL }, so that we
> can cleanly handle old versions (by treating them as UNSPECIFIED)
> > >
> > > In the LeaderAndIsrRequest section, you write "A final deletion event
> will be secheduled for X ms after the LeaderAndIsrRequest was first
> received..."  I guess the X was a placeholder that you intended to replace
> before posting? :)  In any case, this seems like the kind of thing we'd
> want a configuration for.  Let's describe that configuration key somewhere
> in this KIP, including what its default value is.
> > >
> > > We should probably also log a bunch of messages at WARN level when
> something is scheduled for deletion, as well.  (Maybe this was assumed, but
> it would be good to mention it).
> > >
> > > I feel like there are a few sections that should be moved to "rejected
> alternatives."  For example, in the DeleteTopics section, since we're not
> going to do option 1 or 2, these should be moved into "rejected
> alternatives,"  rather than appearing inline.  Another case is the "Should
> we remove topic name from the protocol where possible" section.  This is
> clearly discussing a design alternative that we're not proposing to
> implement: removing the topic name from those protocols.
> > >
> > > Is it really necessary to have a new /admin/delete_topics_by_id path
> in ZooKeeper?  It seems like we don't really need this.  Whenever there is
> a new controller, we'll send out full LeaderAndIsrRequests which will
> trigger the stale topics to be cleaned up.   The active controller will
> also send the full LeaderAndIsrRequest to brokers that are just starting
> up.So we don't really need this kind of two-phase commit (send out
> StopReplicasRequest, get ACKs from all nodes, commit by removing
> /admin/delete_topics node) any more.
> > >
> > > You mention that FetchRequest will now include UUID to avoid issues
> where requests are made to stale partitions.  However, adding a UUID to
> MetadataRequest is listed as future work, out of scope for this KIP.  How
> 
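
Colin's base64 suggestion quoted above is easy to reproduce with the standard JDK:
encode the 16 raw bytes of the UUID instead of its hex form. The sketch below prints
exactly the 22-character example from the thread; a URL-safe alphabet would be a minor
variation if the ids ever end up in file names.

    import java.nio.ByteBuffer;
    import java.util.Base64;
    import java.util.UUID;

    public class TopicIdBase64Example {

        static String toBase64(UUID id) {
            ByteBuffer buf = ByteBuffer.allocate(16);
            buf.putLong(id.getMostSignificantBits());
            buf.putLong(id.getLeastSignificantBits());
            // Padding adds no information for a fixed 16-byte input, so drop it.
            return Base64.getEncoder().withoutPadding().encodeToString(buf.array());
        }

        public static void main(String[] args) {
            UUID topicId = UUID.fromString("6fcb514b-b878-4c9d-95b7-8dc3a7ce6fd8");
            System.out.println(toBase64(topicId)); // prints b8tRS7h4TJ2Vt43Dp85v2A
        }
    }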

Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #57

2020-09-11 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-5636: Add Sliding Windows documentation (#9264)


--
[...truncated 3.28 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 

Re: [DISCUSS] KIP-508: Make Suppression State Queriable - rebooted.

2020-09-11 Thread Dongjin Lee
Hi All,

Here is the voting thread:
https://lists.apache.org/thread.html/r5653bf2dafbb27b247bf20dbe6f070c151b3823d96c9c9ca94183e20%40%3Cdev.kafka.apache.org%3E

Thanks,
Dongjin

On Fri, Sep 11, 2020 at 4:23 PM Dongjin Lee  wrote:

> Hi John,
>
> Thanks for the feedback. I will open the Vote thread now.
>
> Best,
> Dongjin
>
> On Fri, Sep 11, 2020 at 2:00 AM John Roesler  wrote:
>
>> Hi Dongjin,
>>
>> Sorry for the delay. I'm glad you're still pushing this
>> forward. It would be nice to get this in to the 2.7 release.
>>
>> I just took another look at the KIP, and it looks good to
>> me!
>>
>> I think this is ready for a vote.
>>
>> Thanks,
>> -John
>>
>> On Wed, 2020-08-05 at 22:04 +0900, Dongjin Lee wrote:
>> > Hi All,
>> >
>> > I updated the KIP
>> > <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
>> >
>> > and the implementation, following the discussion here.
>> >
>> > You must be working hard preparing the release of 2.6.0, so please have
>> a
>> > look after your work is done.
>> >
>> > Thanks,
>> > Dongjin
>> >
>> > On Sun, Mar 8, 2020 at 12:20 PM John Roesler 
>> wrote:
>> >
>> > > Thanks Matthias,
>> > >
>> > > Good idea. I've changed the ticket name and added a note
>> > > clarifying that this ticket is not the same as
>> > > https://issues.apache.org/jira/browse/KAFKA-7224
>> > >
>> > > Incidentally, I learned that I never documented my reasons
>> > > for abandoning my work on KAFKA-7224 ! I've now updated
>> > > that ticket, too, so your question had an unexpected side-benefit.
>> > >
>> > > Thanks,
>> > > -John
>> > >
>> > > On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
>> > > > -BEGIN PGP SIGNED MESSAGE-
>> > > > Hash: SHA512
>> > > >
>> > > > Thanks for clarification.
>> > > >
>> > > > Can you maybe update the Jira ticket? Do we have a ticket for
>> > > > spill-to-disk? Maybe link to it and explain that it's two different
>> > > > > things? Maybe even rename the ticket to something clearer, i.e.,
>> > > > > "make suppress result queryable" or similar?
>> > > >
>> > > >
>> > > > - -Matthias
>> > > >
>> > > > On 3/7/20 1:58 PM, John Roesler wrote:
>> > > > > Hey Matthias,
>> > > > >
>> > > > > I’m sorry if the ticket was poorly stated. The ticket is to add a
>> > > > DSL overload to pass a Materialized argument to suppress. As a
>> result,
>> > > > the result of the suppression would be queriable.
>> > > > > This is unrelated to “persistent buffer” aka “spill-to-disk”.
>> > > > >
>> > > > > There was some confusion before about whether this ticket could be
>> > > > implemented as “query the buffer”. Maybe it can, but not trivially.
>> > > > The obvious way is just to add a new state store which we write the
>> > > > results into just before we forward. I.e., it’s exactly like the
>> > > > materialized variant of any stateless KTable operation.
>> > > > > Thanks, John
>> > > > >
>> > > > > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote: Thanks for
>> > > > > the KIP Dongjin,
>> > > > >
>> > > > > I am still not sure if I can follow, what might also be caused by
>> > > > > the backing JIRA ticket (maybe John can clarify the intent of the
>> > > > > ticket as he created it):
>> > > > >
>> > > > > Currently, suppress() only uses an in-memory buffer and my
>> > > > > understanding of the Jira is to add the ability to use a
>> > > > > persistent buffer (ie, spill to disk backed by RocksDB).
>> > > > >
>> > > > > Adding a persistent buffer is completely unrelated to allow
>> > > > > querying the buffer. In fact, one could query an in-memory buffer,
>> > > > > too. However, querying the buffer does not really seem to be
>> useful
>> > > > > as pointed out by John, as you can always query the upstream
>> KTable
>> > > > > store.
>> > > > >
>> > > > > Also note that for the emit-on-window-close case the result is
>> > > > > deleted from the buffer when it is emitted, and thus cannot be
>> > > > > queried any longer.
>> > > > >
>> > > > >
>> > > > > Can you please clarify if you intend to allow spilling to disk or
>> > > > > if you intend to enable IQ (even if I don't see why querying makes
>> > > > > sense, as the data is either upstream or deleted). Also, if you
>> > > > > want to enable IQ, why do we need all those new interfaces? The
>> > > > > result of a suppress() is a KTable that is the same as any other
>> > > > > key-value/windowed/sessions store?
>> > > > >
>> > > > > We should also have corresponding Jira tickets for different cases
>> > > > > to avoid the confusion I am in atm :)
>> > > > >
>> > > > >
>> > > > > -Matthias
>> > > > >
>> > > > >
>> > > > > On 2/27/20 8:21 AM, John Roesler wrote:
>> > > > > > > > Hi Dongjin,
>> > > > > > > >
>> > > > > > > > No problem; glad we got it sorted out.
>> > > > > > > >
>> > > > > > > > Thanks again for picking this up! -John
>> > > > > > > >
>> > > > > > > > On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
>> > > > > > > > > > I was under the impression that you wanted to 

[VOTE] KIP-508: Make Suppression State Queriable

2020-09-11 Thread Dongjin Lee
Hi devs,

I would like to start the vote for KIP-508: Make Suppression State Queriable, which
makes the suppression state queriable by introducing KTable#suppress(Suppressed, Materialized) to the public API.
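
For anyone skimming the thread, a minimal sketch of how the proposed overload could be
used is shown below. The store name, the generics, and the exact shape of the new
suppress() overload are illustrative assumptions based on the one-line description
above, not the final API:

    import java.time.Duration;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class Kip508Sketch {
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                   .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                   .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
                   .count()
                   // Hypothetical KIP-508 overload: materialize the suppressed
                   // result into a queriable store ("suppressed-counts" is an
                   // illustrative name).
                   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()),
                             Materialized.as("suppressed-counts"));
            return builder.build();
        }
    }

The materialized result would then be readable through the usual interactive-query
path (KafkaStreams#store) like any other windowed store.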

Thanks,
Dongjin

-- 
Dongjin Lee

A hitchhiker in the mathematical world.

github: github.com/dongjinleekr
keybase: https://keybase.io/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr
speakerdeck: speakerdeck.com/dongjin


Re: [DISCUSS] KIP-508: Make Suppression State Queriable - rebooted.

2020-09-11 Thread Dongjin Lee
Hi John,

Thanks for the feedback. I will open the Vote thread now.

Best,
Dongjin

On Fri, Sep 11, 2020 at 2:00 AM John Roesler  wrote:

> Hi Dongjin,
>
> Sorry for the delay. I'm glad you're still pushing this
> forward. It would be nice to get this in to the 2.7 release.
>
> I just took another look at the KIP, and it looks good to
> me!
>
> I think this is ready for a vote.
>
> Thanks,
> -John
>
> On Wed, 2020-08-05 at 22:04 +0900, Dongjin Lee wrote:
> > Hi All,
> >
> > I updated the KIP
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable>
> > and the implementation, following the discussion here.
> >
> > You must be working hard preparing the release of 2.6.0, so please have a
> > look after your work is done.
> >
> > Thanks,
> > Dongjin
> >
> > On Sun, Mar 8, 2020 at 12:20 PM John Roesler  wrote:
> >
> > > Thanks Matthias,
> > >
> > > Good idea. I've changed the ticket name and added a note
> > > clarifying that this ticket is not the same as
> > > https://issues.apache.org/jira/browse/KAFKA-7224
> > >
> > > Incidentally, I learned that I never documented my reasons
> > > for abandoning my work on KAFKA-7224 ! I've now updated
> > > that ticket, too, so your question had an unexpected side-benefit.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> > > > -BEGIN PGP SIGNED MESSAGE-
> > > > Hash: SHA512
> > > >
> > > > Thanks for clarification.
> > > >
> > > > Can you maybe update the Jira ticket? Do we have a ticket for
> > > > spill-to-disk? Maybe link to it and explain that it's two different
> > > > things? Maybe even rename the ticket to something more clear, i.e.,
> > > > "make suppress result queryable" or similar?
> > > >
> > > >
> > > > - -Matthias
> > > >
> > > > On 3/7/20 1:58 PM, John Roesler wrote:
> > > > > Hey Matthias,
> > > > >
> > > > > I’m sorry if the ticket was poorly stated. The ticket is to add a
> > > > DSL overload to pass a Materialized argument to suppress. As a result,
> > > > the result of the suppression would be queriable.
> > > > > This is unrelated to “persistent buffer” aka “spill-to-disk”.
> > > > >
> > > > > There was some confusion before about whether this ticket could be
> > > > implemented as “query the buffer”. Maybe it can, but not trivially.
> > > > The obvious way is just to add a new state store which we write the
> > > > results into just before we forward. I.e., it’s exactly like the
> > > > materialized variant of any stateless KTable operation.
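
A minimal sketch of that approach (write the result to a store just before forwarding
it), with illustrative class and store names rather than the actual implementation:

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Materializes whatever the upstream suppression buffer emits by writing it
    // to a state store right before forwarding it downstream, so the final
    // (suppressed) result becomes queriable.
    public class MaterializeThenForwardProcessor<K, V> extends AbstractProcessor<K, V> {

        private final String storeName;
        private KeyValueStore<K, V> store;

        public MaterializeThenForwardProcessor(final String storeName) {
            this.storeName = storeName;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            super.init(context);
            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
        }

        @Override
        public void process(final K key, final V value) {
            store.put(key, value);          // make the emitted result queriable
            context().forward(key, value);  // then forward it as usual
        }
    }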
> > > > > Thanks, John
> > > > >
> > > > > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote: Thanks for
> > > > > the KIP Dongjin,
> > > > >
> > > > > I am still not sure if I can follow, what might also be caused by
> > > > > the backing JIRA ticket (maybe John can clarify the intent of the
> > > > > ticket as he created it):
> > > > >
> > > > > Currently, suppress() only uses an in-memory buffer and my
> > > > > understanding of the Jira is, to add the ability to use a
> > > > > persistent buffer (ie, spill to disk backed by RocksDB).
> > > > >
> > > > > Adding a persistent buffer is completely unrelated to allowing
> > > > > querying the buffer. In fact, one could query an in-memory buffer,
> > > > > too. However, querying the buffer does not really seem to be useful
> > > > > as pointed out by John, as you can always query the upstream KTable
> > > > > store.
> > > > >
> > > > > Also note that for the emit-on-window-close case the result is
> > > > > deleted from the buffer when it is emitted, and thus cannot be
> > > > > queried any longer.
> > > > >
> > > > >
> > > > > Can you please clarify if you intend to allow spilling to disk or
> > > > > if you intend to enable IQ (even if I don't see why querying makes
> > > > > sense, as the data is either upstream or deleted). Also, if you
> > > > > want to enable IQ, why do we need all those new interfaces? The
> > > > > result of a suppress() is a KTable that is the same as any other
> > > > > key-value/windowed/session store?
> > > > >
> > > > > We should also have corresponding Jira tickets for different cases
> > > > > to avoid the confusion I am in atm :)
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 2/27/20 8:21 AM, John Roesler wrote:
> > > > > > > > Hi Dongjin,
> > > > > > > >
> > > > > > > > No problem; glad we got it sorted out.
> > > > > > > >
> > > > > > > > Thanks again for picking this up! -John
> > > > > > > >
> > > > > > > > On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> > > > > > > > > > I was under the impression that you wanted to expand the
> > > > > > > > > > scope of the KIP
> > > > > > > > > to additionally allow querying the internal buffer, not
> > > > > > > > > just the result. Can you clarify whether you are proposing
> > > > > > > > > to allow querying the state of the internal buffer, the
> > > > > > > > > result, or both?
> > > > > > > > >
> > > > > > > > > Sorry for the confusion. As we 

Re: [DISCUSS] KIP-656: MirrorMaker2 Exactly-once Semantics

2020-09-11 Thread Ning Zhang
Bump up for another round of discussion.

To follow up on the last question raised, about running the Source (MirrorCheckpoint)
Connector and the Sink (MirrorSinkConnector) Connector simultaneously in MM2: from
my testing, they are all functioning well and there seems to be no significant
degradation or added complexity from mixing Source and Sink Connectors.

If there are reasons that we have to "fork" (e.g. duplicate most of the codebase of)
MirrorCheckpoint as a Sink Connector, I am happy to look into that necessity in
another round.

Thanks

On 2020/09/03 15:06:00, Mickael Maison  wrote: 
> Hi Ning,
> 
> Thanks for the updates.
> 
> 1. If you have to run a Sink (the new MirrorSinkConnector) and Source
> (MirrorCheckpoint) connector for MM2 you will need 2 Connect runtimes.
> So this does not work well for users of Connect. I've not really
> looked into it yet but I wonder if we should include a Sink connector
> for checkpoints too
> 
> On Thu, Sep 3, 2020 at 6:51 AM Ning Zhang  wrote:
> >
> > bump for potentially more discussion
> >
> > On 2020/08/27 23:31:38, Ning Zhang  wrote:
> > > Hello Mickael,
> > >
> > > > 1. How does offset translation work with this new sink connector?
> > > > Should we also include a CheckpointSinkConnector?
> > >
> > > CheckpointSourceConnector will be re-used as-is (the same as the current
> > > implementation). When EOS is enabled, we will run 3 connectors:
> > >
> > > MirrorSinkConnector (based on SinkConnector)
> > > MirrorCheckpointConnector (based on SourceConnector)
> > > MirrorHeartbeatConnector (based on SourceConnector)
> > >
> > > For the last two connectors (checkpoint, heartbeat), if we do not
> > > strictly require EOS, it is probably OK to keep the current implementation
> > > based on SourceConnector.
> > >
> > > I will update the KIP to clarify this, if it sounds acceptable.
> > >
> > > > 2. Migrating to this new connector could be tricky as effectively the
> > > > Connect runtime needs to point to the other cluster, so its state
> > > > (stored in the __connect topics) is lost. Unfortunately there is no
> > > > easy way today to prime Connect with offsets. Not necessarily a
> > > > blocking issue but this should be described as I think the current
> > > > Migration section looks really optimistic at the moment
> > >
> > > Totally agree. I will update the migration part with a note that, without
> > > careful planning, there could be a service interruption.
> > >
> > > > 3. We can probably find a better name than "transaction.producer".
> > > > Maybe we can follow a similar pattern than Streams (which uses
> > > > "processing.guarantee")?
> > >
> > > "processing.guarantee" sounds better
> > >
> > > > 4. Transactional Ids used by the producer are generated based on the
> > > > task assignments. If there's a single task, if it crashes and restarts
> > > > it would still get the same id. Can this be an issue?
> > >
> > > From https://tgrez.github.io/posts/2019-04-13-kafka-transactions.html,
> > > the author suggests postfixing transaction.id with :
> > >
> > > "To avoid handling an external store we will use a static encoding 
> > > similarly as in spring-kafka:
> > > The transactional.id is now the transactionIdPrefix appended with 
> > > ..."
> > >
> > > I think as long as no more than one producer uses the same
> > > "transaction.id" at the same time, it is OK.
> > >
> > > Also from my tests, this "transaction.id" assignment works fine with
> > > failures. To tighten it up, I also tested using the "connector task id" in
> > > "transaction.id". The "connector task id" is typically composed of 
> > > connector_name and task_id, which is also unique across all connectors in 
> > > a KC cluster.
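
A rough sketch of deriving such a transactional.id from the connector name and task id
(the prefix and helper names here are illustrative assumptions, not part of the KIP):

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;

    // A stable transactional.id per task: a restarted task reclaims the same id,
    // which fences any zombie producer left over from its previous incarnation.
    public final class TransactionalIds {

        private TransactionalIds() { }

        public static String forTask(final String connectorName, final int taskId) {
            // e.g. "mm2-sink-MirrorSinkConnector-3" (the prefix is an assumption)
            return "mm2-sink-" + connectorName + "-" + taskId;
        }

        public static void configure(final Map<String, Object> producerProps,
                                     final String connectorName, final int taskId) {
            producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                              forTask(connectorName, taskId));
            producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        }
    }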
> > >
> > > > 5. The logic in the KIP creates a new transaction every time put() is
> > > > called. Is there a performance impact?
> > >
> > > It could be a performance hit if the transaction batch is too small under a
> > > high ingestion rate. The batch size depends on how many messages the
> > > consumer polls each time. Maybe we could increase "max.poll.records" to
> > > get a larger batch size.
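
A minimal sketch of the per-put() transaction shape being discussed (class and method
names are illustrative, topic renaming and most error handling are omitted, and byte[]
key/value converters are assumed):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;

    public class TransactionalPutSketch {

        // Assumes producer.initTransactions() was called once when the task started.
        public static void putBatch(final KafkaProducer<byte[], byte[]> producer,
                                    final Iterable<SinkRecord> records,
                                    final String consumerGroupId) {
            final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            producer.beginTransaction();
            try {
                for (final SinkRecord record : records) {
                    // Mirror the record to the target cluster.
                    producer.send(new ProducerRecord<>(record.topic(),
                                                       (byte[]) record.key(),
                                                       (byte[]) record.value()));
                    // Track the next offset to resume from for each source partition.
                    offsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
                                new OffsetAndMetadata(record.kafkaOffset() + 1));
                }
                // Commit the consumer offsets and the produced records atomically.
                producer.sendOffsetsToTransaction(offsets, consumerGroupId);
                producer.commitTransaction();
            } catch (final RuntimeException e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }

Whether each put() batch is large enough to amortize the transaction overhead then comes
down to consumer settings such as "max.poll.records", as noted above.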
> > >
> > > Overall, thanks so much for the valuable feedback. If the responses
> > > sound good, I will do a cleanup of the KIP.
> > >
> > > On 2020/08/27 09:59:57, Mickael Maison  wrote:
> > > > Thanks Ning for the KIP. Having stronger guarantees when mirroring
> > > > data would be a nice improvement!
> > > >
> > > > A few comments:
> > > > 1. How does offset translation work with this new sink connector?
> > > > Should we also include a CheckpointSinkConnector?
> > > >
> > > > 2. Migrating to this new connector could be tricky as effectively the
> > > > Connect runtime needs to point to the other cluster, so its state
> > > > (stored in the __connect topics) is lost. Unfortunately there is no
> > > > easy way today to prime Connect with offsets. Not necessarily a
> > > > blocking issue but this should be described as I think the current
> > > > Migration section looks really optimistic at the moment
> > > >
> > > > 3. We can