[jira] [Resolved] (KAFKA-15659) Flaky test RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener

2023-10-27 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze resolved KAFKA-15659.
---
Resolution: Fixed

> Flaky test 
> RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener 
> 
>
>                 Key: KAFKA-15659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15659
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Divij Vaidya
>            Assignee: Levani Kokhreidze
>            Priority: Major
>              Labels: flaky-test, streams
>         Attachments: Screenshot 2023-10-20 at 13.19.20.png
>
>
> The test {{shouldInvokeUserDefinedGlobalStateRestoreListener}}, added in the 
> PR [https://github.com/apache/kafka/pull/14519], has been flaky since it was 
> added. You can find the flaky builds on trunk using the link 
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=trunk=Europe%2FBerlin=org.apache.kafka.streams.integration.RestoreIntegrationTest=shouldInvokeUserDefinedGlobalStateRestoreListener()]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] 3.5.2 Release

2023-10-12 Thread Levani Kokhreidze
Hi Divij,

Thanks for the explanation, makes sense.

Hi Luke, thank you! It would be awesome to see 3.5.2 out.

Best,
Levani

> On 12. Oct 2023, at 12:39, Luke Chen  wrote:
> 
> Hi Levani and Divij,
> 
> I can work on the 3.5.2 release.
> I'll start a new thread for volunteering it maybe next week.
> 
> Thanks.
> Luke
> 
> On Thu, Oct 12, 2023 at 5:07 PM Divij Vaidya 
> wrote:
> 
>> Hello Levani
>> 
>> From a process perspective, there is no fixed schedule for bug fix
>> releases. If we have a volunteer for release manager (must be a committer),
>> they can start with the process of bug fix release (with the approval of
>> PMC).
>> 
>> My personal opinion is that it's too early to start 3.6.1 and we should
>> wait at least one month to hear feedback on 3.6.0. We need to strike a
>> careful balance between getting the critical fixes into the hands of users
>> as soon as possible vs. spending community effort on releases (effort that
>> could otherwise be used to make Kafka better, feature-wise & operational
>> stability-wise).
>> 
>> For 3.5.2, I think there are sufficient fixes pending (including some CVE
>> fixes) to start a bug fix release. We just need a volunteer for the release
>> manager role.
>> 
>> --
>> Divij Vaidya
>> 
>> 
>> 
>> On Thu, Oct 12, 2023 at 9:57 AM Levani Kokhreidze 
>> wrote:
>> 
>>> Hello,
>>> 
>>> KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches.
>>> The bug fix addresses a feature that was added in 3.5. Considering the
>>> feature doesn't work as expected without the fix, I would like to know if
>>> it's reasonable to start the 3.5.2 release. Of course, releasing such a
>>> massive project like Kafka is not a trivial task, and I am looking for
>>> the community's input on whether it's reasonable to start the 3.5.2
>>> release process.
>>> 
>>> Best,
>>> Levani
>>> 
>>> [1] - https://issues.apache.org/jira/browse/KAFKA-15571
>> 



[DISCUSS] 3.5.2 Release

2023-10-12 Thread Levani Kokhreidze
Hello,

KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches. The bug 
fix addresses a feature that was added in 3.5. Considering the feature doesn't 
work as expected without the fix, I would like to know if it's reasonable to 
start the 3.5.2 release. Of course, releasing such a massive project like 
Kafka is not a trivial task, and I am looking for the community's input on 
whether it's reasonable to start the 3.5.2 release process.

Best,
Levani

[1] - https://issues.apache.org/jira/browse/KAFKA-15571

Re: KAFKA-15571 Review

2023-10-10 Thread Levani Kokhreidze
Hi Bruno,

Thanks for taking a look. I’ve added a test as well.

Is it fair to say that we can expect a 3.5.2 release with this fix? I can 
cherry-pick the changes into the 3.5 branch.

Best,
Levani

> On 10. Oct 2023, at 16:35, Bruno Cadonna  wrote:
> 
> Hi Levani,
> 
> I think you found a bug and you are looking at the right place!
> 
> I commented on the PR.
> 
> Best,
> Bruno
> 
> On 10/10/23 3:02 PM, Levani Kokhreidze wrote:
>> Hello,
>> We’ve been looking at https://issues.apache.org/jira/browse/KAFKA-10575, 
>> but it seems the implementation has a bug: the user-defined 
>> `StateRestoreListener#onRestoreSuspended` is never called because 
>> DelegatingStateRestoreListener was not updated.
>> Here’s the PR that fixes it: https://github.com/apache/kafka/pull/14519
>> I will add tests, but first I wanted to make sure I’m looking at the right 
>> place and not missing something.
>> Issue link: https://issues.apache.org/jira/browse/KAFKA-15571
>> Best,
>> Levani



KAFKA-15571 Review

2023-10-10 Thread Levani Kokhreidze
Hello,

We’ve been looking at https://issues.apache.org/jira/browse/KAFKA-10575, but it 
seems the implementation has a bug: the user-defined 
`StateRestoreListener#onRestoreSuspended` is never called because 
DelegatingStateRestoreListener was not updated.

Here’s the PR that fixes it: https://github.com/apache/kafka/pull/14519

I will add tests, but first I wanted to make sure I’m looking at the right 
place and not missing something.

Issue link: https://issues.apache.org/jira/browse/KAFKA-15571

Best,
Levani

[jira] [Created] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended

2023-10-10 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-15571:
-

             Summary: StateRestoreListener#onRestoreSuspended is never called 
because wrapper DelegatingStateRestoreListener doesn't implement 
onRestoreSuspended
                 Key: KAFKA-15571
                 URL: https://issues.apache.org/jira/browse/KAFKA-15571
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.5.1, 3.5.0
            Reporter: Levani Kokhreidze


With https://issues.apache.org/jira/browse/KAFKA-10575, 
`StateRestoreListener#onRestoreSuspended` was added. But local tests show that 
it is never called, because `DelegatingStateRestoreListener` was not updated to 
call the new method.
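
For illustration, here is a minimal, hedged sketch of the shape of the fix 
(the class and field names below are stand-ins for the internal 
DelegatingStateRestoreListener, not the actual Kafka source): the delegating 
wrapper has to override the new callback and forward it, otherwise the 
interface's default no-op implementation silently swallows it.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Hypothetical stand-in for the internal delegating wrapper.
class ForwardingRestoreListener implements StateRestoreListener {
    private final StateRestoreListener userListener; // may be null if none is registered

    ForwardingRestoreListener(final StateRestoreListener userListener) {
        this.userListener = userListener;
    }

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        if (userListener != null) {
            userListener.onRestoreStart(partition, storeName, startingOffset, endingOffset);
        }
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        if (userListener != null) {
            userListener.onBatchRestored(partition, storeName, batchEndOffset, numRestored);
        }
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        if (userListener != null) {
            userListener.onRestoreEnd(partition, storeName, totalRestored);
        }
    }

    // The override that was missing: without it, the interface's default
    // (no-op) method applies and the user-defined listener is never invoked.
    @Override
    public void onRestoreSuspended(final TopicPartition partition, final String storeName,
                                   final long totalRestored) {
        if (userListener != null) {
            userListener.onRestoreSuspended(partition, storeName, totalRestored);
        }
    }
}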



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-02 Thread Levani Kokhreidze
Congrats Bruno, well deserved!

On Wed, 2 Nov 2022, 11:11 Jorge Esteban Quilcate Otoya, <
quilcate.jo...@gmail.com> wrote:

> Congratulations, Bruno!!
>
> On Wed, 2 Nov 2022, 09:06 Mickael Maison, 
> wrote:
>
> > Congratulations Bruno!
> >
> > On Wed, Nov 2, 2022 at 8:33 AM Matthew Benedict de Detrich
> >  wrote:
> > >
> > > Congratulations!
> > >
> > > On Wed, Nov 2, 2022 at 8:32 AM Josep Prat  >
> > > wrote:
> > >
> > > > Congrats Bruno!
> > > >
> > > > ———
> > > > Josep Prat
> > > >
> > > > Aiven Deutschland GmbH
> > > >
> > > > Immanuelkirchstraße 26, 10405 Berlin
> > > >
> > > > Amtsgericht Charlottenburg, HRB 209739 B
> > > >
> > > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > > >
> > > > m: +491715557497
> > > >
> > > > w: aiven.io
> > > >
> > > > e: josep.p...@aiven.io
> > > >
> > > > On Wed, Nov 2, 2022, 08:20 Tom Bentley  wrote:
> > > >
> > > > > Congratulations!
> > > > >
> > > > > On Wed, 2 Nov 2022 at 06:40, David Jacot 
> > wrote:
> > > > >
> > > > > > Congrats, Bruno! Well deserved.
> > > > > >
> > > > > > Le mer. 2 nov. 2022 à 06:12, Randall Hauch  a
> > écrit
> > > > :
> > > > > >
> > > > > > > Congratulations, Bruno!
> > > > > > >
> > > > > > > On Tue, Nov 1, 2022 at 11:20 PM Sagar <
> sagarmeansoc...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Congrats Bruno!
> > > > > > > >
> > > > > > > > Sagar.
> > > > > > > >
> > > > > > > > On Wed, Nov 2, 2022 at 7:51 AM deng ziming <
> > > > dengziming1...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Congrats!
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Ziming
> > > > > > > > >
> > > > > > > > > > On Nov 2, 2022, at 3:36 AM, Guozhang Wang <
> > wangg...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > I'd like to introduce our new Kafka PMC member, Bruno.
> > > > > > > > > >
> > > > > > > > > > Bruno has been a committer since April 2021 and has been
> > very
> > > > > > active
> > > > > > > > in
> > > > > > > > > > the community. He's a key contributor to Kafka Streams,
> and
> > > > also
> > > > > > > helped
> > > > > > > > > > review a lot of horizontal improvements such as Mockito.
> > It is
> > > > my
> > > > > > > > > pleasure
> > > > > > > > > > to announce that Bruno has agreed to join the Kafka PMC.
> > > > > > > > > >
> > > > > > > > > > Congratulations, Bruno!
> > > > > > > > > >
> > > > > > > > > > -- Guozhang Wang, on behalf of Apache Kafka PMC
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Matthew de Detrich
> > >
> > > *Aiven Deutschland GmbH*
> > >
> > > Immanuelkirchstraße 26, 10405 Berlin
> > >
> > > Amtsgericht Charlottenburg, HRB 209739 B
> > >
> > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > >
> > > *m:* +491603708037
> > >
> > > *w:* aiven.io *e:* matthew.dedetr...@aiven.io
> >
>


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-05-23 Thread Levani Kokhreidze
Hi all,

Since there was no activity around this KIP, I’ll pick it up in coming weeks 
and continue the discussion.

Best,
Levani

> On 27. Apr 2022, at 22:50, Matthias J. Sax  wrote:
> 
> Let's wait a couple of days to give Ivan a chance to reply. If he does not 
> reply, feel free to pick it up.
> 
> 
> -Matthias
> 
> On 4/26/22 3:58 AM, Levani Kokhreidze wrote:
>> Hi,
>> Sorry, maybe I am jumping the gun here, but if by any chance this KIP 
>> becomes dormant, I'd be interested in picking it up.
>> Levani
>>> On 23. Apr 2022, at 02:43, Matthias J. Sax  wrote:
>>> 
>>> Ivan,
>>> 
>>> are you still interested in this KIP? I think it would be a good addition.
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 8/16/21 5:30 PM, Matthias J. Sax wrote:
>>>> Your point about the IQ problem is an interesting one. I missed the
>>>> point that the "new key" would be a "superkey", and thus, it should
>>>> always be possible to compute the original key from the superkey. (As a
>>>> matter of fact, for windowed-table the windowed-key is also a superkey...)
>>>> I am not sure if we need to follow the "use the head idea" or if we need
>>>> a "CompositeKey" interface? It seems we can just allow for any types and
>>>> we can be agnostic to it?
>>>> KStream stream = ...
>>>> KStream stream2 =
>>>>   stream.selectKey(/*set superkey*/)
>>>> .markAsPartitioned()
>>>> We only need a `Function` without any restrictions on the type,
>>>> to map the "superkey" to the original "partition key"?
>>>> Do you propose to provide the "reverse mapper" via the
>>>> `markAsPartitioned()` method (or config object), or via the IQ methods?
>>>> Not sure which one is better?
>>>> However, I am not sure if it would solve the join problem? At least not
>>>> easily: if one has two KStreams and one is properly
>>>> partitioned by `Tuple` while the other one is "marked-as-partitioned",
>>>> the join would just fail. -- Similar for a stream-table join. -- The
>>>> only fix would be to do the re-partitioning anyway, effectively ignoring
>>>> the "user hint", but it seems to defeat the purpose? Again, I would
>>>> argue that it is ok to not handle this case, but leave it as the
>>>> responsibility for the user to not mess it up.
>>>> -Matthias
>>>> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
>>>>> Hi Matthias and Sophie!
>>>>> 
>>>>> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>>>>> 
>>>>> I don't have a strong opinion here, both Sophie's and Matthias' points
>>>>> look convincing for me.
>>>>> 
>>>>> I think we should estimate the following: what is the probability that
>>>>> we will ever need to extend `selectKey` etc. with a config for the
>>>>> purposes other than `markAsPartitioned`?
>>>>> 
>>>>> If we find this probability high, then it's just a refactoring to
>>>>> deprecate overloads with `Named` and introduce overloads with dedicated
>>>>> configs, and we should do it this way.
>>>>> 
>>>>> If it's low or zero, maybe it's better not to mess with the existing
>>>>> APIs and to introduce a single `markAsPartitioned()` method, which
>>>>> itself can be easily deprecated if we find a better solution later!
>>>>> 
>>>>> 
>>>>> ==2. The IQ problem==
>>>>> 
>>>>>> it then has to be the case that
>>>>> 
>>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>> 
>>>>> 
>>>>> Sophie, you got this wrong, and Matthias already explained why.
>>>>> 
>>>>> The actual required property for the mapping function is:
>>>>> 
>>>>> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>>>>> 
>>>>> or, by contraposition law,
>>>>> 
>>>>> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
>>>>> 
>>>>> 
>>>>> (look at the whiteboard photo that I attached to the KIP).
>>>>> 
>>>>> There is a big class of such mappings: key -> Tuple(key, anyValue).

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-04-26 Thread Levani Kokhreidze
Hi,

Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes 
dormant, I'd be interested in picking it up.

Levani

> On 23. Apr 2022, at 02:43, Matthias J. Sax  wrote:
> 
> Ivan,
> 
> are you still interested in this KIP? I think it would be a good addition.
> 
> 
> -Matthias
> 
> On 8/16/21 5:30 PM, Matthias J. Sax wrote:
>> Your point about the IQ problem is an interesting one. I missed the
>> point that the "new key" would be a "superkey", and thus, it should
>> always be possible to compute the original key from the superkey. (As a
>> matter of fact, for windowed-table the windowed-key is also a superkey...)
>> I am not sure if we need to follow the "use the head idea" or if we need
>> a "CompositeKey" interface? It seems we can just allow for any types and
>> we can be agnostic to it?
>> KStream stream = ...
>> KStream stream2 =
>>   stream.selectKey(/*set superkey*/)
>> .markAsPartitioned()
>> We only need a `Function` without any restrictions on the type,
>> to map the "superkey" to the original "partition key"?
>> Do you propose to provide the "reverse mapper" via the
>> `markAsPartitioned()` method (or config object), or via the IQ methods?
>> Not sure which one is better?
>> However, I am not sure if it would solve the join problem? At least not
>> easily: if one has two KStreams and one is properly
>> partitioned by `Tuple` while the other one is "marked-as-partitioned",
>> the join would just fail. -- Similar for a stream-table join. -- The
>> only fix would be to do the re-partitioning anyway, effectively ignoring
>> the "user hint", but it seems to defeat the purpose? Again, I would
>> argue that it is ok to not handle this case, but leave it as the
>> responsibility for the user to not mess it up.
>> -Matthias
>> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
>>> Hi Matthias and Sophie!
>>> 
>>> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>>> 
>>> I don't have a strong opinion here, both Sophie's and Matthias' points
>>> look convincing for me.
>>> 
>>> I think we should estimate the following: what is the probability that
>>> we will ever need to extend `selectKey` etc. with a config for the
>>> purposes other than `markAsPartitioned`?
>>> 
>>> If we find this probability high, then it's just a refactoring to
>>> deprecate overloads with `Named` and introduce overloads with dedicated
>>> configs, and we should do it this way.
>>> 
>>> If it's low or zero, maybe it's better not to mess with the existing
>>> APIs and to introduce a single `markAsPartitioned()` method, which
>>> itself can be easily deprecated if we find a better solution later!
>>> 
>>> 
>>> ==2. The IQ problem==
>>> 
 it then has to be the case that
>>> 
 Partitioner.partition(key) == Partitioner.partition(map(key))
>>> 
>>> 
>>> Sophie, you got this wrong, and Matthias already explained why.
>>> 
>>> The actual required property for the mapping function is:
>>> 
>>> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>>> 
>>> or, by contraposition law,
>>> 
>>> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
>>> 
>>> 
>>> (look at the whiteboard photo that I attached to the KIP).
>>> 
>>> There is a big class of such mappings: key -> Tuple(key, anyValue). This
>>> is actually what we often do before aggregation, and this mapping does
>>> not require repartition.
>>> 
>>> But of course we can extract the original key from Tuple(key, anyValue),
>>> and this can save IQ and joins!
>>> 
>>> This is what I'm talking about when I talk about 'CompositeKey' idea.
>>> 
>>> We can do the following:
>>> 
>>> 1. implement a 'partitioner wrapper' that recognizes tuples
>>> (CompositeKeys) and uses only the 'head' to calculate the partition
>>> (see the sketch below),
>>> 
>>> 2. implement
>>> 
>>> selectCompositeKey(BiFunction tailSelector) {
>>>   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>>>   //MARK_AS_PARTITIONED call here,
>>>   //but this call is an implementation detail and we do not expose
>>>   //markAsPartitioned publicly!
>>> }
>>> 
>>> WDYT? (it's just a brainstorming idea)
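
A hedged sketch of the brainstorming idea above, with hypothetical names 
(neither CompositeKey nor HeadPartitioner is actual KIP code): the composite 
key keeps the original key as its 'head', and the partitioner wrapper from 
point 1 routes records by the serialized head only, mirroring Kafka's default 
murmur2-based partitioning. The mapping k -> CompositeKey(k, tail) therefore 
preserves partitions and needs no repartition step.

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StreamPartitioner;

// A "superkey" that still exposes the original partition key as its head.
final class CompositeKey<K, T> {
    final K head; // the original partition key
    final T tail; // extra data appended by selectKey
    CompositeKey(final K head, final T tail) {
        this.head = head;
        this.tail = tail;
    }
}

// Partitions a CompositeKey by its head only, so that
// partition(map(k)) == partition(k) for map(k) = CompositeKey(k, tail).
final class HeadPartitioner<K, T, V> implements StreamPartitioner<CompositeKey<K, T>, V> {
    private final Serializer<K> headSerializer;

    HeadPartitioner(final Serializer<K> headSerializer) {
        this.headSerializer = headSerializer;
    }

    @Override
    public Integer partition(final String topic, final CompositeKey<K, T> key,
                             final V value, final int numPartitions) {
        // Hash only the serialized head, exactly as the original key would
        // have been hashed by the default partitioner.
        final byte[] headBytes = headSerializer.serialize(topic, key.head);
        return Utils.toPositive(Utils.murmur2(headBytes)) % numPartitions;
    }
}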
>>> 
>>> On 09.08.2021 2:38, Matthias J. Sax wrote:
 Hi,
 
 I originally had a similar thought about `markAsPartitioned()` vs
 extending `selectKey()` et al. with a config. While I agree that it
 might be conceptually cleaner to use a config object, I did not propose
 it as the API impact (deprecating stuff and adding new stuff) is quite
 big... If we think it's an acceptable price to pay, I am ok with it
 though.
 
 I also do think, that `markAsPartitioned()` could actually be
 categorized as an operator... We don't expose it in the API as
 first-class citizen atm, but in fact we have two types of `KStream` -- a
 "PartitionedKStream" and a "NonPartitionedKStream". Thus,
 `markAsPartitioned()` can be seen as a "cast operator" that converts the
 one into the other.
 
 I also think that the 

Re: [DISCUSS] Apache Kafka 3.2.0 release

2022-03-15 Thread Levani Kokhreidze
Hi Bruno,

I assume we will be able to merge changes tomorrow as well?

Thanks,
Levani

> On 15. Mar 2022, at 16:11, Bruno Cadonna  wrote:
> 
> Hi all,
> 
> A quick reminder that feature freeze for Apache 3.2.0 is tomorrow. Please 
> make sure to get your features merged into trunk.
> 
> I will cut the release branch on Monday.
> 
> Best,
> Bruno
> 
> On 07.03.22 15:03, Bruno Cadonna wrote:
>> Hi Kafkateers,
>> Last week we reached KIP freeze for the next major release 3.2.0 of Apache 
>> Kafka.
>> I have updated the release plan for AK 3.2.0 with all the KIPs that passed 
>> the vote last week.
>> Please, verify the plan and let me know if any KIP should be added
>> to or removed from the release plan.
>> For the KIPs which are still in progress, please work closely with your
>> reviewers to make sure that they land on time for the feature freeze.
>> The next milestone for the AK 3.2.0 release is feature freeze on March 16th 
>> 2022.
>> Best,
>> Bruno
>> On 01.03.22 17:41, Bruno Cadonna wrote:
>>> Hi all,
>>> 
>>> A quick reminder that KIP freeze for the Apache 3.2.0 is tomorrow. Please 
>>> make sure to close your votes if you want to add a KIP to the release plan.
>>> 
>>> Best,
>>> Bruno
>>> 
>>> On 15.02.22 12:37, Bruno Cadonna wrote:
 Hi all,
 
 I published a release plan for the Apache Kafka 3.2.0 release here:
 https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.2.0
 
 KIP Freeze: 2 March 2022
 Feature Freeze: 16 March 2022
 Code Freeze: 30 March 2022
 
 At least two weeks of stabilization will follow Code Freeze.
 
 Please let me know if I should add or remove KIPs from the plan or if you 
 have any other objections.
 
 Best,
 Bruno
 
 
 On 04.02.22 16:03, Bruno Cadonna wrote:
> Hi,
> 
> I'd like to volunteer to be the release manager for our next
> feature release, 3.2.0. If there are no objections, I'll send
> out the release plan soon.
> 
> Best,
> Bruno



Re: [DISCUSS] Apache Kafka 3.0.0 release plan with new updated dates

2021-07-01 Thread Levani Kokhreidze
Hi Konstantine,

FYI, I don’t think we will be able to have KIP-708 ready on time.
Feel free to remove it from the release plan.

Best,
Levani

> On 1. Jul 2021, at 01:27, Konstantine Karantasis 
>  wrote:
> 
> Hi all,
> 
> Today we have reached the Feature Freeze milestone for Apache Kafka 3.0.
> Exciting!
> 
> I'm going to allow for any pending changes to settle within the next couple
> of days.
> I trust that we all approve and merge adopted features and changes which we
> consider to be in good shape for 3.0.
> 
> Given the 4th of July holiday in the US, the 3.0 release branch will appear
> sometime on Tuesday, July 6th.
> Until then, please keep merging to trunk only the changes you intend to
> include in Apache Kafka 3.0.
> 
> Regards,
> Konstantine
> 
> 
> On Wed, Jun 30, 2021 at 3:25 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
> 
>> 
>> Done. Thanks Luke!
>> 
>> On Tue, Jun 29, 2021 at 6:39 PM Luke Chen  wrote:
>> 
>>> Hi Konstantine,
>>> We've decided that KIP-726 will be released in v3.1, not v3.0.
>>> KIP-726: Make the "cooperative-sticky, range" as the default assignor
>>> 
>>> Could you please remove this KIP from the 3.0 release plan wiki page?
>>> 
>>> Thank you.
>>> Luke
>>> 
>>> On Wed, Jun 30, 2021 at 8:23 AM Konstantine Karantasis
>>>  wrote:
>>> 
 Thanks for the update Colin.
 They are now both in the release plan.
 
 Best,
 Konstantine
 
 On Tue, Jun 29, 2021 at 2:55 PM Colin McCabe 
>>> wrote:
 
> Hi Konstantine,
> 
> Can you please add two KIPs to the 3.0 release plan wiki page?
> 
> I'm thinking of:
>KIP-630: Kafka Raft Snapshots
>KIP-746: Revise KRaft Metadata Records
> 
> These are marked as 3.0 on the KIP page but I guess we don't have
>>> them on
> the page yet.
> 
> Many thanks.
> Colin
> 
> 
> On Tue, Jun 22, 2021, at 06:29, Josep Prat wrote:
>> Hi there,
>> 
>> As the feature freeze date is approaching, I just wanted to kindly
>>> ask
> for
>> some reviews on the already submitted PR (
>> https://github.com/apache/kafka/pull/10840) that implements the
 approved
>> KIP-744 (https://cwiki.apache.org/confluence/x/XIrOCg). The PR has
 been
>> ready for review for 2 weeks, and I simply want to make sure there
>>> is
>> enough time to address any possible changes that might be requested.
>> 
>> Thanks in advance and sorry for any inconvenience caused,
>> --
>> Josep
>> On Mon, Jun 21, 2021 at 11:54 PM Konstantine Karantasis
>>  wrote:
>> 
>>> Thanks for the update Bruno.
>>> I've moved KIP-698 to the list of postponed KIPs in the plan.
>>> 
>>> Konstantine
>>> 
>>> On Mon, Jun 21, 2021 at 2:30 AM Bruno Cadonna >>> 
> wrote:
>>> 
 Hi Konstantine,
 
 The implementation of
 
 KIP-698: Add Explicit User Initialization of Broker-side State
>>> to
> Kafka
 Streams
 
 will not be ready for 3.0, so you can remove it from the list.
 
 Best,
 Bruno
 
 On 15.06.21 07:33, Konstantine Karantasis wrote:
> Done. Moved it into the table of Adopted KIPs targeting 3.0.0
>>> and
> to
>>> the
> release plan of course.
> Thanks for catching this Israel.
> 
> Best,
> Konstantine
> 
> On Mon, Jun 14, 2021 at 7:40 PM Israel Ekpo <
 israele...@gmail.com>
 wrote:
> 
>> Konstantine,
>> 
>> One of mine is missing from this list
>> 
>> KIP-633: Drop 24 hour default of grace period in Streams
>> Please could you include it?
>> 
>> Voting has already concluded a long time ago
>> 
>> 
>> 
>> On Mon, Jun 14, 2021 at 6:08 PM Konstantine Karantasis
>>  wrote:
>> 
>>> Hi all.
>>> 
>>> KIP Freeze for the next major release of Apache Kafka was
 reached
>>> last
>>> week.
>>> 
>>> As of now, 36 KIPs have concluded their voting process and
>>> have
> been
>>> adopted.
>>> These KIPs are targeting 3.0 (unless it's noted otherwise in
 the
 release
>>> plan) and their inclusion as new features will be finalized
 right
>>> after
>>> Feature Freeze.
>>> 
>>> At the high level, out of these 36 KIPs, 11 have been
 implemented
 already
>>> and 25 are open or in progress.
>>> Here is the full list of adopted KIPs:
>>> 
>>> KIP-751: Drop support for Scala 2.12 in Kafka 4.0
>>> (deprecate in
> 3.0)
>>> KIP-750: Drop support for Java 8 in Kafka 4.0 (deprecate in
 3.0)
>>> KIP-746: Revise KRaft Metadata Records
>>> KIP-745: Connect API to restart connector and tasks
>>> KIP-744: Migrate 

Re: [VOTE] KIP-708: Rack awareness for Kafka Streams

2021-03-18 Thread Levani Kokhreidze
Hello all,

Thanks for the valuable feedback and interesting discussion.
It seems like there’re no additional comments around the KIP, so I’d like to 
conclude the VOTING thread.

The VOTING thread has been open since the 8th of March, and the results are as 
follows:
One non-binding vote (Bruno)
Three binding votes (John, Guozhang, Sophie)

As a result, the KIP passes.

Thanks all for voting, and please let me know in case of any concerns.

Best,
Levani


> On 17. Mar 2021, at 21:21, Guozhang Wang  wrote:
> 
> SGTM for going back to encoding the full names too --- flexibility wins
> here, and if users do hit limits on bytes, probably they'd consider giving
> some shorter names anyways :)
> 
> Guozhang
> 
> On Wed, Mar 17, 2021 at 11:25 AM Levani Kokhreidze 
> wrote:
> 
>> Hi Sophie,
>> 
>> No worries! And thanks for taking a look.
>> I’ve updated the KIP.
>> 
>> Will wait some time for any additional feedback that might arise.
>> 
>> Best,
>> Levani
>> 
>>> On 17. Mar 2021, at 19:11, Sophie Blee-Goldman
>>  wrote:
>>> 
>>> Ah, sorry for hijacking the VOTE thread :(
>>> 
>>> Limiting the tag length and total amount of tags specified are already
>> part
>>>> of the implementation I work on. Assuming that
>>> 
>>> encoding a limited number of strings is acceptable, I think it's the most
>>>> straightforward way to move forward. Any objections?
>>> 
>>> 
>>> This sounds good to me -- I imagine most users probably only need a
>> handful
>>> of tags anyway. If someone is bumping up
>>> against the limit and has a valid use case, we can always increase it.
>>> 
>>> One last minor thing -- if we're going to encode the full tag names,
>> then I
>>> think we can leave out the "version" field from the
>>> ClientTag struct. If we ever want to modify this struct, we should do so
>> by
>>> bumping the overall SubscriptionInfo protocol version.
>>> This way we have one fewer version in the mix, and we get all the
>> benefits
>>> of version probing already baked in -- which
>>> means we can modify the protocol however we like without worrying about
>>> compatibility. For example this gives us the flexibility
>>> to go back to some kind of encoding if need be (although I don't expect
>> to
>>> need to).
>>> 
>>> If everyone else is on board with the current KIP, I'm +1 (binding) --
>>> thanks for the proposal Levani!
>>> 
>>> Cheers,
>>> Sophie
>>> 
>>> 
>>> On Wed, Mar 17, 2021 at 7:54 AM Levani Kokhreidze <
>> levani.co...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Sophie and Bruno,
>>>> 
>>>> Thanks for the questions and suggestions.
>>>> Not sure if it's best to discuss this in the DISCUSSION thread, but I
>> will
>>>> write it here first, and if it needs more discussion, we can move to the
>>>> DISCUSSION thread.
>>>> Actually, in the implementation, I have a version field in the ClientTag
>>>> struct. I assumed that all structs must have versions and that it's an
>>>> explicit requirement; therefore, I left it out of the KIP (fixed).
>>>> I'm okay with changing the name to "rack.aware.assignment.tags" (fixed).
>>>> As for upgrading and evolving tags, good question; we must try to make it
>>>> as flexible as possible. Good catch that with encoding, changing the tags
>>>> may be problematic, especially changing the tags' order. One way around it
>>>> may be to change the "rack.aware.assignment.tags" config so that users can
>>>> specify the tag index, for instance: rack.aware.assignment.tags.0: cluster,
>>>> rack.aware.assignment.tags.1: zone. But that configuration is way uglier
>>>> and more complicated (and easier to get wrong). Limiting the tag length and
>>>> the total number of tags specified is already part of the implementation I
>>>> am working on. Assuming that encoding a limited number of strings is
>>>> acceptable, I think it's the most straightforward way to move forward. Any
>>>> objections?
>>>> I've updated KIP [1] with the latest discussion points and reverted the
>>>> "encoding tag keys" part (sorry Guozhang, I haven't really thought about
>>>> this potential edge-case, and thanks, Sophie, for catching it).
>>>> 
>>>> I am looking forward to your feedback.
>>>> 
>>>> Best,
>>>> Levani
>>>> 
>>>> [1] -
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams
>>>> 
>>>>> On 16. Mar 2021, at 23:30, Bruno Cadonna 
>>>> wrote:
>>>>> 
>>>>> Sophie
>>>> 
>>>> 
>> 
>> 
> 
> -- 
> -- Guozhang



Re: [VOTE] KIP-708: Rack awareness for Kafka Streams

2021-03-17 Thread Levani Kokhreidze
Hi Sophie,

No worries! And thanks for taking a look.
I’ve updated the KIP.

Will wait some time for any additional feedback that might arise.

Best,
Levani

> On 17. Mar 2021, at 19:11, Sophie Blee-Goldman  
> wrote:
> 
> Ah, sorry for hijacking the VOTE thread :(
> 
> Limiting the tag length and total amount of tags specified are already part
>> of the implementation I work on. Assuming that
> 
> encoding a limited number of strings is acceptable, I think it's the most
>> straightforward way to move forward. Any objections?
> 
> 
> This sounds good to me -- I imagine most users probably only need a handful
> of tags anyway. If someone is bumping up
> against the limit and has a valid use case, we can always increase it.
> 
> One last minor thing -- if we're going to encode the full tag names, then I
> think we can leave out the "version" field from the
> ClientTag struct. If we ever want to modify this struct, we should do so by
> bumping the overall SubscriptionInfo protocol version.
> This way we have one fewer version in the mix, and we get all the benefits
> of version probing already baked in -- which
> means we can modify the protocol however we like without worrying about
> compatibility. For example this gives us the flexibility
> to go back to some kind of encoding if need be (although I don't expect to
> need to).
> 
> If everyone else is on board with the current KIP, I'm +1 (binding) --
> thanks for the proposal Levani!
> 
> Cheers,
> Sophie
> 
> 
> On Wed, Mar 17, 2021 at 7:54 AM Levani Kokhreidze 
> wrote:
> 
>> Hi Sophie and Bruno,
>> 
>> Thanks for the questions and suggestions.
>> Not sure if it's best to discuss this in the DISCUSSION thread, but I will
>> write it here first, and if it needs more discussion, we can move to the
>> DISCUSSION thread.
>> Actually, in the implementation, I have a version field in the ClientTag
>> struct. I assumed that all structs must have versions and that it's an
>> explicit requirement; therefore, I left it out of the KIP (fixed).
>> I'm okay with changing the name to "rack.aware.assignment.tags" (fixed).
>> As for upgrading and evolving tags, good question; we must try to make it as
>> flexible as possible. Good catch that with encoding, changing the tags may
>> be problematic, especially changing the tags' order. One way around it may
>> be to change the "rack.aware.assignment.tags" config so that users can
>> specify the tag index, for instance: rack.aware.assignment.tags.0: cluster,
>> rack.aware.assignment.tags.1: zone. But that configuration is way uglier and
>> more complicated (and easier to get wrong). Limiting the tag length and the
>> total number of tags specified is already part of the implementation I am
>> working on. Assuming that encoding a limited number of strings is
>> acceptable, I think it's the most straightforward way to move forward. Any
>> objections?
>> I've updated KIP [1] with the latest discussion points and reverted the
>> "encoding tag keys" part (sorry Guozhang, I haven't really thought about
>> this potential edge-case, and thanks, Sophie, for catching it).
>> 
>> I am looking forward to your feedback.
>> 
>> Best,
>> Levani
>> 
>> [1] -
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams
>> 
>>> On 16. Mar 2021, at 23:30, Bruno Cadonna 
>> wrote:
>>> 
>>> Sophie
>> 
>> 



Re: [VOTE] KIP-708: Rack awareness for Kafka Streams

2021-03-17 Thread Levani Kokhreidze
Hi Sophie and Bruno,

Thanks for the questions and suggestions.
Not sure if it's best to discuss this in the DISCUSSION thread, but I will 
write it here first, and if it needs more discussion, we can move to the 
DISCUSSION thread.
Actually, in the implementation, I have a version field in the ClientTag 
struct. I assumed that all structs must have versions and that it's an explicit 
requirement; therefore, I left it out of the KIP (fixed).
I'm okay with changing the name to "rack.aware.assignment.tags" (fixed).
As for upgrading and evolving tags, good question; we must try to make it as 
flexible as possible. Good catch that with encoding, changing the tags may be 
problematic, especially changing the tags' order. One way around it may be to 
change the "rack.aware.assignment.tags" config so that users can specify the 
tag index, for instance: rack.aware.assignment.tags.0: cluster, 
rack.aware.assignment.tags.1: zone. But that configuration is way uglier and 
more complicated (and easier to get wrong). Limiting the tag length and the 
total number of tags specified is already part of the implementation I am 
working on. Assuming that encoding a limited number of strings is acceptable, 
I think it's the most straightforward way to move forward. Any objections?
I've updated KIP [1] with the latest discussion points and reverted the 
"encoding tag keys" part (sorry Guozhang, I haven't really thought about this 
potential edge-case, and thanks, Sophie, for catching it).

I am looking forward to your feedback.

Best,
Levani

[1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams

> On 16. Mar 2021, at 23:30, Bruno Cadonna  wrote:
> 
> Sophie



Re: [VOTE] KIP-708: Rack awareness for Kafka Streams

2021-03-15 Thread Levani Kokhreidze
Hello all,

Bumping this thread as we are one binding vote short of accepting this KIP.
Please let me know if you have any extra concerns and/or suggestions.

Regards,
Levani

> On 12. Mar 2021, at 13:14, Levani Kokhreidze  wrote:
> 
> Hi Guozhang,
> 
> Thanks for the feedback. I think it makes sense.
> I updated the KIP with your proposal [1], it’s a nice optimisation.
> I do agree that having the same configuration across Kafka Streams instances 
> is a reasonable requirement.
> 
> Best,
> Levani
> 
> [1] - 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams
> 
> 
>> On 12. Mar 2021, at 03:36, Guozhang Wang wrote:
>> 
>> Hello Levani,
>> 
>> Thanks for the great write-up! I think this proposal makes sense, though I
>> have one minor suggestion regarding the protocol format change: note the
>> subscription info is part of the group metadata message that we need to
>> write into the internal topic, and hence it's always better if we can save
>> on the number of bytes written there. For this, I'm wondering if we can
>> encode the key part instead of writing raw bytes based on the
>> configurations, i.e.:
>> 
>> 1. streams will look at the `task.assignment.rack.awareness` values, and
>> encode them in a deterministic manner, e.g. in your example zone = 0,
>> cluster = 1. This assumes that all instances will configure this value in
>> the same way and then with a deterministic manner all instances will have
>> the same encodings, which I think is a reasonable requirement.
>> 2. the sent protocol would be "key => short, value => bytes" instead.
>> 
>> 
>> WDYT?
>> 
>> Otherwise, I'm +1 on the KIP!
>> 
>> Guozhang
>> 
>> 
>> 
>> 
>>> On Thu, Mar 11, 2021 at 8:29 AM John Roesler wrote:
>> 
>>> Thanks for the KIP!
>>> 
>>> I'm +1 (binding)
>>> 
>>> -John
>>> 
>>> On Wed, 2021-03-10 at 13:13 +0200, Levani Kokhreidze wrote:
>>>> Hello all,
>>>> 
>>>> I’d like to start the voting on KIP-708 [1]
>>>> 
>>>> Best,
>>>> Levani
>>>> 
>>>> [1] -
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams
>>>> 
>>> 
>>> 
>>> 
>> 
>> -- 
>> -- Guozhang
> 



Re: [VOTE] KIP-708: Rack awareness for Kafka Streams

2021-03-12 Thread Levani Kokhreidze
Hi Guozhang,

Thanks for the feedback. I think it makes sense.
I updated the KIP with your proposal [1], it’s a nice optimisation.
I do agree that having the same configuration across Kafka Streams instances is 
a reasonable requirement.

Best,
Levani

[1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams
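
For illustration, a minimal sketch (illustrative names, not the KIP's actual 
code) of the deterministic encoding Guozhang proposes in the quoted message 
below: because all instances are assumed to configure the tags identically, 
each instance can derive the same short id per tag key from the config order 
and send "id => value" instead of the full key string.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TagKeyEncoder {
    // For rack.aware.assignment.tags = [zone, cluster], every instance
    // deterministically derives {zone=0, cluster=1}, matching the example
    // in the quoted message.
    static Map<String, Short> encode(final List<String> configuredTagKeys) {
        final Map<String, Short> ids = new HashMap<>();
        for (short i = 0; i < configuredTagKeys.size(); i++) {
            ids.put(configuredTagKeys.get(i), i);
        }
        return ids;
    }
}

As the more recent messages above show, this encoding was later reverted in 
favor of sending the full tag names, since encoded ids make evolving or 
reordering the tags problematic.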


> On 12. Mar 2021, at 03:36, Guozhang Wang  wrote:
> 
> Hello Levani,
> 
> Thanks for the great write-up! I think this proposal makes sense, though I
> have one minor suggestion regarding the protocol format change: note the
> subscription info is part of the group metadata message that we need to
> write into the internal topic, and hence it's always better if we can save
> on the number of bytes written there. For this, I'm wondering if we can
> encode the key part instead of writing raw bytes based on the
> configurations, i.e.:
> 
> 1. streams will look at the `task.assignment.rack.awareness` values, and
> encode them in a deterministic manner, e.g. in your example zone = 0,
> cluster = 1. This assumes that all instances will configure this value in
> the same way and then with a deterministic manner all instances will have
> the same encodings, which I think is a reasonable requirement.
> 2. the sent protocol would be "key => short, value => bytes" instead.
> 
> 
> WDYT?
> 
> Otherwise, I'm +1 on the KIP!
> 
> Guozhang
> 
> 
> 
> 
> On Thu, Mar 11, 2021 at 8:29 AM John Roesler  wrote:
> 
>> Thanks for the KIP!
>> 
>> I'm +1 (binding)
>> 
>> -John
>> 
>> On Wed, 2021-03-10 at 13:13 +0200, Levani Kokhreidze wrote:
>>> Hello all,
>>> 
>>> I’d like to start the voting on KIP-708 [1]
>>> 
>>> Best,
>>> Levani
>>> 
>>> [1] -
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams
>>> 
>> 
>> 
>> 
> 
> -- 
> -- Guozhang



[VOTE] KIP-708: Rack awareness for Kafka Streams

2021-03-10 Thread Levani Kokhreidze
Hello all,

I’d like to start the voting on KIP-708 [1]

Best,
Levani

[1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams



Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-03-09 Thread Levani Kokhreidze
Hi Bruno,

Thanks for the feedback, that makes sense.
I’ve updated the KIP based on suggestions [1]

Best,
Levani

[1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 9. Mar 2021, at 11:48, Bruno Cadonna  wrote:
> 
> Hi Levani,
> 
> The KIP looks good!
> 
> I have two comments:
> 
> 1. In the example of the ideal standby task distribution, you should make 
> clear that the algorithm will either choose distributions Node-1, Node-5, 
> Node-9 or Node-1, Node-6, Node-8, but not both.
> 
> 2. Could you formulate the section "Changes in HighAvailabilityTaskAssignor" 
> a bit more generically? I think it is enough to state that this KIP 
> will NOT affect the task assignor behavior specified in KIP-441, but it will 
> rather extend the behavior of the distribution of standby replicas. I think 
> there is no need to refer to actual code.
> 
> After these changes, I am +1 on starting the vote thread.
> 
> Best,
> Bruno
> 
> On 08.03.21 17:32, Levani Kokhreidze wrote:
>> Hello all,
>> Bumping this thread in case there’s any other feedback around KIP-708 [1].
>> If not, I will start the voting thread sometime this week.
>> Best,
>> Levani
>>  [1] - 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
>>> On 4. Mar 2021, at 10:36, Levani Kokhreidze  wrote:
>>> 
>>> Hi Bruno,
>>> 
>>> Thanks a lot for the feedback.
>>> I’ve updated KIP [1] based on suggestions.
>>> 
>>> Regards,
>>> Levani
>>> 
>>> [1] - 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
>>> 
>>>> On 1. Mar 2021, at 22:55, Bruno Cadonna wrote:
>>>> 
>>>> clientTagPrefix
>>> 



Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-03-08 Thread Levani Kokhreidze
Hello all,

Bumping this thread in case there’s any other feedback around KIP-708 [1].
If not, I will start the voting thread sometime this week.

Best,
Levani

 [1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 4. Mar 2021, at 10:36, Levani Kokhreidze  wrote:
> 
> Hi Bruno,
> 
> Thanks a lot for the feedback.
> I’ve updated KIP [1] based on suggestions.
> 
> Regards,
> Levani
> 
> [1] - 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
> 
>> On 1. Mar 2021, at 22:55, Bruno Cadonna wrote:
>> 
>> clientTagPrefix
> 



Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-03-04 Thread Levani Kokhreidze
Hi Bruno,

Thanks a lot for the feedback.
I’ve updated KIP [1] based on suggestions.

Regards,
Levani

[1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 1. Mar 2021, at 22:55, Bruno Cadonna  wrote:
> 
> clientTagPrefix



Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-28 Thread Levani Kokhreidze
Hello Ryanne,

Thanks for the question.
Tag approach gives more flexibility, which otherwise could have been only 
possible with pluggable custom logic Kafka Streams's user must provide (it is 
briefly described in "Rejected Alternatives" section).
For instance, if we append multiple tags to form a single rack, it may not give 
desired distribution to the user if the infrastructure topology is more complex.
Let us consider the following example with appending multiple tags to form the 
single rack.
Node-1:
rack.id: K8s_Cluster1-eu-central-1a
num.standby.replicas: 1

Node-2:
rack.id: K8s_Cluster1-eu-central-1b
num.standby.replicas: 1

Node-3:
rack.id: K8s_Cluster1-eu-central-1c
num.standby.replicas: 1

Node-4:
rack.id: K8s_Cluster2-eu-central-1a
num.standby.replicas: 1

Node-5:
rack.id: K8s_Cluster2-eu-central-1b
num.standby.replicas: 1

Node-6:
rack.id: K8s_Cluster2-eu-central-1c
num.standby.replicas: 1

In the example mentioned above, we have three AZs and two Kubernetes clusters. 
Our use case is to distribute each standby task in a different Kubernetes 
cluster and a different availability zone from its active task.
For instance, if the active task is on Node-1 (K8s_Cluster1-eu-central-1a), the 
corresponding standby task should be on either 
Node-5 (K8s_Cluster2-eu-central-1b) or Node-6 (K8s_Cluster2-eu-central-1c).
Unfortunately, without custom logic provided by the user, this would be very 
hard to achieve with a single "rack.id" configuration, because without any 
input from the user, Kafka Streams might just as well allocate the standby task 
for the active task either:
- in the same Kubernetes cluster but a different AZ (Node-2, Node-3), or
- in a different Kubernetes cluster but the same AZ (Node-4).
On the other hand, with the combination of the new "client.tag.*" and 
"task.assignment.rack.awareness" configurations, the standby task distribution 
algorithm will be able to figure out the most optimal distribution by balancing 
the standby tasks over each client.tag dimension individually. And it can be 
achieved by simply providing the necessary configurations to Kafka Streams.
The flow was described in more detail in previous versions of the KIP, but I've 
omitted the algorithm implementation details based on received feedback. I 
acknowledge, though, that this information can be put in the KIP for better 
clarity, so I took the liberty of updating the KIP with the example mentioned 
above [1].
I hope this answers your question.

Regards,
Levani

[1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-Benefitsoftagsvssinglerack.idconfiguration
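
A hedged sketch of the configuration described above. The names below follow 
the API that eventually shipped with KIP-708 in Kafka 3.2 (the client.tag. 
prefix and rack.aware.assignment.tags); at the time of this email the latter 
was still being discussed as "task.assignment.rack.awareness", and the 
application id and bootstrap servers are placeholders.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RackAwareConfigExample {
    public static Properties rackAwareProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Tags describing where this instance runs (Node-1 in the example above):
        props.put(StreamsConfig.clientTagPrefix("cluster"), "K8s_Cluster1");
        props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
        // The tag dimensions the assignor should spread each task's standbys over:
        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster,zone");
        return props;
    }
}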

> On 28. Feb 2021, at 01:37, Ryanne Dolan  wrote:
> 
> I guess I don't understand how multiple tags work together to achieve rack
> awareness. I realize I could go look at how Elasticseach works, but ideally
> this would be more plain in the KIP.
> 
> In particular I'm not sure how the tag approach is different than appending
> multiple tags together, e.g. how is cluster=foo, zone=bar different than
> rack=foo-bar?
> 
> Ryanne
> 
> On Sat, Feb 27, 2021, 5:00 PM Levani Kokhreidze wrote:
> 
>> Hi Bruno,
>> 
>> Thanks for the feedback. I think it makes sense.
>> I’ve updated the KIP [1] and tried to omit implementation details around
>> the algorithm.
>> 
>> Please let me know if the latest version looks OK.
>> 
>> Regards,
>> Levani
>> 
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
>> 
>>> On 25. Feb 2021, at 17:59, Bruno Cadonna  wrote:
>>> 
>>> Hi Levani,
>>> 
>>> I discussed your KIP with John the other day and we both think it is a
>> really interesting KIP and you did a good job in writing it. However, we
>> think that the KIP exposes too many implementation details. That makes
>> future changes to the implementation of the distribution algorithm harder
>> without a KIP. So, we would like to propose to just describe the config and
>> the properties that any implementation of the distribution algorithm should
>> have. We did something similar in KIP-441 for the task assignment algorithm
>> [1].
>>> 
>>> Specifically, for your KIP, any possible implementation of the
>> distribution algorithm should read the tags to be considered for rack
>> awareness from the config and if the cluster allows to distribute each
>> active task and its replicas to Streams clients with different values for
>> each tag, the algorithm will do so. How the implementation behaves, if a
>> cluster does not allow to distribute over all tag values can be left as an
>> implementation detail. This would give us flexibility for future changes to
>> the distribution algorithm.

Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-27 Thread Levani Kokhreidze
Hi Bruno,

Thanks for the feedback. I think it makes sense.
I’ve updated the KIP [1] and tried to omit implementation details around the 
algorithm.

Please let me know if the latest version looks OK.

Regards,
Levani


[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 25. Feb 2021, at 17:59, Bruno Cadonna  wrote:
> 
> Hi Levani,
> 
> I discussed your KIP with John the other day and we both think it is a really 
> interesting KIP and you did a good job in writing it. However, we think that 
> the KIP exposes too many implementation details. That makes future changes to 
> the implementation of the distribution algorithm harder without a KIP. So, we 
> would like to propose to just describe the config and the properties that any 
> implementation of the distribution algorithm should have. We did something 
> similar in KIP-441 for the task assignment algorithm [1].
> 
> Specifically, for your KIP, any possible implementation of the distribution 
> algorithm should read the tags to be considered for rack awareness from the 
> config and if the cluster allows to distribute each active task and its 
> replicas to Streams clients with different values for each tag, the algorithm 
> will do so. How the implementation behaves, if a cluster does not allow to 
> distribute over all tag values can be left as an implementation detail. This 
> would give us flexibility for future changes to the distribution algorithm.
> 
> Since there may be distribution algorithms that do not use the order of the 
> tags, it would be better to not mention the order of the tags in the config 
> doc. I would propose to omit the config doc from the KIP or formulate it 
> really generic.
> 
> We would also like to rename standby.replicas.awareness to 
> task.assignment.rack.awareness or something that does not contain standby 
> and/or replica (sorry for requesting again to change this name). That way, we 
> might be able to use this config also when we decide to make the active task 
> assignment rack aware.
> 
> I hope all of this makes sense to you.
> 
> Thank you again for the interesting KIP!
> 
> Looking forward to your implementation!
> 
> Best,
> Bruno
> 
> 
> [1] 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> 
> On 22.02.21 17:42, Levani Kokhreidze wrote:
>> Hi Bruno,
>> Thanks for the quick reply
>> 5. Sorry, maybe I am not making it clear.
>> What you have described is how it should work, yes. As stated in the KIP, 
>> with the importance semantics in standby.replicas.awareness,
>> if we have an active task on Node-1 and the first standby task on Node-5, 
>> the second standby should be on Node-3 or Node-6 (both of them are in a 
>> different AZ compared to the active and the first standby task).
>> That flow is described in Partially Preferred Distribution section [1].
>> Node-4 could have been a valid option IF standby.replicas.awareness didn’t 
>> have the importance semantics because Kafka Streams could have just picked 
>> Node-4 in that case.
>> 7. Yup, will do.
>> 8. Good question. So far, the assumption I had was that the configuration 
>> between different Kafka Streams instances is the same. Can we have an extra 
>> validation check to make sure that is the case?
>> If not, what you have mentioned in point 5 (always preferring the dimension 
>> where there are enough KS instances) is very valid.
>> On the other hand, I thought allowing users to specify the importance of the 
>> various dimensions may give them extra flexibility over standby task 
>> allocation.
>> [1] 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-PartiallyPreferredStandbyTaskDistribution
>> Regards,
>> Levani
>>> On 22. Feb 2021, at 16:51, Bruno Cadonna  wrote:
>>> 
>>> Hi Levani,
>>> 
>>> Thanks for the modifications!
>>> 
>>> I have some follow up questions/comments:
>>> 
>>> 5. Something is not clear to me. If the active is on Node-1 and the first 
>>> replica is on Node-5 (different cluster, different zone), why would the 
>>> second replica go to Node-4, which is in a different cluster but the same 
>>> zone as the active, instead of Node-6, which is in a different zone than 
>>> Node-1? In general, wouldn't it be better to guarantee under Partially 
>>> Preferred task distribution to distribute active and standby replicas of 
>>> the same task over the dimension that has at least as many values as the 
>>> number of replicas + 1, and then over the dimensions that have fewer 
>>> values? That would then also be independent of the ordering of the tags.

Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-22 Thread Levani Kokhreidze
Hi Bruno,

Thanks for the quick reply

5. Sorry, maybe I am not making it clear.
What you have described is how it should work, yes. As stated in the KIP, with 
the importance semantics in standby.replicas.awareness, if we have an active 
task on Node-1 and the first standby task on Node-5, the second standby should 
be on Node-3 or Node-6 (both of them are in a different AZ compared to the 
active and the first standby task).
That flow is described in the Partially Preferred Distribution section [1].
Node-4 could have been a valid option IF standby.replicas.awareness didn’t have 
the importance semantics, because Kafka Streams could have just picked Node-4 
in that case.

7. Yup, will do.

8. Good question. So far, the assumption I had was that the configuration 
between different Kafka Streams instances is the same. Can we have an extra 
validation check to make sure that is the case?
If not, what you have mentioned in point 5 (always preferring the dimension 
where there are enough KS instances) is very valid.
On the other hand, I thought allowing users to specify the importance of the 
various dimensions may give them extra flexibility over standby task allocation.

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-PartiallyPreferredStandbyTaskDistribution

Regards,
Levani


> On 22. Feb 2021, at 16:51, Bruno Cadonna  wrote:
> 
> Hi Levani,
> 
> Thanks for the modifications!
> 
> I have some follow up questions/comments:
> 
> 5. Something is not clear to me. If the active is on Node-1 and the first 
> replica is on Node-5 (different cluster, different zone), why would the 
> second replica go to Node-4, which is in a different cluster but the same 
> zone as the active, instead of Node-6, which is in a different zone than 
> Node-1? In general, wouldn't it be better to guarantee under Partially 
> Preferred task distribution to distribute active and standby replicas of the 
> same task over the dimension that has at least as many values as the number 
> of replicas + 1, and then over the dimensions that have fewer values? That 
> would then also be independent of the ordering of the tags.
> 
> 7. I agree with you. Could you add a sentence or two about this to the KIP?
> 
> New question:
> 
> 8. How would the assignor react on different numbers and different orderings 
> of the tags in standby.replicas.awareness across Streams clients?
> 
> Best,
> Bruno
> 
> 
> On 22.02.21 11:46, Levani Kokhreidze wrote:
>> Hi Bruno,
>> Thanks for the feedback. Please check my answers below:
>> 1. No objections; sounds good. Updated KIP
>> 2. No objections; sounds good. Updated KIP
>> 3. Thanks for the information; I can change the KIP to only expose the 
>> prefix method instead of a constant, if that’s the way forward.
>> 4. Done. Updated KIP
>> 5. Yes, order in standby.replicas.awareness config counts as stated in the 
>> STANDBY_REPLICA_AWARENESS_DOC.
>> Actually, it plays a role in Partially Preferred distribution. In the 
>> example presented in the KIP, while one of the standby tasks can be placed 
>> in a different cluster and different zone compared to the active task, we 
>> have to choose either the same cluster or the same zone for the second 
>> standby task. In the first example presented in the KIP, while Node-5 is in 
>> the other cluster and other zone compared to the active task, the second 
>> standby task's preferred options are in different zones than Node-1 and 
>> Node-5, but in the same cluster as active task or the first standby task. 
>> Without importance semantics in standby.replicas.awareness, putting second 
>> standby task in Node-4 (different cluster, same zone as active task) would 
>> have been a valid option.
>> I’ve updated the KIP to clarify this a bit more; I hope this helps.
>> 6. Thanks for pointing that out, it was a mistake. I’ve removed that phrase 
>> from the KIP.
>> 7. It shouldn’t affect HighAvailabilityTaskAssignor in a “breaking way” 
>> meaning that all the existing behavior should stay as is (e.g., when new 
>> configurations are not specified). Once required configurations are set, the 
>> main change should happen in 
>> HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and 
>> HighAvailabilityTaskAssignor#assignStandbyTaskMovements
>> I hope this answers your questions.
>> Regards,
>> Levani
>>> On 18. Feb 2021, at 15:10, Bruno Cadonna  wrote:
>>> 
>>> Hi Levani,
>>> 
>>> Thank you for the KIP.
>>> 
>>> Really interesting!
>>> 
>>> Here my comments:
>>> 
>>> 1. To be consistent with the other configs that involve standbys, I would 
>>> rename standby.task.assignment.awareness -> standby.replicas.awareness

Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-22 Thread Levani Kokhreidze
Hi Bruno,

Thanks for the feedback. Please check my answers below:

1. No objections; sounds good. Updated KIP

2. No objections; sounds good. Updated KIP

3. Thanks for the information; I can change the KIP to only expose the prefix 
method instead of a constant if that’s the way forward.

4. Done. Updated KIP

5. Yes, order in standby.replicas.awareness config counts as stated in the 
STANDBY_REPLICA_AWARENESS_DOC. 
Actually, it plays a role in Partially Preferred distribution. In the example 
presented in the KIP, while one of the standby tasks can be placed in a 
different cluster and different zone compared to the active task, we have to 
choose either the same cluster or the same zone for the second standby task. In 
the first example presented in the KIP, while Node-5 is in a different cluster 
and a different zone compared to the active task, the second standby task's 
preferred options are in different zones than Node-1 and Node-5, but in the same 
cluster as the active task or the first standby task. Without importance semantics 
in standby.replicas.awareness, putting the second standby task on Node-4 (different 
cluster, same zone as the active task) would have been a valid option.
I’ve updated KIP to clarify this a bit more, I hope this helps. 
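
To make the ordering concrete, a client in this example could be configured 
along these lines (a hypothetical snippet; the tag names follow the examples 
used in this thread):

client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1a
standby.replicas.awareness: cluster,zone

Since "cluster" is listed first, distributing standby tasks over different 
clusters takes precedence over distributing them over different zones.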

6. Thanks for pointing that out, it was a mistake. I’ve removed that phrase 
from the KIP. 

7. It shouldn’t affect HighAvailabilityTaskAssignor in a “breaking way”, meaning 
that all the existing behavior should stay as is (e.g., when new configurations 
are not specified). Once required configurations are set, the main change 
should happen in HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and 
HighAvailabilityTaskAssignor#assignStandbyTaskMovements

I hope this answers your questions.

Regards,
Levani

> On 18. Feb 2021, at 15:10, Bruno Cadonna  wrote:
> 
> Hi Levani,
> 
> Thank you for the KIP.
> 
> Really interesting!
> 
> Here my comments:
> 
> 1. To be consistent with the other configs that involve standbys , I would 
> rename
> standby.task.assignment.awareness -> standby.replicas.awareness
> 
> 2. I would also rename the prefix
> instance.tag -> client.tag
> 
> 3. The following is a question about prefixes in general that maybe somebody 
> else can answer. In the config it says for other prefixes that it is 
> recommended to use the method *Prefix(final String prop) instead of the raw 
> prefix string.
> 
> Is the plan to make the raw prefix string private in a future release?
> Should we consider making only the prefix method for this KIP public?
> 
> 4. Could you provide a mathematical formula instead of Java code for absolute 
> preferred standby task distribution and the other distributtion properties? 
> Could you also add an example for absolute preffered distribution for the 
> computation of the formula similar to what you did for the other properties?
> 
> 5. Does the order of the tags given for standby.task.assignment.awareness 
> count? You mention it once, but then for the Partially Preferred standby task 
> distribution property it does not seem to be important.
> 
> 6. In the section about least preferred standby task distribution, you state 
> that "and one [zone] will be reserved for active task". What do you mean by 
> that? All Streams clients will participate in the task assignment of active 
> tasks irrespective of their tags, right? The statement does also not really 
> fit with the example where active stateful task 0_0 is on Node-1, does it?
> 
> 7. Could you also say some words about how this KIP affects the current 
> HighAvailabilityTaskAssignor?
> 
> 
> Best,
> Bruno
> 
> On 09.02.21 15:54, Levani Kokhreidze wrote:
>> Hello all,
>> I’ve updated KIP-708 [1] to reflect the latest discussion outcomes.
>> I’m looking forward to your feedback.
>> Regards,
>> Levani
>> [1] - 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
>>> On 2. Feb 2021, at 22:03, Levani Kokhreidze  wrote:
>>> 
>>> Hi John.
>>> 
>>> Thanks a lot for this detailed analysis!
>>> Yes, that is what I had in mind as well.
>>> I also like that idea of having “task.assignment.awareness” configuration
>>> to tell which instance tags can be used for rack awareness.
>>> I may borrow it for this KIP if you don’t mind :)
>>> 
>>> Thanks again John for this discussion, it’s really valuable.
>>> 
>>> I’ll update the proposal and share it once again in this discussion thread.
>>> 
>>> Regards,
>>> Levani
>>> 
>>>> On 2. Feb 2021, at 18:47, John Roesler  wrote:

Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-09 Thread Levani Kokhreidze
Hello all,

I’ve updated KIP-708 [1] to reflect the latest discussion outcomes. 
I’m looking forward to your feedback.

Regards,
Levani

[1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 2. Feb 2021, at 22:03, Levani Kokhreidze  wrote:
> 
> Hi John.
> 
> Thanks a lot for this detailed analysis! 
> Yes, that is what I had in mind as well. 
> I also like that idea of having “task.assignment.awareness” configuration
> to tell which instance tags can be used for rack awareness.
> I may borrow it for this KIP if you don’t mind :) 
> 
> Thanks again John for this discussion, it’s really valuable.
> 
> I’ll update the proposal and share it once again in this discussion thread.
> 
> Regards,
> Levani 
> 
>> On 2. Feb 2021, at 18:47, John Roesler  wrote:
>> 
>> Hi Levani,
>> 
>> 1. Thanks for the details.
>> 
>> I figured it must be something like this two-dimensional definition of 
>> "rack".
>> 
>> It does seem like, if we make the config take a list of tags, we can define
>> the semantics to be that the system will make a best effort to distribute
>> the standbys over each rack dimension.
>> 
>> In your example, there are two clusters and three AZs. The example
>> configs would be:
>> 
>> Node 1:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1a
>> task.assignment.awareness: cluster,zone
>> 
>> Node 2:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1b
>> task.assignment.awareness: cluster,zone
>> 
>> Node 3:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1c
>> task.assignment.awareness: cluster,zone
>> 
>> Node 4:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1a
>> task.assignment.awareness: cluster,zone
>> 
>> Node 5:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1b
>> task.assignment.awareness: cluster,zone
>> 
>> Node 6:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1c
>> task.assignment.awareness: cluster,zone
>> 
>> 
>> Now, if we have a task 0_0 with an active and two replicas,
>> there are three total copies of the task to distribute over:
>> * 6 instances
>> * 2 clusters
>> * 3 zones
>> 
>> There is a constraint that we _cannot_ assign two copies of a task
>> to a single instance, but it seems like the default rack awareness
>> would permit us to assign two copies of a task to a rack, if (and only
>> if) the number of copies is greater than the number of racks.
>> 
>> So, the assignment we would get is like this:
>> * assigned to three different instances
>> * one copy in each of zone a, b, and c
>> * two copies in one cluster and one in the other cluster
>> 
>> For example, we might have 0_0 assigned to:
>> * Node 1 (cluster 1, zone a)
>> * Node 5 (cluster 2, zone b)
>> * Node 3 (cluster 1, zone c)
>> 
>> Is that what you were also thinking?
>> 
>> Thanks,
>> -John
>> 
>> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
>>> Hi John,
>>> 
>>> 1. Main reason was that it seemed an easier change compared to having 
>>> multiple tags assigned to each host.
>>> 
>>> ---
>>> 
>>> Answering your question what use-case I have in mind:
>>> Let's say we have two Kubernetes clusters running the same Kafka Streams 
>>> application. 
>>> And each Kubernetes cluster spans multiple AZs. 
>>> So the setup overall looks something like this:
>>> 
>>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
>>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>>> 
>>> Now, if the Kafka Streams application is launched in K8s_Cluster1: 
>>> eu-central-1a,
>>> ideally I would want standby task to be created in the different K8s 
>>> cluster and region.
>>> So in this example it can be K8s_Cluster2: [eu-central-1b, 
>>> eu-central-1c]
>>> 
>>> But giving it a bit more thought, this can be implemented if we change 
>>> semantics of “tags” a bit.
>>> So instead of doing full match with tags, we can do iterative matching 
>>> and it should work.
>>> (If this is what you had in mind, apologies for the misunderstanding).
>>> 
>>> If we consider the same example as mentioned above, for the active task 

Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-02 Thread Levani Kokhreidze
Hi John.

Thanks a lot for this detailed analysis! 
Yes, that is what I had in mind as well. 
I also like that idea of having “task.assignment.awareness” configuration
to tell which instance tags can be used for rack awareness.
I may borrow it for this KIP if you don’t mind :) 

Thanks again John for this discussion, it’s really valuable.

I’ll update the proposal and share it once again in this discussion thread.

Regards,
Levani 

> On 2. Feb 2021, at 18:47, John Roesler  wrote:
> 
> Hi Levani,
> 
> 1. Thanks for the details.
> 
> I figured it must be something like this two-dimensional definition of "rack".
> 
> It does seem like, if we make the config take a list of tags, we can define
> the semantics to be that the system will make a best effort to distribute
> the standbys over each rack dimension.
> 
> In your example, there are two clusters and three AZs. The example
> configs would be:
> 
> Node 1:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1a
> task.assignment.awareness: cluster,zone
> 
> Node 2:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1b
> task.assignment.awareness: cluster,zone
> 
> Node 3:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1c
> task.assignment.awareness: cluster,zone
> 
> Node 4:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1a
> task.assignment.awareness: cluster,zone
> 
> Node 5:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1b
> task.assignment.awareness: cluster,zone
> 
> Node 6:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1c
> task.assignment.awareness: cluster,zone
> 
> 
> Now, if we have a task 0_0 with an active and two replicas,
> there are three total copies of the task to distribute over:
> * 6 instances
> * 2 clusters
> * 3 zones
> 
> There is a constraint that we _cannot_ assign two copies of a task
> to a single instance, but it seems like the default rack awareness
> would permit us to assign two copies of a task to a rack, if (and only
> if) the number of copies is greater than the number of racks.
> 
> So, the assignment we would get is like this:
> * assigned to three different instances
> * one copy in each of zone a, b, and c
> * two copies in one cluster and one in the other cluster
> 
> For example, we might have 0_0 assigned to:
> * Node 1 (cluster 1, zone a)
> * Node 5 (cluster 2, zone b)
> * Node 3 (cluster 1, zone c)
> 
> Is that what you were also thinking?
> 
> Thanks,
> -John
> 
> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
>> Hi John,
>> 
>> 1. Main reason was that it seemed an easier change compared to having 
>> multiple tags assigned to each host.
>> 
>> ---
>> 
>> Answering your question what use-case I have in mind:
>> Let's say we have two Kubernetes clusters running the same Kafka Streams 
>> application. 
>> And each Kubernetes cluster spans multiple AZs. 
>> So the setup overall looks something like this:
>> 
>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>> 
>> Now, if the Kafka Streams application is launched in K8s_Cluster1: 
>> eu-central-1a,
>> ideally I would want standby task to be created in the different K8s 
>> cluster and region.
>> So in this example it can be K8s_Cluster2: [eu-central-1b, 
>> eu-central-1c]
>> 
>> But giving it a bit more thought, this can be implemented if we change 
>> semantics of “tags” a bit.
>> So instead of doing full match with tags, we can do iterative matching 
>> and it should work.
>> (If this is what you had in mind, apologies for the misunderstanding).
>> 
>> If we consider the same example as mentioned above, for the active task 
>> we would
>> have following tags: [K8s_Cluster1, eu-central-1a]. In order to 
>> distribute standby task
>> in the different K8s cluster, plus in the different AWS region, standby 
>> task assignment 
>> algorithm can compare each tag by index. So steps would be something 
>> like:
>> 
>> // this will result in selecting client in the different K8s cluster
>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
>> // this will result in selecting the client in different AWS region
>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != 
>> clientsInDifferentCluster[1] )
>> 
>> WDYT?
>> 
>> If you agree with the use-case I’ve mentioned, the pluggable assignor 
>> can be deferred to another 

Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-02 Thread Levani Kokhreidze
Hi John,

1. Main reason was that it seemed an easier change compared to having multiple 
tags assigned to each host.

---

Answering your question what use-case I have in mind:
Let's say we have two Kubernetes clusters running the same Kafka Streams 
application. 
And each Kubernetes cluster spans multiple AZs. 
So the setup overall looks something like this:

K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]

Now, if the Kafka Streams application is launched in K8s_Cluster1: eu-central-1a,
ideally I would want the standby task to be created in a different K8s cluster 
and a different zone.
So in this example it can be K8s_Cluster2: [eu-central-1b, eu-central-1c]

But giving it a bit more thought, this can be implemented if we change the 
semantics of “tags” a bit.
So instead of doing a full match on the tags, we can do iterative matching and it 
should work.
(If this is what you had in mind, apologies for the misunderstanding).

If we consider the same example as mentioned above, for the active task we would
have the following tags: [K8s_Cluster1, eu-central-1a]. In order to distribute 
the standby task in a different K8s cluster, plus a different availability zone, 
the standby task assignment algorithm can compare each tag by index. So the steps 
would be something like:

 // this will result in selecting clients in a different K8s cluster
1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
 // this will result in selecting a client in a different availability zone
2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != 
clientsInDifferentCluster[1])
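
To make this concrete, here is a minimal, self-contained Java sketch of the 
two-step filtering above (the Client type, the tag layout and the method names 
are assumptions invented for this example, not an actual Kafka Streams API):

import java.util.List;
import java.util.Optional;

public class TagBasedStandbySelector {

    // Hypothetical client descriptor: tags ordered by importance, e.g. [cluster, zone].
    record Client(String name, List<String> tags) {}

    static Optional<Client> selectStandby(List<String> activeTags, List<Client> clients) {
        // Step 1: keep only clients in a different cluster than the active task.
        List<Client> differentCluster = clients.stream()
                .filter(c -> !c.tags().get(0).equals(activeTags.get(0)))
                .toList();
        // Step 2: among those, prefer a client that is also in a different zone;
        // fall back to any different-cluster client if no such client exists.
        return differentCluster.stream()
                .filter(c -> !c.tags().get(1).equals(activeTags.get(1)))
                .findFirst()
                .or(() -> differentCluster.stream().findFirst());
    }
}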

WDYT?

If you agree with the use-case I’ve mentioned, the pluggable assignor can be 
deferred to another KIP, yes,
as it won’t be required for this KIP and the use-cases I had in mind to work.

Regards,
Levani 


> On 2. Feb 2021, at 07:55, John Roesler  wrote:
> 
> Hello Levani,
> 
> Thanks for the reply. 
> 
> 1. Interesting; why did you change your mind?
> 
> I have a gut feeling that we can achieve pretty much any rack awareness need 
> that people have by using purely config, which is obviously much easier to 
> use. But if you had a case in mind where this wouldn’t work, it would be good 
> to know. 
> 
> In fact, if that is true, then perhaps you could just defer the whole idea of 
> a pluggable interface (point 2) to a separate KIP. I do think a pluggable 
> assignor would be extremely valuable, but it might be nice to cut the scope 
> of KIP-708 if just a config will suffice.
> 
> What do you think?
> Thanks,
> John
> 
> 
> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
>> Hi John,
>> 
>> Thanks a lot for the thorough feedback, it’s really valuable.
>> 
>> 1. Agree with this. Had the same idea initially.
>> We can set some upper limit in terms of what’s 
>> the max number of tags users can set to make 
>> sure it’s not overused. By default, we can create 
>> standby tasks where tags are different from active task (full match). 
>> This should mimic default rack awareness behaviour.
>> 
>> 2. I like the idea and I’d be happy to work on 
>> refactoring TaskAssignor to accommodate rack awareness use-case. 
>> When I was going through the code, it felt way more natural 
>> to use pluggable TaskAssignor for achieving rack awareness 
>> instead of introducing new interface and contract. 
>> But I thought approach mentioned in the KIP is simpler so 
>> decided to move forward with it as an initial proposal :). 
>> But I agree with you, it will be much better if we can have 
>> TaskAssignor as pluggable interface users can use.
>> One potential challenge I see with this is that, if we just let
>> users implement TaskAssignor in its current form, we will be forcing
>> users to implement functionality for active task assignment, as well as
>> standby task assignment. This does not feel like a very clear contract, 
>> because with
>> just the TaskAssignor interface it’s not really clear that one needs to 
>> allocate 
>> standby tasks as well. We can enforce it on some level with the return 
>> object
>> you’ve mentioned TaskAssignor#assign has to return, but it still feels 
>> error-prone.
>> In addition, I suspect in most of the cases users would want
>> to control standby task assignment and leave active task assignment as 
>> is. 
>> To make implementation of standby task assignment easier for users, 
>> what if
>> we decouple active and standby task assignment from the `TaskAssignor`?
>> Idea I have in mind is to split TaskAssignor into ActiveTaskAssignor 
>> and StandbyTaskAssignor
>> and let users add their own implementation for them separately if they 
>> like via config.
>> 
>> If

Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-02-01 Thread Levani Kokhreidze
in the future.
> 
> Thanks again for this proposal, I hope the above is more
> inspiring than annoying :)
> 
> I really think your KIP is super high value in whatever form
> you ultimately land on.
> 
> 
> Thanks,
> John
> 
> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
>> Hi John
>> 
>> Thanks for the feedback (and for the great work on KIP441 :) ). 
>> Makes sense, will add a section in the KIP explaining rack awareness at a 
>> high level and how it’s implemented in different distributed systems.
>> 
>> Thanks,
>> Levani
>> 
>>> On 27. Jan 2021, at 16:07, John Roesler  wrote:
>>> 
>>> Hi Levani,
>>> 
>>> Thanks for this KIP! I think this is really high value; it was something I 
>>> was disappointed I didn’t get to do as part of KIP-441.
>>> 
>>> Rack awareness is a feature provided by other distributed systems as well. 
>>> I wonder if your KIP could devote a section to summarizing what rack 
>>> awareness looks like in other distributed systems, to help us put this 
>>> design in context. 
>>> 
>>> Thanks!
>>> John
>>> 
>>> 
>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>>>> Hello all,
>>>> 
>>>> I’d like to start discussion on KIP-708 [1] that aims to introduce rack 
>>>> aware standby task distribution in Kafka Streams.
>>>> In addition to changes mentioned in the KIP, I’d like to get some ideas 
>>>> on additional change I have in mind. 
>>>> Assuming KIP moves forward, I was wondering if it makes sense to 
>>>> configure Kafka Streams consumer instances with the rack ID passed with 
>>>> the new StreamsConfig#RACK_ID_CONFIG property. 
>>>> In practice, that would mean that when “rack.id” is 
>>>> configured in Kafka Streams, it will automatically translate into 
>>>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer clients 
>>>> that are used by Kafka Streams internally.
>>>> 
>>>> [1] 
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>> 
>>>> P.S 
>>>> I have draft PR ready, if it helps the discussion moving forward, I can 
>>>> provide the draft PR link in this thread.
>>>> 
>>>> Regards, 
>>>> Levani
>> 
> 
> 



Re: [DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-01-28 Thread Levani Kokhreidze
Hi John

Thanks for the feedback (and for the great work on KIP441 :) ). 
Makes sense, will add a section in the KIP explaining rack awareness at a high 
level and how it’s implemented in different distributed systems.

Thanks,
Levani

> On 27. Jan 2021, at 16:07, John Roesler  wrote:
> 
> Hi Levani,
> 
> Thanks for this KIP! I think this is really high value; it was something I 
> was disappointed I didn’t get to do as part of KIP-441.
> 
> Rack awareness is a feature provided by other distributed systems as well. I 
> wonder if your KIP could devote a section to summarizing what rack awareness 
> looks like in other distributed systems, to help us put this design in 
> context. 
> 
> Thanks!
> John
> 
> 
> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>> Hello all,
>> 
>> I’d like to start discussion on KIP-708 [1] that aims to introduce rack 
>> aware standby task distribution in Kafka Streams.
>> In addition to changes mentioned in the KIP, I’d like to get some ideas 
>> on additional change I have in mind. 
>> Assuming KIP moves forward, I was wondering if it makes sense to 
>> configure Kafka Streams consumer instances with the rack ID passed with 
>> the new StreamsConfig#RACK_ID_CONFIG property. 
>> In practice, that would mean that when “rack.id” is 
>> configured in Kafka Streams, it will automatically translate into 
>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer clients 
>> that are used by Kafka Streams internally.
>> 
>> [1] 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>> 
>> P.S 
>> I have draft PR ready, if it helps the discussion moving forward, I can 
>> provide the draft PR link in this thread.
>> 
>> Regards, 
>> Levani



[DISCUSS] KIP-708: Rack aware Kafka Streams with pluggable StandbyTask assignor

2021-01-26 Thread Levani Kokhreidze
Hello all,

I’d like to start discussion on KIP-708 [1] that aims to introduce rack aware 
standby task distribution in Kafka Streams.
In addition to changes mentioned in the KIP, I’d like to get some ideas on 
additional change I have in mind. 
Assuming KIP moves forward, I was wondering if it makes sense to configure 
Kafka Streams consumer instances with the rack ID passed with the new 
StreamsConfig#RACK_ID_CONFIG property. 
In practice, that would mean that when “rack.id” is 
configured in Kafka Streams, it will automatically translate into 
ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer clients that are 
used by Kafka Streams internally.
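
As a rough illustration of the intended effect (a sketch only: RACK_ID_CONFIG 
does not exist yet, and I am assuming the consumer-side setting would be the 
existing client.rack config):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RackIdSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Proposed: one Streams-level rack id...
        props.put("rack.id", "eu-central-1a"); // hypothetical StreamsConfig#RACK_ID_CONFIG
        // ...which Streams would forward to every consumer it creates, i.e.:
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.CLIENT_RACK_CONFIG, props.get("rack.id"));
    }
}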

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
 


P.S 
I have draft PR ready, if it helps the discussion moving forward, I can provide 
the draft PR link in this thread.

Regards, 
Levani

[jira] [Created] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)

2020-11-26 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-10772:
-

 Summary: java.lang.IllegalStateException: There are insufficient 
bytes available to read assignment from the sync-group response (actual byte 
size 0)
 Key: KAFKA-10772
 URL: https://issues.apache.org/jira/browse/KAFKA-10772
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Levani Kokhreidze


From time to time we encounter the following exception that results in Kafka 
Streams threads dying.
{code:java}
Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | 
stream-client [cluster1-profile-stats-pipeline-client-id] State transition from 
REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | 
streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] 
State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app 
service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- 
[-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream processing 
pipeline: [profile-stats] encountered unrecoverable exception. Thread: 
[cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely dead. 
If all worker threads die, Kafka Streams will be moved to permanent ERROR 
state. Nov 27 00:59:53.682 streaming-app service: prod | streaming-app-2 | 
Stream processing pipeline: [profile-stats] encountered unrecoverable 
exception. Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] 
is completely dead. If all worker threads die, Kafka Streams will be moved to 
permanent ERROR state. java.lang.IllegalStateException: There are insufficient 
bytes available to read assignment from the sync-group response (actual byte 
size 0) , this is not expected; it is possible that the leader's assign 
function is buggy and did not return any assignment for this member, or because 
static member is configured and the protocol is buggy hence did not get the 
assignment for this member at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10686) Pluggable standby tasks assignor for Kafka Streams

2020-11-05 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-10686:
-

 Summary: Pluggable standby tasks assignor for Kafka Streams
 Key: KAFKA-10686
 URL: https://issues.apache.org/jira/browse/KAFKA-10686
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Levani Kokhreidze


In production, Kafka Streams instances often run across different clusters and 
availability zones. In order to guarantee high availability of Kafka 
Streams deployments, users need more granular control over which 
instances standby tasks can be created on. 

The idea of this ticket is to expose an interface for Kafka Streams which can be 
implemented by users to control where standby tasks can be created.

Kafka Streams can have RackAware assignment as a default implementation that 
will take the application's `rack.id` into account and make sure that standby 
tasks are created on different racks. 

The point of this ticket, though, is to give users more flexibility over standby 
task creation, in cases where rack awareness alone is not enough. 
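
A minimal sketch of what such an interface could look like (names are purely 
illustrative, not an actual Kafka Streams API):

{code:java}
import org.apache.kafka.streams.processor.TaskId;

// Hypothetical pluggable hook: for a given task, decide whether a standby may
// be placed on the candidate client, given where the active task lives.
public interface StandbyTaskAssignmentStrategy {

    boolean canAssignStandby(TaskId taskId,
                             String activeClientRackId,
                             String candidateClientRackId);
}
{code}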



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-01 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-10454:
-

 Summary: Kafka Streams Stuck in infinite REBALANCING loop when 
stream <> table join partitions don't match
 Key: KAFKA-10454
 URL: https://issues.apache.org/jira/browse/KAFKA-10454
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Levani Kokhreidze
 Fix For: 2.6.0


TBD



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10375) Restore consumer fails with SSL handshake fail exception

2020-08-08 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-10375:
-

 Summary: Restore consumer fails with SSL handshake fail exception
 Key: KAFKA-10375
 URL: https://issues.apache.org/jira/browse/KAFKA-10375
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Levani Kokhreidze
 Attachments: stacktrace.txt

After upgrading to 2.6, we started getting "SSL handshake fail" exceptions. 
The curious thing is that it seems to affect only restore consumers. For mTLS, we 
use dynamic certificates that are reloaded automatically every X minutes.

We didn't have any issues with it up until upgrading to 2.6, and other stream 
processing jobs running Kafka 2.4 don't have similar problems.

After restarting the Kafka Streams instance, the issue goes away.

 

From the stack trace, it's visible that the problem is:
{code:java}
Aug 07 10:36:12.478 | Caused by: 
java.security.cert.CertificateExpiredException: NotAfter: Fri Aug 07 07:45:16 
GMT 2020 
{code}
Seems like the restore consumer somehow gets stuck with the old certificate and 
it's not refreshed.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] 2.6.0 RC0

2020-07-15 Thread Levani Kokhreidze
Hi Randall,

Not sure if it’s intentional, but the documentation for Kafka Streams 2.6.0 also 
contains “Streams API changes in 2.7.0”: 
https://kafka.apache.org/26/documentation/streams/upgrade-guide 


Also, there seems to be a formatting issue in the 2.6.0 section.

Levani


> On Jul 15, 2020, at 1:48 AM, Randall Hauch  wrote:
> 
> Thanks for catching that, Gary. Apologies to all for announcing this before
> pushing the docs, but that's fixed and the following links are working
> (along with the others in my email):
> 
> * https://kafka.apache.org/26/documentation.html
> * https://kafka.apache.org/26/protocol.html
> 
> Randall
> 
> On Tue, Jul 14, 2020 at 4:30 PM Gary Russell  wrote:
> 
>> Docs link [1] is broken.
>> 
>> [1] https://kafka.apache.org/26/documentation.html
>> 
>> 



Re: [VOTE] KIP-629: Use racially neutral terms in our codebase

2020-06-26 Thread Levani Kokhreidze
+1 (non-binding)

Thank you for this initiative.

Levani

> On Jun 26, 2020, at 11:53 AM, Mickael Maison  wrote:
> 
> +1 (binding)
> Thanks for the KIP!
> 
> On Fri, Jun 26, 2020 at 9:51 AM Jorge Esteban Quilcate Otoya
>  wrote:
>> 
>> +1 (non-binding)
>> Thank you Xavier!
>> 
>> On Fri, Jun 26, 2020 at 8:38 AM Bruno Cadonna  wrote:
>> 
>>> +1 (non-binding)
>>> 
>>> On Fri, Jun 26, 2020 at 3:41 AM Jay Kreps  wrote:
 
 +1
 
 On Thu, Jun 25, 2020 at 6:39 PM Bill Bejeck  wrote:
 
> Thanks for this KIP Xavier.
> 
> +1(binding)
> 
> -Bill
> 
> On Thu, Jun 25, 2020 at 9:04 PM Gwen Shapira 
>>> wrote:
> 
>> +1 (binding)
>> 
>> Thank you Xavier!
>> 
>> On Thu, Jun 25, 2020, 3:44 PM Xavier Léauté  wrote:
>> 
>>> Hi Everyone,
>>> 
>>> I would like to initiate the voting process for KIP-629.
>>> 
>>> 
>> 
> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase
>>> 
>>> Thank you,
>>> Xavier
>>> 
>> 
> 
>>> 



[jira] [Resolved] (KAFKA-9859) kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation

2020-05-20 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze resolved KAFKA-9859.
--
Fix Version/s: 2.6.0
   Resolution: Fixed

fixed with PR [https://github.com/apache/kafka/pull/8671]

> kafka-streams-application-reset tool doesn't take into account topics 
> generated by KTable foreign key join operation
> 
>
> Key: KAFKA-9859
> URL: https://issues.apache.org/jira/browse/KAFKA-9859
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>    Reporter: Levani Kokhreidze
>    Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: newbie, newbie++
> Fix For: 2.6.0
>
>
> Steps to reproduce:
>  * Create Kafka Streams application which uses foreign key join operation 
> (without a Named parameter overload)
>  * Stop Kafka streams application
>  * Perform `kafka-topics-list` and verify that foreign key operation internal 
> topics are generated
>  * Use `kafka-streams-application-reset` to perform the cleanup of your kafka 
> streams application: `kafka-streams-application-reset --application-id 
> <application-id> --input-topics <input-topics> --bootstrap-servers 
> <bootstrap-servers> --to-datetime 2019-04-13T00:00:00.000`
>  * Perform `kafka-topics-list` again, you'll see that topics generated by the 
> foreign key operation are still there.
> [kafka-streams-application-reset|#L679-L680] uses 
> `-subscription-registration-topic` and `-subscription-response-topic` 
> suffixes to match topics generated by the foreign key operation. While in 
> reality, internal topics are generated in this format:
> {code:java}
> <application-id>-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-<sequence number>-topic 
> <application-id>-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-<sequence number>-topic{code}
> Please note that this problem only happens when the `Named` parameter is not 
> used. When the `Named` parameter is used, topics are generated with the same 
> pattern as specified in StreamsResetter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint

2020-05-15 Thread Levani Kokhreidze
Hi Matthias,

Thanks for your thoughts and sorry for taking so long to respond.
For me that makes total sense, so +1 from my side. 
I took the liberty of creating a ticket for it: 
https://issues.apache.org/jira/browse/KAFKA-10003 

Regards,
Levani


> On May 12, 2020, at 8:30 PM, Guozhang Wang  wrote:
> 
> Sounds fair to me; I think as syntactic sugar it is good to have, but
> sometimes it was "enforced" to be used for repartitioning purposes.
> 
> On Mon, May 11, 2020 at 7:08 PM Matthias J. Sax  wrote:
> 
>> As an afterthought to KIP-221, I am wondering if we should deprecate
>> `KStream#through()`?
>> 
>> The reasoning is that I assume that most people don't want to manage
>> topics manually anyway and thus it might be good to guide users to use
>> repartition(). Furthermore, through() is really just syntactic sugar for
>> to() followed by builder.stream() (thus people don't really loose
>> functionality). So far, through() was very nice to have, especially with
>> PAPI integration in the DSL (users might need to do a manual
>> repartitioning before transform()) however this pattern should be
>> subsumed by repartition().
>> 
>> Reducing the surface area of our API (instead of just enlarging it)
>> might be good.
>> 
>> Thoughts?
>> 
>> 
>> -Matthias
>> 
>> On 4/5/20 9:36 PM, John Roesler wrote:
>>> Thanks for the update, Levani!
>>> -John
>>> 
>>> On Sat, Apr 4, 2020, at 04:36, Levani Kokhreidze wrote:
>>>> Hello,
>>>> 
>>>> Small update regarding this KIP. As per John’s suggestion during the
>>>> code review
>>>> (https://github.com/apache/kafka/pull/7170#discussion_r392608571)
>>>> we’ve decided to remove KeyValueMapper overloads for the new
>>>> `repartition` operation for the first release of this feature.
>>>> Wiki page has been updated accordingly
>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint)
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Aug 1, 2019, at 9:55 AM, Levani Kokhreidze 
>> wrote:
>>>>> 
>>>>> Thank you all!
>>>>> 
>>>>> The vote has been open for ~8 days. KIP has three binding votes (Bill,
>> Guozhang, Matthias) and one non-binding (Sophie) so the KIP vote passes!
>>>>> I’ll mark KIP as accepted and start working on it as soon as possible!
>>>>> 
>>>>> Regards,
>>>>> Levani
>>>>> 
>>>>>> On Aug 1, 2019, at 2:37 AM, Matthias J. Sax 
>> wrote:
>>>>>> 
>>>>>> +1 (binding)
>>>>>> 
>>>>>> On 7/31/19 8:36 AM, Guozhang Wang wrote:
>>>>>>> Thanks for the update! +1 (binding).
>>>>>>> 
>>>>>>> On Tue, Jul 30, 2019 at 11:42 PM Levani Kokhreidze <
>> levani.co...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hello Guozhang,
>>>>>>>> 
>>>>>>>> Thanks for the feedback. That’s an interesting point. To be honest,
>> I
>>>>>>>> totally missed it. I wasn’t aware that there’s `groupBy`
>> possibility on
>>>>>>>> KTable.
>>>>>>>> I don’t see any reasons why not to add same functionality to KTable
>>>>>>>> interface.
>>>>>>>> 
>>>>>>>> I’ve updated the KIP:
>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>> Please let me know if you have any other questions and/or concerns.
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Levani
>>>>>>>> 
>>>>>>>>> On Jul

[jira] [Created] (KAFKA-10003) Deprecate KStream#through in favor of KStream#repartition

2020-05-15 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-10003:
-

 Summary: Deprecate KStream#through in favor of KStream#repartition
 Key: KAFKA-10003
 URL: https://issues.apache.org/jira/browse/KAFKA-10003
 Project: Kafka
  Issue Type: Task
Reporter: Levani Kokhreidze


After introducing `KStream#repartition` in 
[KIP-221|https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint],
 it makes sense to deprecate `KStream#through` in favor of the new operator (see 
the voting thread for more context: 
[https://www.mail-archive.com/dev@kafka.apache.org/msg107645.html])
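
For illustration, the user-facing migration would look roughly like this (a 
sketch; topic naming and topic management semantics differ between the two 
operators):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ThroughVsRepartition {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");

        // Old style: writes to a user-managed topic and reads it back.
        KStream<String, String> viaThrough = stream.through("user-managed-topic");

        // New style: Streams creates and manages the repartition topic itself.
        KStream<String, String> viaRepartition =
                stream.repartition(Repartitioned.as("repartition-name"));
    }
}
{code}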



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-9859) kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation

2020-04-24 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze reopened KAFKA-9859:
--

> kafka-streams-application-reset tool doesn't take into account topics 
> generated by KTable foreign key join operation
> 
>
> Key: KAFKA-9859
> URL: https://issues.apache.org/jira/browse/KAFKA-9859
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>    Reporter: Levani Kokhreidze
>Priority: Major
>
> Steps to reproduce:
>  * Create Kafka Streams application which uses foreign key join operation
>  * Stop Kafka streams application
>  * Perform `kafka-topics-list` and verify that foreign key operation internal 
> topics are generated
>  * Use `kafka-streams-application-reset` to perform the cleanup of your kafka 
> streams application: `kafka-streams-application-reset --application-id 
> <application-id> --input-topics <input-topics> --bootstrap-servers 
> <bootstrap-servers> --to-datetime 2019-04-13T00:00:00.000`
>  * Perform `kafka-topics-list` again, you'll see that topics generated by the 
> foreign key operation are still there.
> `kafka-streams-application-reset` uses `-subscription-registration-topic` and 
> `-subscription-response-topic` suffixes to match topics generated by the 
> foreign key operation. While in reality, internal topics are generated in 
> this format:
>   
> {code:java}
> <application-id>-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-<sequence number>-topic 
> <application-id>-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-<sequence number>-topic
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9859) kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation

2020-04-13 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-9859:


 Summary: kafka-streams-application-reset tool doesn't take into 
account topics generated by KTable foreign key join operation
 Key: KAFKA-9859
 URL: https://issues.apache.org/jira/browse/KAFKA-9859
 Project: Kafka
  Issue Type: Bug
Reporter: Levani Kokhreidze


Steps to reproduce:
 * Create Kafka Streams application which uses foreign key join operation
 * Stop Kafka streams application
 * Perform `kafka-topics-list` and verify that foreign key operation internal 
topics are generated
 * Use `kafka-streams-application-reset` to perform the cleanup of your kafka 
streams application: `kafka-streams-application-reset --application-id 
<application-id> --input-topics <input-topics> --bootstrap-servers 
<bootstrap-servers> --to-datetime 2019-04-13T00:00:00.000`
 * Perform `kafka-topics-list` again, you'll see that topics generated by the 
foreign key operation are still there.

`kafka-streams-application-reset` uses `repartition` and `changelog` suffixes 
to determine which topics need to be deleted; as a result, topics generated by 
the foreign key join are ignored.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9850) Move KStream#repartition operator validation during Topology build process

2020-04-10 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-9850:


 Summary: Move KStream#repartition operator validation during 
Topology build process 
 Key: KAFKA-9850
 URL: https://issues.apache.org/jira/browse/KAFKA-9850
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Levani Kokhreidze


The `KStream#repartition` operation performs most of its validation regarding 
joining, co-partitioning, etc. after the Kafka Streams instance has started. Some 
parts of this validation can be performed much earlier, specifically during 
topology `build()`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9828) Add partition to TestRecord in streams test-utils

2020-04-07 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-9828:


 Summary: Add partition to TestRecord in streams test-utils
 Key: KAFKA-9828
 URL: https://issues.apache.org/jira/browse/KAFKA-9828
 Project: Kafka
  Issue Type: Improvement
  Components: streams-test-utils
Reporter: Levani Kokhreidze


TopologyTestDriver creates a `TestRecord` for consumed events. In order to test 
partitioning when one uses a custom partitioner, it would be useful if `TestRecord` 
had a `partition` field as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint

2020-04-04 Thread Levani Kokhreidze
Hello,

Small update regarding this KIP. As per John’s suggestion during the code 
review (https://github.com/apache/kafka/pull/7170#discussion_r392608571) we’ve 
decided to remove KeyValueMapper overloads for the new `repartition` operation 
for the first release of this feature.
Wiki page has been updated accordingly 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint)

Regards,
Levani

> On Aug 1, 2019, at 9:55 AM, Levani Kokhreidze  wrote:
> 
> Thank you all!
> 
> The vote has been open for ~8 days. KIP has three binding votes (Bill, 
> Guozhang, Matthias) and one non-binding (Sophie) so the KIP vote passes!
> I’ll mark KIP as accepted and start working on it as soon as possible!
> 
> Regards,
> Levani
> 
>> On Aug 1, 2019, at 2:37 AM, Matthias J. Sax  wrote:
>> 
>> +1 (binding)
>> 
>> On 7/31/19 8:36 AM, Guozhang Wang wrote:
>>> Thanks for the update! +1 (binding).
>>> 
>>> On Tue, Jul 30, 2019 at 11:42 PM Levani Kokhreidze 
>>> wrote:
>>> 
>>>> Hello Guozhang,
>>>> 
>>>> Thanks for the feedback. That’s an interesting point. To be honest, I
>>>> totally missed it. I wasn’t aware that there’s `groupBy` possibility on
>>>> KTable.
>>>> I don’t see any reasons why not to add same functionality to KTable
>>>> interface.
>>>> 
>>>> I’ve updated the KIP:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>> Please let me know if you have any other questions and/or concerns.
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 31, 2019, at 1:24 AM, Guozhang Wang  wrote:
>>>>> 
>>>>> Hello Levani,
>>>>> 
>>>>> Thanks for the KIP! Just got a quick question here about the scope: why
>>>> do
>>>>> we only want this for `KStream`, not `KTable#groupBy` for example?
>>>>> 
>>>>> 
>>>>> Guozhang
>>>>> 
>>>>> 
>>>>> On Tue, Jul 30, 2019 at 1:27 PM Bill Bejeck  wrote:
>>>>> 
>>>>>> Thanks for the KIP Levani.
>>>>>> 
>>>>>> +1 (binding)
>>>>>> 
>>>>>> -Bill
>>>>>> 
>>>>>> On Tue, Jul 30, 2019 at 3:37 PM Levani Kokhreidze <
>>>> levani.co...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hello,
>>>>>>> 
>>>>>>> Still waiting for feedback on this KIP.
>>>>>>> Please let me know if you have any concerns and/or questions.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Levani
>>>>>>> 
>>>>>>> 
>>>>>>>> On Jul 24, 2019, at 8:20 PM, Sophie Blee-Goldman >>>> 
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> Looks good! Thanks Levani,
>>>>>>>> 
>>>>>>>> +1 (non-binding)
>>>>>>>> 
>>>>>>>> Sophie
>>>>>>>> 
>>>>>>>> On Tue, Jul 23, 2019 at 2:16 PM Levani Kokhreidze <
>>>>>>> levani.co...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hello,
>>>>>>>>> 
>>>>>>>>> I’d like to initialize voting on KIP-221:
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>>> 
>>>>>>>>> If there’re any more concerns about the KIP, happy to discuss
>>>> further.
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> Levani
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> -- Guozhang
>>>> 
>>>> 
>>> 
>> 
> 



Re: Subject: [VOTE] 2.4.1 RC0

2020-03-09 Thread Levani Kokhreidze
+1 non-binding.

- Built from source
- Ran unit tests. All passed.
- Quickstart passed.

Looking forward to upgrading to 2.4.1

Regards,
Levani

On Mon, 9 Mar 2020, 17:11 Sean Glover,  wrote:

> +1 (non-binding).  I built from source and ran the unit test suite
> successfully.
>
> Thanks for running this release.  I'm looking forward to upgrading to
> 2.4.1.
>
> Sean
>
> On Mon, Mar 9, 2020 at 8:07 AM Mickael Maison 
> wrote:
>
> > Thanks for running the release!
> > +1 (binding)
> >
> > - Verified signatures
> > - Built from source
> > - Ran unit tests, all passed
> > - Ran through quickstart steps, all worked
> >
> > On Mon, Mar 9, 2020 at 11:04 AM Tom Bentley  wrote:
> > >
> > > +1 (non-binding)
> > >
> > > Built from source, all unit tests passed.
> > >
> > > Thanks Bill.
> > >
> > > On Mon, Mar 9, 2020 at 3:44 AM Gwen Shapira  wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Verified signatures, built jars from source, quickstart passed and
> > local
> > > > unit tests all passed.
> > > >
> > > > Thank you for the release Bill!
> > > >
> > > > Gwen Shapira
> > > > Engineering Manager | Confluent
> > > > 650.450.2760 | @gwenshap
> > > > Follow us: Twitter | blog
> > > >
> > > > On Sat, Mar 07, 2020 at 8:15 PM, Vahid Hashemian <
> > > > vahid.hashem...@gmail.com > wrote:
> > > >
> > > > >
> > > > >
> > > > >
> > > > > +1 (binding)
> > > > >
> > > > >
> > > > >
> > > > > Verified signature, built from source, and ran quickstart
> > successfully
> > > > > (using openjdk version "11.0.6"). I also ran unit tests locally
> which
> > > > > resulted in a few flaky tests for which there are already open
> Jiras:
> > > > >
> > > > >
> > > > >
> > > > > ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
> > > > > ConsumerBounceTest.testCloseDuringRebalance
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> >
> ConsumerBounceTest.testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize
> > > > >
> > PlaintextEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaAssign
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> >
> SaslClientsWithInvalidCredentialsTest.testManualAssignmentConsumerWithAuthenticationFailure
> > > > > SaslMultiMechanismConsumerTest.testCoordinatorFailover
> > > > >
> > > > >
> > > > >
> > > > > Thanks for running the release Bill.
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > > --Vahid
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Mar 6, 2020 at 9:20 AM Colin McCabe  wrote:
> > > > >
> > > > >
> > > > >>
> > > > >>
> > > > >> +1 (binding)
> > > > >>
> > > > >>
> > > > >>
> > > > >> Checked the git hash and branch, looked at the docs a bit. Ran
> > > > quickstart
> > > > >> (although not the connect or streams parts). Looks good.
> > > > >>
> > > > >>
> > > > >>
> > > > >> best,
> > > > >> Colin
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Fri, Mar 6, 2020, at 07:31, David Arthur wrote:
> > > > >>
> > > > >>
> > > > >>>
> > > > >>>
> > > > >>> +1 (binding)
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> Download kafka_2.13-2.4.1 and verified signature, ran quickstart,
> > > > >>> everything looks good.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> Thanks for running this release, Bill!
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> -David
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Wed, Mar 4, 2020 at 6:06 AM Eno Thereska  wrote:
> > > > >>
> > > > >>
> > > > >>>
> > > > 
> > > > 
> > > >  Hi Bill,
> > > > 
> > > > 
> > > > 
> > > >  I built from source and ran unit and integration tests. They
> > passed.
> > > > There
> > > >  was a large number of skipped tests, but I'm assuming that is
> > > > intentional.
> > > > 
> > > > 
> > > > 
> > > > 
> > > >  Cheers
> > > >  Eno
> > > > 
> > > > 
> > > > 
> > > > > On Tue, Mar 3, 2020 at 8:42 PM Eric Lalonde  wrote:
> > > > 
> > > > 
> > > > >
> > > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > >
> > > > > I ran:
> > > > > $ https://github.com/elalonde/kafka/blob/master/bin/verify-kafka-rc.sh
> > > >  2.4.1 https://home.apache.org/~bbejeck/kafka-2.4.1-rc0
> > > >  >
> > > >  2.4.1 https:/ / home. apache. org/ ~bbejeck/ kafka-2. 4. 1-rc0 (
> > > >  

[jira] [Created] (KAFKA-9638) Do not trigger REBALANCING when specific exceptions occur in Kafka Streams

2020-03-03 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-9638:


 Summary: Do not trigger REBALANCING when specific exceptions occur 
in Kafka Streams 
 Key: KAFKA-9638
 URL: https://issues.apache.org/jira/browse/KAFKA-9638
 Project: Kafka
  Issue Type: New Feature
Reporter: Levani Kokhreidze


As of now, when a StreamThread encounters an exception in a Kafka Streams application, 
it will result in REBALANCING of all the tasks that were the responsibility of the 
given thread. The problem with that is, if the exception was, let's say, some logical 
exception like an NPE, REBALANCING is pretty much useless because all other threads 
will also die with the same NPE. This kind of moot rebalancing adds extra 
costs in terms of network traffic, IOPS, etc. in the case of large stateful 
applications.

In addition, this behaviour causes a global outage of the Kafka Streams 
application, instead of a localized outage of certain tasks. It would be great 
if Kafka Streams users could specify, through some interface, exceptions that 
must not trigger rebalancing of the tasks. The StreamThread may still die, but in 
this case we would have an isolated incident.
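
A minimal sketch of such a hook (purely illustrative; no such interface exists 
in Kafka Streams today):

{code:java}
// Hypothetical user-provided classifier: returning true means the failure is a
// deterministic, logical error, so the dying thread should not trigger a rebalance.
public interface RebalanceSuppressingExceptionClassifier {

    boolean shouldSuppressRebalance(Throwable exception);
}

// Example implementation that suppresses rebalancing for NPEs:
class NpeClassifier implements RebalanceSuppressingExceptionClassifier {
    @Override
    public boolean shouldSuppressRebalance(final Throwable exception) {
        return exception instanceof NullPointerException;
    }
}
{code}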



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Issues with triggering the build

2020-01-14 Thread Levani Kokhreidze
Hi Bruno,

Thanks for trying. I’ve also tried with “retest this please” but didn’t do any 
good.
Seems like it affects other PRs as well.

- Levani

> On Jan 14, 2020, at 12:13 PM, Bruno Cadonna  wrote:
> 
> Hi,
> 
> I tried with "Retest this, please" but it didn't work.
> 
> Best,
> Bruno
> 
> On Tue, Jan 14, 2020 at 9:01 AM Levani Kokhreidze
>  wrote:
>> 
>> Hello,
>> 
>> Seems like there’re issues with triggering Jenkins builds. Latest commits 
>> for my PR doesn’t trigger any of the builds.
>> Any ideas how to fix the issue? Here’s the example PR: 
>> https://github.com/apache/kafka/pull/7170 
>> <https://github.com/apache/kafka/pull/7170>
>> 
>> - Levani



Issues with triggering the build

2020-01-14 Thread Levani Kokhreidze
Hello,

Seems like there’re issues with triggering Jenkins builds. Latest commits for 
my PR doesn’t trigger any of the builds.
Any ideas how to fix the issue? Here’s the example PR: 
https://github.com/apache/kafka/pull/7170 


- Levani

[jira] [Created] (KAFKA-9342) Consider making all Kafka Streams DSL configuration classes immutable

2019-12-29 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-9342:


 Summary: Consider making all Kafka Streams DSL configuration 
classes immutable
 Key: KAFKA-9342
 URL: https://issues.apache.org/jira/browse/KAFKA-9342
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Levani Kokhreidze


Currently, Kafka Streams DSL config classes are a mix of mutable 
_org.apache.kafka.streams.kstream.Consumed, 
org.apache.kafka.streams.kstream.Materialized_ and immutable 
_org.apache.kafka.streams.kstream.Joined, 
org.apache.kafka.streams.kstream.Grouped_ classes.

Consider unifying all config classes of the DSL operations and making them 
immutable. Backward compatibility should be taken into account when making 
config classes immutable. For example, things may break for a user who 
has code similar to this:

 
{code:java}
final Materialized materialized = Materialized.as("my-store");

if (someCondition()) {
  materialized.withCachingDisabled();
}{code}
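
With immutable config classes, the same logic would instead need to reassign the 
result, along these lines (a sketch of the proposed style, not current behavior):

{code:java}
Materialized materialized = Materialized.as("my-store");

if (someCondition()) {
  // An immutable config object would return a new instance, so the result
  // must be reassigned instead of mutating "materialized" in place.
  materialized = materialized.withCachingDisabled();
}
{code}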
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9240) Flaky test kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker

2019-11-27 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-9240:


 Summary: Flaky test 
kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
 Key: KAFKA-9240
 URL: https://issues.apache.org/jira/browse/KAFKA-9240
 Project: Kafka
  Issue Type: Bug
Reporter: Levani Kokhreidze


{code:java}
Error Message
org.scalatest.exceptions.TestFailedException: Partition should have been moved to the expected log directory

Stacktrace
org.scalatest.exceptions.TestFailedException: Partition should have been moved to the expected log directory
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
  at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
  at org.scalatest.Assertions.fail(Assertions.scala:1091)
  at org.scalatest.Assertions.fail$(Assertions.scala:1087)
  at org.scalatest.Assertions$.fail(Assertions.scala:1389)
  at kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker(ReassignPartitionsClusterTest.scala:176)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
  at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
  at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
  at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
  at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
  at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
  at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
  at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
  at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
  at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
  at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
  at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
  at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-16 Thread Levani Kokhreidze
n "maybe execute with this degree
>>>> of parallelism".
>>>> 
>>>> I do think (and I appreciate that this is where Sophie's example is
>>>> coming from) that Streams should strive to be absolutely as simple and
>>>> intuitive as possible (while still maintaining correctness). Optimal
>>>> performance can be at odds with API simplicity. For example, the
>>>> simplest behavior is, if you ask for 5 partitions, you get 5
>>>> partitions. Maybe a repartition is technically not necessary (if you
>>>> didn't change the key), but at least there's no mystery to this
>>>> behavior.
>>>> 
>>>> Clearly, an (opposing) tenent of simplicity is trying to prevent
>>>> people from making mistakes, which I think is what the example boils
>>>> down to. Sometimes, we can prevent clear mistakes, like equi-joining
>>>> two topics with different partition counts. But for this case, it
>>>> doesn't seem as clear-cut to be able to assume that they _said_ 5
>>>> partitions, but they didn't really _want_ 5 partitions. Maybe we can
>>>> just try to be clear in the documentation, and also even log a warning
>>>> when we parse the topology, "hey, I've been asked to repartition this
>>>> stream, but it's not necessary".
>>>> 
>>>> If anything, this discussion really supports to me the value in just
>>>> sticking with `repartition()` for now, and deferring
>>>> `groupBy[Key](partitions)` to the future.
>>>> 
>>>>> Users should not have to choose between allowing Streams to optimize the
>>>> repartition placement, and allowing to specify a number of partitions.
>>>> 
>>>> This is a very fair point, and it may be something that we rapidly
>>>> return to, but it seems safe for now to introduce the non-optimizable
>>>> `reparition()` only, and then consider optimization options later.
>>>> Skipping available optimizations will never break correctness, but
>>>> adding optimizations can, so it makes sense to treat them with
>>>> caution.
>>>> 
>>>> In conclusion, I do think that a use _could_ want to "maybe specify"
>>>> the partition count, but I also think we can afford to pass on
>>>> supporting this right now.
>>>> 
>>>> I'm open to continuing the discussion, but just to avoid ambiguity, I
>>>> still feel we should _not_ change the groupBy[Key] operation at all,
>>>> and we should only add `repartition()` as a non-optimizable operation.
>>>> 
>>>> Thanks all,
>>>> -John
>>>> 
>>>> On Fri, Nov 15, 2019 at 11:26 AM Levani Kokhreidze
>>>>  wrote:
>>>>> 
>>>>> Hello,
>>>>> 
>>>>> Just fyi, PR was updated and now it incorporates the latest suggestions
>>>> about joins.
>>>>> `CopartitionedTopicsEnforcer` will throw an exception if number of
>>>> partitions aren’t the same when using `repartition` operation along with
>>>> `join`.
>>>>> 
>>>>> For more details please take a look at the PR:
>>>> https://github.com/apache/kafka/pull/7170/files <
>>>> https://github.com/apache/kafka/pull/7170/files>
>>>>> 
>>>>> Regards,
>>>>> Levani
>>>>> 
>>>>> 
>>>>>> On Nov 15, 2019, at 11:01 AM, Matthias J. Sax 
>>>> wrote:
>>>>>> 
>>>>>> Thanks a lot for the input Sophie.
>>>>>> 
>>>>>> Your example is quite useful, and I would use it to support my claim
>>>>>> that a "partition hint" for `Grouped` seems "useless" and does not
>>>>>> improve the user experience.
>>>>>> 
>>>>>> 1) You argue that a new user would be worries about repartitions topics
>>>>>> with too many paritions. This would imply that a user is already
>>>>>> advanced enough to understand the implication of repartitioning -- for
>>>>>> this case, I would argue that a user also understand _when_ a
>>>>>> auto-repartitioning would happen and thus the users understands where
>>>> to
>>>>>> insert a `repartition()` operation.
>>>>>> 
>>>>>> 2) For specifying Serdes: if a `groupByKey()` does not trigger
>>>>>

[jira] [Created] (KAFKA-9197) Consider introducing numberOfPartitions configuration field to Grouped configuration class

2019-11-16 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-9197:


 Summary: Consider introducing numberOfPartitions configuration 
field to Grouped configuration class
 Key: KAFKA-9197
 URL: https://issues.apache.org/jira/browse/KAFKA-9197
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze


In 
[KIP-221|https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint]
 there was an idea of introducing a number-of-partitions field to the Grouped config 
class. During the discussion on the mailing list, a couple of valid concerns were 
raised against this approach. 

The main argument against it was that whenever a user specifies the number of 
partitions for internal repartition topics, he/she really cares that those 
configurations will be applied. The issue with groupBy is that repartitioning will 
not happen if no key-changing operation is performed; therefore, the 
number-of-partitions configuration specified by the user would never be applied. 
Alternatively, if the user cares about manual repartitioning, one may do the 
following in order to scale sub-topologies up/down:

 
{code:java}
builder
  .stream("topic")
  .repartition((key, value) -> value.newKey(), 
Repartitioned.withNumberOfPartitions(5))   
  .groupByKey()   
  .count();
{code}
 

On the other hand, there were also valid arguments for adding a 
numberOfPartitions field to the Grouped config class. It was raised on the mailing 
list that we should treat the `numberOfPartitions` field as the "desired" number of 
partitions specified by the user, so that _if repartitioning is required_, 
Kafka Streams must use the value specified there.
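
A purely hypothetical sketch of what that could look like (`withNumberOfPartitions` 
on `Grouped` is only the proposal of this ticket, not part of the released API):

{code:java}
// Hypothetical: the partition count specified here would only take
// effect if the groupBy actually triggers a repartition.
builder
  .stream("topic")
  .groupBy((key, value) -> value.newKey(),
           Grouped.as("my-repartition").withNumberOfPartitions(5))
  .count();
{code}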

 

The idea of this ticket is to follow up on this discussion and implement this 
feature if there's an actual need from Kafka Streams users.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-15 Thread Levani Kokhreidze
>>>> thing we do now... check at runtime that the partition counts on both
>>>> sides match and throw an exception otherwise. What this means as a
>>>> user is that if you explicitly repartition the left side to 100
>>>> partitions, and then join with the right side at 10 partitions, you
>>>> get an exception, since this operation is not possible. You'd either
>>>> have to "step down" the left side again, back to 10 partitions, or you
>>>> could repartition the right side to 100 partitions before the join.
>>>> The choice has to be the user's, since it depends on their desired
>>>> execution parallelism.
>>>> 
>>>> Thanks,
>>>> -John
>>>> 
>>>> On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax 
>>> wrote:
>>>>> 
>>>>> Thanks a lot John. I think the way you decompose the operators is super
>>>>> helpful for this discussion.
>>>>> 
>>>>> What you suggest with regard to using `Grouped` and enforcing
>>>>> repartitioning if the number of partitions is specified is certainly
>>>>> possible. However, I am not sure if we _should_ do this. My reasoning is
>>>>> that an enforce repartitioning as introduced via `repartition()` is an
>>>>> expensive operations, and it seems better to demand an more explicit
>>>>> user opt-in to trigger it. Just setting an optional parameter might be
>>>>> too subtle to trigger such a heavy "side effect".
>>>>> 
>>>>> While I agree about "usability" in general, I would prefer a more
>>>>> conservative appraoch to introduce this feature, see how it goes, and
>>>>> maybe make it more advance later on. This also applies to what
>>>>> optimzation we may or may not allow (or are able to perform at all).
>>>>> 
>>>>> @Levani: Reflecting about my suggestion about `Repartioned extends
>>>>> Grouped`, I agree that it might not be a good idea.
>>>>> 
>>>>> Atm, I see an enforces repartitioning as non-optimizable and as a good
>>>>> first step and I would suggest to not intoruce anything else for now.
>>>>> Introducing optimizable enforce repartitioning via `groupBy(...,
>>>>> Grouped)` is something we could add later.
>>>>> 
>>>>> 
>>>>> Therefore, I would not change `Grouped` but only introduce
>>>>> `repartition()`. Users that use `grouBy()` atm, and want to opt-in to
>>>>> set the number of partitions, would need to rewrite their code to
>>>>> `selectKey(...).repartition(...).groupByKey()`. It's less convinient but
>>>>> also less risky from an API and optimization point of view.
>>>>> 
>>>>> 
>>>>> @Levani: about joins -> yes, we will need to check the specified number
>>>>> of partitions (if any) and if they don't match, throw an exception. We
>>>>> can discuss this on the PR -- I am just trying to get the PR for KIP-466
>>>>> merged -- your is next on the list :)
>>>>> 
>>>>> 
>>>>> Thoughts?
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 11/12/19 4:51 PM, Levani Kokhreidze wrote:
>>>>>> Thank you all for an interesting discussion. This is very enlightening.
>>>>>> 
>>>>>> Thank you Matthias for your explanation. Your arguments are very true.
>>> It makes sense that if user specifies number of partitions he/she really
>>> cares that those specifications are applied to internal topics.
>>>>>> Unfortunately, in current implementation this is not true during
>>> `join` operation. As I’ve written in the PR comment, currently, when
>>> `Stream#join` is used, `CopartitionedTopicsEnforcer` chooses max number of
>>> partitions from the two source topics.
>>>>>> I’m not really sure what would be the other way around this situation.
>>> Maybe fail the stream altogether and inform the user to specify same number
>>> of partitions?
>>>>>> Or we should treat join operations in a same way as it is right now
>>> and basically choose max number of partitions even when `repartition`
>>> operation is specified, because Kafka Streams “knows the best” how to
>>> handle joins?
>>>>>> You can check integra

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-12 Thread Levani Kokhreidze
the optimizer should never ignore the repartition operation.
>>>> As a "consequence" (because repartitioning is expensive) a user should
>>>> make an explicit call to `repartition()` IMHO -- piggybacking an
>>>> enforced repartitioning into `groupByKey()` seems to be "dangerous"
>>>> because it might be too subtle and an "optional scaling out" as laid out
>>>> above does not make sense IMHO.
>>>> 
>>>> I am also not worried about "over repartitioning" because the result
>>>> stream would never trigger auto-repartitioning. Only if multiple
>>>> consecutive calls to `repartition()` are made it could be bad -- but
>>>> that's the same with `through()`. In the end, there is always some
>>>> responsibility on the user.
>>>> 
>>>> Btw, for `.groupBy()` we know that repartitioning will be required,
>>>> however, for `groupByKey()` it depends if the KStream is marked as
>>>> `repartitioningRequired`.
>>>> 
>>>> Hence, for `groupByKey()` it should not be possible for a user to set
>>>> number of partitions IMHO. For `groupBy()` it's a different story,
>>>> because calling
>>>> 
>>>>   `repartition().groupBy()`
>>>> 
>>>> does not achieve what we want. Hence, allowing users to pass in the
>>>> number of users partitions into `groupBy()` does actually makes sense,
>>>> because repartitioning will happen anyway and thus we can piggyback a
>>>> scaling decision.
>>>> 
>>>> I think that John has a fair concern about the overloads, however, I am
>>>> not convinced that using `Grouped` to specify the number of partitions
>>>> is intuitive. I double checked `Grouped` and `Repartitioned` and both
>>>> allow to specify a `name` and `keySerde/valueSerde`. Thus, I am
>>>> wondering if we could bridge the gap between both, if we would make
>>>> `Repartitioned extends Grouped`? For this case, we only need
>>>> `groupBy(Grouped)` and a user can pass in both types what seems to make
>>>> the API quite smooth:
>>>> 
>>>>  `stream.groupBy(..., Grouped...)`
>>>> 
>>>>  `stream.groupBy(..., Repartitioned...)`
>>>> 
>>>> 
>>>> Thoughts?
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> 
>>>> 
>>>> On 11/7/19 10:59 AM, Levani Kokhreidze wrote:
>>>>> Hi Sophie,
>>>>> 
>>>>> Thank you for your reply, very insightful. Looking forward hearing others 
>>>>> opinion as well on this.
>>>>> 
>>>>> Kind regards,
>>>>> Levani
>>>>> 
>>>>> 
>>>>>> On Nov 6, 2019, at 1:30 AM, Sophie Blee-Goldman  
>>>>>> wrote:
>>>>>> 
>>>>>>> Personally, I think Matthias’s concern is valid, but on the other hand
>>>>>> Kafka Streams has already
>>>>>>> optimizer in place which alters topology independently from user
>>>>>> 
>>>>>> I agree (with you) and think this is a good way to put it -- we currently
>>>>>> auto-repartition for the user so
>>>>>> that they don't have to walk through their entire topology and reason 
>>>>>> about
>>>>>> when and where to place a
>>>>>> `.through` (or the new `.repartition`), so why suddenly force this onto 
>>>>>> the
>>>>>> user? How certain are we that
>>>>>> users will always get this right? It's easy to imagine that during
>>>>>> development, you write your new app with
>>>>>> correctly placed repartitions in order to use this new feature. During 
>>>>>> the
>>>>>> course of development you end up
>>>>>> tweaking the topology, but don't remember to review or move the
>>>>>> repartitioning since you're used to Streams
>>>>>> doing this for you. If you use only single-partition topics for testing,
>>>>>> you might not even notice your app is
>>>>>> spitting out incorrect results!
>>>>>> 
>>>>>> Anyways, I feel pretty strongly that it would be weird to introduce a new
>>>>>> feature and say that to use it, you can't take
>>>>>> advantage of this other feature anymore. Al

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-07 Thread Levani Kokhreidze
Hi Sophie,

Thank you for your reply, very insightful. Looking forward to hearing others’ 
opinions on this as well.

Kind regards,
Levani


> On Nov 6, 2019, at 1:30 AM, Sophie Blee-Goldman  wrote:
> 
>> Personally, I think Matthias’s concern is valid, but on the other hand
> Kafka Streams has already
>> optimizer in place which alters topology independently from user
> 
> I agree (with you) and think this is a good way to put it -- we currently
> auto-repartition for the user so
> that they don't have to walk through their entire topology and reason about
> when and where to place a
> `.through` (or the new `.repartition`), so why suddenly force this onto the
> user? How certain are we that
> users will always get this right? It's easy to imagine that during
> development, you write your new app with
> correctly placed repartitions in order to use this new feature. During the
> course of development you end up
> tweaking the topology, but don't remember to review or move the
> repartitioning since you're used to Streams
> doing this for you. If you use only single-partition topics for testing,
> you might not even notice your app is
> spitting out incorrect results!
> 
> Anyways, I feel pretty strongly that it would be weird to introduce a new
> feature and say that to use it, you can't take
> advantage of this other feature anymore. Also, is it possible our
> optimization framework could ever include an
> optimized repartitioning strategy that is better than what a user could
> achieve by manually inserting repartitions?
> Do we expect users to have a deep understanding of the best way to
> repartition their particular topology, or is it
> likely they will end up over-repartitioning either due to missed
> optimizations or unnecessary extra repartitions?
> I think many users would prefer to just say "if there *is* a repartition
> required at this point in the topology, it should
> have N partitions"
> 
> As to the idea of adding `numberOfPartitions` to Grouped rather than
> adding a new parameter to groupBy, that does seem more in line with the
> current syntax so +1 from me
> 
> On Tue, Nov 5, 2019 at 2:07 PM Levani Kokhreidze 
> wrote:
> 
>> Hello all,
>> 
>> While https://github.com/apache/kafka/pull/7170 <
>> https://github.com/apache/kafka/pull/7170> is under review and it’s
>> almost done, I want to resurrect discussion about this KIP to address
>> couple of concerns raised by Matthias and John.
>> 
>> As a reminder, idea of the KIP-221 was to allow DSL users control over
>> repartitioning and parallelism of sub-topologies by:
>> 1) Introducing new KStream#repartition operation which is done in
>> https://github.com/apache/kafka/pull/7170 <
>> https://github.com/apache/kafka/pull/7170>
>> 2) Add new KStream#groupBy(Repartitioned) operation, which is planned to
>> be separate PR.
>> 
>> While all agree about general implementation and idea behind
>> https://github.com/apache/kafka/pull/7170 <
>> https://github.com/apache/kafka/pull/7170> PR, introducing new
>> KStream#groupBy(Repartitioned) method overload raised some questions during
>> the review.
>> Matthias raised concern that there can be cases when user uses
>> `KStream#groupBy(Repartitioned)` operation, but actual repartitioning may
>> not be required, thus configuration passed via `Repartitioned` would never be
>> applied (Matthias, please correct me if I misinterpreted your comment).
>> So instead, if user wants to control parallelism of sub-topologies, he or
>> she should always use `KStream#repartition` operation before groupBy. Full
>> comment can be seen here:
>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125 <
>> https://github.com/apache/kafka/pull/7170#issuecomment-519303125>
>> 
>> On the same topic, John pointed out that, from API design perspective, we
>> shouldn’t intertwine configuration classes of different operators between
>> one another. So instead of introducing new `KStream#groupBy(Repartitioned)`
>> for specifying number of partitions for internal topic, we should update
>> existing `Grouped` class with `numberOfPartitions` field.
>> 
>> Personally, I think Matthias’s concern is valid, but on the other hand
>> Kafka Streams already has an optimizer in place which alters the topology
>> independently from the user. So maybe it makes sense for Kafka Streams to
>> internally optimize the topology in the best way possible, even if in
>> some cases this means ignoring some operator configurations passed by the
>> user. Also, I agree with John about API design semantics. If we go through
>> wit

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-05 Thread Levani Kokhreidze
Hello all,

While https://github.com/apache/kafka/pull/7170 
<https://github.com/apache/kafka/pull/7170> is under review and it’s almost 
done, I want to resurrect discussion about this KIP to address couple of 
concerns raised by Matthias and John.

As a reminder, idea of the KIP-221 was to allow DSL users control over 
repartitioning and parallelism of sub-topologies by:
1) Introducing new KStream#repartition operation which is done in 
https://github.com/apache/kafka/pull/7170 
<https://github.com/apache/kafka/pull/7170> 
2) Add new KStream#groupBy(Repartitioned) operation, which is planned to be 
separate PR.

While all agree about general implementation and idea behind 
https://github.com/apache/kafka/pull/7170 
<https://github.com/apache/kafka/pull/7170> PR, introducing new 
KStream#groupBy(Repartitioned) method overload raised some questions during the 
review.
Matthias raised the concern that there can be cases when a user uses the 
`KStream#groupBy(Repartitioned)` operation but actual repartitioning may not be 
required, thus the configuration passed via `Repartitioned` would never be applied 
(Matthias, please correct me if I misinterpreted your comment). 
So instead, if a user wants to control the parallelism of sub-topologies, he or she 
should always use the `KStream#repartition` operation before groupBy. Full comment 
can be seen here: 
https://github.com/apache/kafka/pull/7170#issuecomment-519303125 
<https://github.com/apache/kafka/pull/7170#issuecomment-519303125> 

On the same topic, John pointed out that, from an API design perspective, we 
shouldn’t intertwine the configuration classes of different operators with one 
another. So instead of introducing a new `KStream#groupBy(Repartitioned)` overload 
for specifying the number of partitions for the internal topic, we should update 
the existing `Grouped` class with a `numberOfPartitions` field.

Personally, I think Matthias’s concern is valid, but on the other hand Kafka 
Streams already has an optimizer in place which alters the topology independently 
from the user. So maybe it makes sense for Kafka Streams to internally optimize the 
topology in the best way possible, even if in some cases this means ignoring 
some operator configurations passed by the user. Also, I agree with John about 
the API design semantics. If we go through with the changes for the `KStream#groupBy` 
operation, it makes more sense to add a `numberOfPartitions` field to the `Grouped` 
class instead of introducing a new `KStream#groupBy(Repartitioned)` method 
overload. 
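
For illustration, here is a rough sketch of the two API shapes under discussion 
(both are proposals at this point; `withNumberOfPartitions` on `Grouped` is purely 
hypothetical):

    // Option 1: new groupBy overload with a dedicated Repartitioned config
    stream.groupBy((key, value) -> value.newKey(),
                   Repartitioned.withNumberOfPartitions(5));

    // Option 2: extend the existing Grouped config class
    stream.groupBy((key, value) -> value.newKey(),
                   Grouped.as("my-group").withNumberOfPartitions(5));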

I would really appreciate the community’s feedback on this.

Kind regards,
Levani



> On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman  wrote:
> 
> Hey Levani,
> 
> I think people are busy with the upcoming 2.4 release, and don't have much
> spare time at the
> moment. It's kind of a difficult time to get attention on things, but feel
> free to pick up something else
> to work on in the meantime until things have calmed down a bit!
> 
> Cheers,
> Sophie
> 
> 
> On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze  <mailto:levani.co...@gmail.com>>
> wrote:
> 
>> Hello all,
>> 
>> Sorry for bringing this thread again, but I would like to get some
>> attention on this PR: https://github.com/apache/kafka/pull/7170 
>> <https://github.com/apache/kafka/pull/7170> <
>> https://github.com/apache/kafka/pull/7170 
>> <https://github.com/apache/kafka/pull/7170>>
>> It's been a while now and I would love to move on to other KIPs as well.
>> Please let me know if you have any concerns.
>> 
>> Regards,
>> Levani
>> 
>> 
>>> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze >> <mailto:levani.co...@gmail.com>>
>> wrote:
>>> 
>>> Hi all,
>>> 
>>> Here’s voting thread for this KIP:
>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html 
>> <https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html> <
>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html 
>> <https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html>>
>>> 
>>> Regards,
>>> Levani
>>> 
>>>> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze >>> <mailto:levani.co...@gmail.com>
>> <mailto:levani.co...@gmail.com <mailto:levani.co...@gmail.com>>> wrote:
>>>> 
>>>> Hi Matthias,
>>>> 
>>>> Thanks for the suggestion. I don’t have a strong opinion on that one.
>>>> I agree that avoiding unnecessary method overloads is a good idea.
>>>> 
>>>> Updated KIP
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>> 
>>>>> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax >>>> <mailto:matth

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-10-16 Thread Levani Kokhreidze
Hello all,

Sorry for bringing this thread up again, but I would like to get some attention on 
this PR: https://github.com/apache/kafka/pull/7170 
<https://github.com/apache/kafka/pull/7170> 
It's been a while now, and I would love to move on to other KIPs as well. Please 
let me know if you have any concerns.

Regards,
Levani


> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze  
> wrote:
> 
> Hi all,
> 
> Here’s voting thread for this KIP: 
> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html 
> <https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html>
> 
> Regards,
> Levani
> 
>> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze > <mailto:levani.co...@gmail.com>> wrote:
>> 
>> Hi Matthias,
>> 
>> Thanks for the suggestion. I don’t have a strong opinion on that one.
>> I agree that avoiding unnecessary method overloads is a good idea.
>> 
>> Updated KIP
>> 
>> Regards,
>> Levani
>> 
>> 
>>> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax >> <mailto:matth...@confluent.io>> wrote:
>>> 
>>> One question:
>>> 
>>> Why do we add
>>> 
>>>> Repartitioned#with(final String name, final int numberOfPartitions)
>>> 
>>> It seems that `#with(String name)`, `#numberOfPartitions(int)` in
>>> combination with `withName()` and `withNumberOfPartitions()` should be
>>> sufficient. Users can chain the method calls.
>>> 
>>> (I think it's valuable to keep the number of overload small if possible.)
>>> 
>>> Otherwise LGTM.
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 7/23/19 2:18 PM, Levani Kokhreidze wrote:
>>>> Hello,
>>>> 
>>>> Thanks all for your feedback.
>>>> I started voting procedure for this KIP. If there’re any other concerns 
>>>> about this KIP, please let me know.
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze >>>> <mailto:levani.co...@gmail.com>> wrote:
>>>>> 
>>>>> Hi Matthias,
>>>>> 
>>>>> Thanks for the suggestion, makes sense.
>>>>> I’ve updated KIP 
>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>  
>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>
>>>>>  
>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>  
>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>>).
>>>>> 
>>>>> Regards,
>>>>> Levani
>>>>> 
>>>>> 
>>>>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax >>>>> <mailto:matth...@confluent.io> <mailto:matth...@confluent.io 
>>>>>> <mailto:matth...@confluent.io>>> wrote:
>>>>>> 
>>>>>> Thanks for driving the KIP.
>>>>>> 
>>>>>> I agree that users need to be able to specify a partitioning strategy.
>>>>>> 
>>>>>> Sophie raises a fair point about topic configs and producer configs. My
>>>>>> take is, that consider `Repartitioned` as an "extension" to `Produced`,
>>>>>> that adds topic configuration, is a good way to think about it and helps
>>>>>> to keep the API "clean".
>>>>>> 
>>>>>> 
>>>>>> With regard to method names. I would prefer to avoid abbreviations. Can
>>>>>> we rename:
>>>>>> 
>>>>>> `withNumOfPartitions` -> `withNumberOfPartitions`
>>>>>> 
>>>>>> Furthermore, it might be good to add some more `static` methods:
>>>>>> 
>>>>>> - Repartitioned.with(Serde, Serde)
>>>>>> - Repartitioned.withNumberOfPartitions(int)
>>>>>> - Repartitioned.streamPartitioner(StreamPartitioner)
>>>>>> 
>>>>>> 
>>>>>> -Matthias
>>>>>> 
>>>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>>>>>>> Totally

[jira] [Created] (KAFKA-9031) Add possibility to reset Kafka streams application state to specific point in time

2019-10-11 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-9031:


 Summary: Add possibility to reset Kafka streams application state 
to specific point in time
 Key: KAFKA-9031
 URL: https://issues.apache.org/jira/browse/KAFKA-9031
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze


Apache Flink has a feature called 
[savepoints|https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html]
 which gives the possibility to reset a stream processing topology's state to a 
specific point in time. A similar feature could be useful for Kafka Streams 
applications as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8871) Allow timestamp manipulation in ValueTransformerWithKey

2019-09-04 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-8871:


 Summary: Allow timestamp manipulation in ValueTransformerWithKey
 Key: KAFKA-8871
 URL: https://issues.apache.org/jira/browse/KAFKA-8871
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze


h3. Motivation

When using `KStream#transform` in the Kafka Streams DSL to manipulate the 
timestamp, the `KStreamImpl#transform` implementation marks *repartitionRequired* 
as *true*, which isn't necessarily okay when one may just want to manipulate 
the timestamp without affecting the key. It would be great if a DSL user could 
manipulate the timestamp in `ValueTransformerWithKey`.
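
For illustration, a sketch of the current workaround this ticket wants to avoid 
(`stream` is assumed to be a `KStream<String, String>`, and `extractTimestamp` is a 
hypothetical helper):

{code:java}
// transform() can rewrite the timestamp via context.forward(), but it marks
// the stream as repartition-required even though the key never changes.
stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        // key is untouched; only the timestamp is rewritten
        context.forward(key, value, To.all().withTimestamp(extractTimestamp(value)));
        return null; // everything is emitted via forward()
    }

    @Override
    public void close() { }
});
{code}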



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [VOTE] KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint

2019-08-01 Thread Levani Kokhreidze
Thank you all!

The vote has been open for ~8 days. The KIP has three binding votes (Bill, 
Guozhang, Matthias) and one non-binding vote (Sophie), so the KIP vote passes!
I’ll mark the KIP as accepted and start working on it as soon as possible!

Regards,
Levani

> On Aug 1, 2019, at 2:37 AM, Matthias J. Sax  wrote:
> 
> +1 (binding)
> 
> On 7/31/19 8:36 AM, Guozhang Wang wrote:
>> Thanks for the update! +1 (binding).
>> 
>> On Tue, Jul 30, 2019 at 11:42 PM Levani Kokhreidze 
>> wrote:
>> 
>>> Hello Guozhang,
>>> 
>>> Thanks for the feedback. That’s an interesting point. To be honest, I
>>> totally missed it. I wasn’t aware that there’s `groupBy` possibility on
>>> KTable.
>>> I don’t see any reasons why not to add same functionality to KTable
>>> interface.
>>> 
>>> I’ve updated the KIP:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
>>> <
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>> 
>>> Please let me know if you have any other questions and/or concerns.
>>> 
>>> Regards,
>>> Levani
>>> 
>>>> On Jul 31, 2019, at 1:24 AM, Guozhang Wang  wrote:
>>>> 
>>>> Hello Levani,
>>>> 
>>>> Thanks for the KIP! Just got a quick question here about the scope: why
>>> do
>>>> we only want this for `KStream`, not `KTable#groupBy` for example?
>>>> 
>>>> 
>>>> Guozhang
>>>> 
>>>> 
>>>> On Tue, Jul 30, 2019 at 1:27 PM Bill Bejeck  wrote:
>>>> 
>>>>> Thanks for the KIP Levani.
>>>>> 
>>>>> +1 (binding)
>>>>> 
>>>>> -Bill
>>>>> 
>>>>> On Tue, Jul 30, 2019 at 3:37 PM Levani Kokhreidze <
>>> levani.co...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> Still waiting for feedback on this KIP.
>>>>>> Please let me know if you have any concerns and/or questions.
>>>>>> 
>>>>>> Regards,
>>>>>> Levani
>>>>>> 
>>>>>> 
>>>>>>> On Jul 24, 2019, at 8:20 PM, Sophie Blee-Goldman >>> 
>>>>>> wrote:
>>>>>>> 
>>>>>>> Looks good! Thanks Levani,
>>>>>>> 
>>>>>>> +1 (non-binding)
>>>>>>> 
>>>>>>> Sophie
>>>>>>> 
>>>>>>> On Tue, Jul 23, 2019 at 2:16 PM Levani Kokhreidze <
>>>>>> levani.co...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hello,
>>>>>>>> 
>>>>>>>> I’d like to initialize voting on KIP-221:
>>>>>>>> 
>>>>>> 
>>>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>> <
>>>>>>>> 
>>>>>> 
>>>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>>>>> 
>>>>>>>> If there’re any more concerns about the KIP, happy to discuss
>>> further.
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Levani
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> -- Guozhang
>>> 
>>> 
>> 
> 



Re: [VOTE] KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint

2019-07-31 Thread Levani Kokhreidze
Hello Guozhang,

Thanks for the feedback. That’s an interesting point. To be honest, I totally 
missed it. I wasn’t aware that there’s a `groupBy` possibility on KTable. 
I don’t see any reason not to add the same functionality to the KTable interface.
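
For context, a rough sketch of how grouping on KTable already looks today 
(existing API; `builder` is a StreamsBuilder, and the topic name and serdes are 
just placeholders), so a repartition hint would need an equivalent entry point on 
the KTable side as well:

    KTable<String, String> table = builder.table("input-topic");
    table.groupBy((key, value) -> KeyValue.pair(value, value),
                  Grouped.with(Serdes.String(), Serdes.String()))
         .count();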

I’ve updated the KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint>
Please let me know if you have any other questions and/or concerns.

Regards,
Levani

> On Jul 31, 2019, at 1:24 AM, Guozhang Wang  wrote:
> 
> Hello Levani,
> 
> Thanks for the KIP! Just got a quick question here about the scope: why do
> we only want this for `KStream`, not `KTable#groupBy` for example?
> 
> 
> Guozhang
> 
> 
> On Tue, Jul 30, 2019 at 1:27 PM Bill Bejeck  wrote:
> 
>> Thanks for the KIP Levani.
>> 
>> +1 (binding)
>> 
>> -Bill
>> 
>> On Tue, Jul 30, 2019 at 3:37 PM Levani Kokhreidze 
>> wrote:
>> 
>>> Hello,
>>> 
>>> Still waiting for feedback on this KIP.
>>> Please let me know if you have any concerns and/or questions.
>>> 
>>> Regards,
>>> Levani
>>> 
>>> 
>>>> On Jul 24, 2019, at 8:20 PM, Sophie Blee-Goldman 
>>> wrote:
>>>> 
>>>> Looks good! Thanks Levani,
>>>> 
>>>> +1 (non-binding)
>>>> 
>>>> Sophie
>>>> 
>>>> On Tue, Jul 23, 2019 at 2:16 PM Levani Kokhreidze <
>>> levani.co...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hello,
>>>>> 
>>>>> I’d like to initialize voting on KIP-221:
>>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>> <
>>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>> 
>>>>> If there’re any more concerns about the KIP, happy to discuss further.
>>>>> 
>>>>> Regards,
>>>>> Levani
>>> 
>>> 
>> 
> 
> 
> -- 
> -- Guozhang



Re: [VOTE] KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint

2019-07-30 Thread Levani Kokhreidze
Hello,

Still waiting for feedback on this KIP.
Please let me know if you have any concerns and/or questions.

Regards,
Levani


> On Jul 24, 2019, at 8:20 PM, Sophie Blee-Goldman  wrote:
> 
> Looks good! Thanks Levani,
> 
> +1 (non-binding)
> 
> Sophie
> 
> On Tue, Jul 23, 2019 at 2:16 PM Levani Kokhreidze 
> wrote:
> 
>> Hello,
>> 
>> I’d like to initialize voting on KIP-221:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>> 
>> If there’re any more concerns about the KIP, happy to discuss further.
>> 
>> Regards,
>> Levani



[jira] [Resolved] (KAFKA-8727) Control over standby tasks host assignment

2019-07-29 Thread Levani Kokhreidze (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze resolved KAFKA-8727.
--
Resolution: Duplicate

 Duplicate of KAFKA-6718

> Control over standby tasks host assignment
> --
>
> Key: KAFKA-8727
> URL: https://issues.apache.org/jira/browse/KAFKA-8727
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>    Reporter: Levani Kokhreidze
>Priority: Minor
>
> *Motivation*
> As of now, a Kafka Streams user has no control over the host on which the Kafka 
> Streams application will create a standby task. In production deployments (especially 
> in Kubernetes) it's quite common to have multiple instances of the same Kafka 
> Streams application deployed across more than one "cluster" in order to have 
> high availability of the system.
> For example, if we have 6 Kafka Streams instances deployed across two 
> clusters, we'll get 3 Kafka Streams instances per cluster. With the current 
> implementation, the Kafka Streams application may create a "standby task" in the 
> same cluster as the active task. This is not optimal, since in case of a 
> cluster failure the recovery time will be much longer. This is especially 
> problematic for Kafka Streams applications that manage large state.
>  
> *Possible Solution*
> It would be great if in the Kafka Streams configuration we could have the 
> possibility to inject dynamic environment variables and use those environment 
> variables to control where the standby task should be created.
> For example, suppose I have an active task *1_1* with the environment variable 
> *CLUSTER_ID: main01*; then the standby task for *1_1* should be created where 
> *CLUSTER_ID* *!=* *main01*



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8727) Control over standby tasks host assignment

2019-07-29 Thread Levani Kokhreidze (JIRA)
Levani Kokhreidze created KAFKA-8727:


 Summary: Control over standby tasks host assignment
 Key: KAFKA-8727
 URL: https://issues.apache.org/jira/browse/KAFKA-8727
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze


*Motivation*

As of now, a Kafka Streams user has no control over the host on which the Kafka 
Streams application will create a standby task. In production deployments (especially in 
Kubernetes) it's quite common to have multiple instances of the same Kafka 
Streams application deployed across more than one "cluster" in order to have 
high availability of the system.

For example, if we have 6 Kafka Streams instances deployed across two clusters, 
we'll get 3 Kafka Streams instances per cluster. With the current 
implementation, the Kafka Streams application may create a "standby task" in the same 
cluster as the active task, which is not optimal, since in case of a cluster 
failure the recovery time will be much longer. This is especially 
problematic for Kafka Streams applications that manage large state.

 

*Possible Solution*

It would be great if in the Kafka Streams configuration we could have the 
possibility to inject dynamic environment variables and use those environment 
variables to control where the standby task should be created.

For example, suppose I have an active task *1_1* with the environment variable 
*CLUSTER_ID: main01*; then the standby task for *1_1* should be created where 
*CLUSTER_ID* *!=* *main01*
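
A purely hypothetical configuration sketch of the idea (none of these property 
names exist in Kafka Streams; as noted above, the ticket was resolved as a 
duplicate of KAFKA-6718, which tracks this ability):

{code:java}
// Hypothetical property names, for illustration only.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
// hypothetical: tag each instance with the cluster it runs in
props.put("instance.tag.CLUSTER_ID", System.getenv("CLUSTER_ID")); // e.g. "main01"
// hypothetical: never place a standby task on an instance whose
// CLUSTER_ID tag matches that of the active task
props.put("standby.task.distinct.tags", "CLUSTER_ID");
{code}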



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-26 Thread Levani Kokhreidze
Hi all,

Here’s voting thread for this KIP: 
https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html 
<https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html>

Regards,
Levani

> On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze  
> wrote:
> 
> Hi Matthias,
> 
> Thanks for the suggestion. I don’t have a strong opinion on that one.
> I agree that avoiding unnecessary method overloads is a good idea.
> 
> Updated KIP
> 
> Regards,
> Levani
> 
> 
>> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax  wrote:
>> 
>> One question:
>> 
>> Why do we add
>> 
>>> Repartitioned#with(final String name, final int numberOfPartitions)
>> 
>> It seems that `#with(String name)`, `#numberOfPartitions(int)` in
>> combination with `withName()` and `withNumberOfPartitions()` should be
>> sufficient. Users can chain the method calls.
>> 
>> (I think it's valuable to keep the number of overload small if possible.)
>> 
>> Otherwise LGTM.
>> 
>> 
>> -Matthias
>> 
>> 
>> On 7/23/19 2:18 PM, Levani Kokhreidze wrote:
>>> Hello,
>>> 
>>> Thanks all for your feedback.
>>> I started voting procedure for this KIP. If there’re any other concerns 
>>> about this KIP, please let me know.
>>> 
>>> Regards,
>>> Levani
>>> 
>>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze  
>>>> wrote:
>>>> 
>>>> Hi Matthias,
>>>> 
>>>> Thanks for the suggestion, makes sense.
>>>> I’ve updated KIP 
>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>  
>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>).
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>> 
>>>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax >>>> <mailto:matth...@confluent.io>> wrote:
>>>>> 
>>>>> Thanks for driving the KIP.
>>>>> 
>>>>> I agree that users need to be able to specify a partitioning strategy.
>>>>> 
>>>>> Sophie raises a fair point about topic configs and producer configs. My
>>>>> take is, that consider `Repartitioned` as an "extension" to `Produced`,
>>>>> that adds topic configuration, is a good way to think about it and helps
>>>>> to keep the API "clean".
>>>>> 
>>>>> 
>>>>> With regard to method names. I would prefer to avoid abbreviations. Can
>>>>> we rename:
>>>>> 
>>>>> `withNumOfPartitions` -> `withNumberOfPartitions`
>>>>> 
>>>>> Furthermore, it might be good to add some more `static` methods:
>>>>> 
>>>>> - Repartitioned.with(Serde, Serde)
>>>>> - Repartitioned.withNumberOfPartitions(int)
>>>>> - Repartitioned.streamPartitioner(StreamPartitioner)
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>>>>>> Totally agree. I think in KStream interface it makes sense to have some 
>>>>>> duplicate configurations between operators in order to keep API simple 
>>>>>> and usable.
>>>>>> Also, as more surface API has, harder it is to have proper backward 
>>>>>> compatibility.
>>>>>> While initial idea of keeping topic level configs separate was exciting, 
>>>>>> having Repartitioned class encapsulate some producer level configs makes 
>>>>>> API more readable.
>>>>>> 
>>>>>> Regards,
>>>>>> Levani
>>>>>> 
>>>>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman >>>>>> <mailto:sop...@confluent.io>> wrote:
>>>>>>> 
>>>>>>> I think that is a good point about trying to keep producer level
>>>>>>> configurations and (repartition) topic level considerations separate.
>>>>>>> Number of partitions is definitely purely a topic level configuration. 
>>>>>>> But
>>>>>>> on some level, serdes and partitioners are just as much a topic
>>>>>>> configuration as a producer one. You could have two producers c

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-24 Thread Levani Kokhreidze
Hi Matthias,

Thanks for the suggestion. I don’t have a strong opinion on that one.
I agree that avoiding unnecessary method overloads is a good idea.

Updated KIP

Regards,
Levani


> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax  wrote:
> 
> One question:
> 
> Why do we add
> 
>> Repartitioned#with(final String name, final int numberOfPartitions)
> 
> It seems that `#with(String name)`, `#numberOfPartitions(int)` in
> combination with `withName()` and `withNumberOfPartitions()` should be
> sufficient. Users can chain the method calls.
> 
> (I think it's valuable to keep the number of overload small if possible.)
> 
> Otherwise LGTM.
> 
> 
> -Matthias
> 
> 
> On 7/23/19 2:18 PM, Levani Kokhreidze wrote:
>> Hello,
>> 
>> Thanks all for your feedback.
>> I started voting procedure for this KIP. If there’re any other concerns 
>> about this KIP, please let me know.
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze  
>>> wrote:
>>> 
>>> Hi Matthias,
>>> 
>>> Thanks for the suggestion, makes sense.
>>> I’ve updated KIP 
>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>  
>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>).
>>> 
>>> Regards,
>>> Levani
>>> 
>>> 
>>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax >>> <mailto:matth...@confluent.io>> wrote:
>>>> 
>>>> Thanks for driving the KIP.
>>>> 
>>>> I agree that users need to be able to specify a partitioning strategy.
>>>> 
>>>> Sophie raises a fair point about topic configs and producer configs. My
>>>> take is, that consider `Repartitioned` as an "extension" to `Produced`,
>>>> that adds topic configuration, is a good way to think about it and helps
>>>> to keep the API "clean".
>>>> 
>>>> 
>>>> With regard to method names. I would prefer to avoid abbreviations. Can
>>>> we rename:
>>>> 
>>>> `withNumOfPartitions` -> `withNumberOfPartitions`
>>>> 
>>>> Furthermore, it might be good to add some more `static` methods:
>>>> 
>>>> - Repartitioned.with(Serde, Serde)
>>>> - Repartitioned.withNumberOfPartitions(int)
>>>> - Repartitioned.streamPartitioner(StreamPartitioner)
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>>>>> Totally agree. I think in KStream interface it makes sense to have some 
>>>>> duplicate configurations between operators in order to keep API simple 
>>>>> and usable.
>>>>> Also, as more surface API has, harder it is to have proper backward 
>>>>> compatibility.
>>>>> While initial idea of keeping topic level configs separate was exciting, 
>>>>> having Repartitioned class encapsulate some producer level configs makes 
>>>>> API more readable.
>>>>> 
>>>>> Regards,
>>>>> Levani
>>>>> 
>>>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman >>>>> <mailto:sop...@confluent.io>> wrote:
>>>>>> 
>>>>>> I think that is a good point about trying to keep producer level
>>>>>> configurations and (repartition) topic level considerations separate.
>>>>>> Number of partitions is definitely purely a topic level configuration. 
>>>>>> But
>>>>>> on some level, serdes and partitioners are just as much a topic
>>>>>> configuration as a producer one. You could have two producers configured
>>>>>> with different serdes and/or partitioners, but if they are writing to the
>>>>>> same topic the result would be very difficult to part. So in a sense, 
>>>>>> these
>>>>>> are configurations of topics in Streams, not just producers.
>>>>>> 
>>>>>> Another way to think of it: while the Streams API is not always true to
>>>>>> this, ideally all the relevant configs for an operator are wrapped into a
>>>>>> single object (in this case, Repartitioned). We could instead split out 
>>>>>> the
>>>>>> fields in 

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-23 Thread Levani Kokhreidze
Hello,

Thanks all for your feedback.
I started the voting procedure for this KIP. If there are any other concerns about 
this KIP, please let me know.

Regards,
Levani

> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze  wrote:
> 
> Hi Matthias,
> 
> Thanks for the suggestion, makes sense.
> I’ve updated KIP 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>  
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>).
> 
> Regards,
> Levani
> 
> 
>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax > <mailto:matth...@confluent.io>> wrote:
>> 
>> Thanks for driving the KIP.
>> 
>> I agree that users need to be able to specify a partitioning strategy.
>> 
>> Sophie raises a fair point about topic configs and producer configs. My
>> take is, that consider `Repartitioned` as an "extension" to `Produced`,
>> that adds topic configuration, is a good way to think about it and helps
>> to keep the API "clean".
>> 
>> 
>> With regard to method names. I would prefer to avoid abbreviations. Can
>> we rename:
>> 
>> `withNumOfPartitions` -> `withNumberOfPartitions`
>> 
>> Furthermore, it might be good to add some more `static` methods:
>> 
>> - Repartitioned.with(Serde, Serde)
>> - Repartitioned.withNumberOfPartitions(int)
>> - Repartitioned.streamPartitioner(StreamPartitioner)
>> 
>> 
>> -Matthias
>> 
>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>>> Totally agree. I think in KStream interface it makes sense to have some 
>>> duplicate configurations between operators in order to keep API simple and 
>>> usable.
>>> Also, as more surface API has, harder it is to have proper backward 
>>> compatibility.
>>> While initial idea of keeping topic level configs separate was exciting, 
>>> having Repartitioned class encapsulate some producer level configs makes 
>>> API more readable.
>>> 
>>> Regards,
>>> Levani
>>> 
>>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman >>> <mailto:sop...@confluent.io>> wrote:
>>>> 
>>>> I think that is a good point about trying to keep producer level
>>>> configurations and (repartition) topic level considerations separate.
>>>> Number of partitions is definitely purely a topic level configuration. But
>>>> on some level, serdes and partitioners are just as much a topic
>>>> configuration as a producer one. You could have two producers configured
>>>> with different serdes and/or partitioners, but if they are writing to the
>>>> same topic the result would be very difficult to part. So in a sense, these
>>>> are configurations of topics in Streams, not just producers.
>>>> 
>>>> Another way to think of it: while the Streams API is not always true to
>>>> this, ideally all the relevant configs for an operator are wrapped into a
>>>> single object (in this case, Repartitioned). We could instead split out the
>>>> fields in common with Produced into a separate parameter to keep topic and
>>>> producer level configurations separate, but this increases the API surface
>>>> area by a lot. It's much more straightforward to just say "this is
>>>> everything that this particular operator needs" without worrying about what
>>>> exactly you're specifying.
>>>> 
>>>> I suppose you could alternatively make Produced a field of Repartitioned,
>>>> but I don't think we do this kind of composition elsewhere in Streams at
>>>> the moment
>>>> 
>>>> On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze >>> <mailto:levani.co...@gmail.com>>
>>>> wrote:
>>>> 
>>>>> Hi Bill,
>>>>> 
>>>>> Thanks a lot for the feedback.
>>>>> Yes, that makes sense. I’ve updated KIP with `Repartitioned#partitioner`
>>>>> configuration.
>>>>> In the beginning, I wanted to introduce a class for topic level
>>>>> configuration and keep topic level and producer level configurations (such
>>>>> as Produced) separately (see my second email in this thread).
>>>>> But while looking at the semantics of KStream interface, I couldn’t really
>>>>> figure out good operation name for Topic level configuration class and 
>>>>> ju

[VOTE] KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint

2019-07-23 Thread Levani Kokhreidze
Hello,

I’d like to initiate voting on KIP-221: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
 

If there’re any more concerns about the KIP, happy to discuss further.
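
For anyone catching up, a quick usage sketch of the proposed operation (based on 
the KIP page; method names follow the discussion thread and may still change 
during the vote):

    builder.stream("input-topic")
           .repartition(Repartitioned.withNumberOfPartitions(10))
           .groupByKey()
           .count();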

Regards,
Levani

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-20 Thread Levani Kokhreidze
Hi Matthias,

Thanks for the suggestion, makes sense.
I’ve updated KIP 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>).

Regards,
Levani


> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax  wrote:
> 
> Thanks for driving the KIP.
> 
> I agree that users need to be able to specify a partitioning strategy.
> 
> Sophie raises a fair point about topic configs and producer configs. My
> take is, that consider `Repartitioned` as an "extension" to `Produced`,
> that adds topic configuration, is a good way to think about it and helps
> to keep the API "clean".
> 
> 
> With regard to method names. I would prefer to avoid abbreviations. Can
> we rename:
> 
> `withNumOfPartitions` -> `withNumberOfPartitions`
> 
> Furthermore, it might be good to add some more `static` methods:
> 
> - Repartitioned.with(Serde, Serde)
> - Repartitioned.withNumberOfPartitions(int)
> - Repartitioned.streamPartitioner(StreamPartitioner)
> 
> 
> -Matthias
> 
> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>> Totally agree. I think in KStream interface it makes sense to have some 
>> duplicate configurations between operators in order to keep API simple and 
>> usable.
>> Also, as more surface API has, harder it is to have proper backward 
>> compatibility.
>> While initial idea of keeping topic level configs separate was exciting, 
>> having Repartitioned class encapsulate some producer level configs makes API 
>> more readable.
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman  
>>> wrote:
>>> 
>>> I think that is a good point about trying to keep producer level
>>> configurations and (repartition) topic level considerations separate.
>>> Number of partitions is definitely purely a topic level configuration. But
>>> on some level, serdes and partitioners are just as much a topic
>>> configuration as a producer one. You could have two producers configured
>>> with different serdes and/or partitioners, but if they are writing to the
>>> same topic the result would be very difficult to part. So in a sense, these
>>> are configurations of topics in Streams, not just producers.
>>> 
>>> Another way to think of it: while the Streams API is not always true to
>>> this, ideally all the relevant configs for an operator are wrapped into a
>>> single object (in this case, Repartitioned). We could instead split out the
>>> fields in common with Produced into a separate parameter to keep topic and
>>> producer level configurations separate, but this increases the API surface
>>> area by a lot. It's much more straightforward to just say "this is
>>> everything that this particular operator needs" without worrying about what
>>> exactly you're specifying.
>>> 
>>> I suppose you could alternatively make Produced a field of Repartitioned,
>>> but I don't think we do this kind of composition elsewhere in Streams at
>>> the moment
>>> 
>>> On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze 
>>> wrote:
>>> 
>>>> Hi Bill,
>>>> 
>>>> Thanks a lot for the feedback.
>>>> Yes, that makes sense. I’ve updated KIP with `Repartitioned#partitioner`
>>>> configuration.
>>>> In the beginning, I wanted to introduce a class for topic level
>>>> configuration and keep topic level and producer level configurations (such
>>>> as Produced) separately (see my second email in this thread).
>>>> But while looking at the semantics of KStream interface, I couldn’t really
>>>> figure out good operation name for Topic level configuration class and just
>>>> introducing `Topic` config class was kinda breaking the semantics.
>>>> So I think having Repartitioned class which encapsulates topic and
>>>> producer level configurations for internal topics is viable thing to do.
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck  wrote:
>>>>> 
>>>>> Hi Levani,
>>>>> 
>>>>> Thanks for resurrecting this KIP.
>>>>> 
>>>>> I'm also a +1 for adding a partition option.  In addition to the reason
>>>>> provided by John, my reasoning is:
>>>>> 

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-19 Thread Levani Kokhreidze
Totally agree. I think in the KStream interface it makes sense to have some 
duplicate configurations between operators in order to keep the API simple and 
usable.
Also, the more surface an API has, the harder it is to maintain proper backward 
compatibility.
While the initial idea of keeping topic level configs separate was exciting, having 
the Repartitioned class encapsulate some producer level configs makes the API more 
readable.

Regards,
Levani
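
To illustrate the "everything the operator needs in one object" point, a minimal sketch (hypothetical topic name; written against the API shape the KIP eventually converged on): one Repartitioned carries producer-level options (serdes) together with a topic-level option (partition count).

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class SingleConfigObjectExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Producer-level options (serdes) and a topic-level option (partition
        // count) are combined in a single config object for the operator.
        final KStream<String, String> repartitioned = builder
                .<String, String>stream("input-topic")
                .selectKey((key, value) -> value)
                .repartition(Repartitioned.with(Serdes.String(), Serdes.String())
                        .withNumberOfPartitions(6));
    }
}
{code}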

> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman  wrote:
> 
> I think that is a good point about trying to keep producer level
> configurations and (repartition) topic level considerations separate.
> Number of partitions is definitely purely a topic level configuration. But
> on some level, serdes and partitioners are just as much a topic
> configuration as a producer one. You could have two producers configured
> with different serdes and/or partitioners, but if they are writing to the
> same topic the result would be very difficult to parse. So in a sense, these
> are configurations of topics in Streams, not just producers.
> 
> Another way to think of it: while the Streams API is not always true to
> this, ideally all the relevant configs for an operator are wrapped into a
> single object (in this case, Repartitioned). We could instead split out the
> fields in common with Produced into a separate parameter to keep topic and
> producer level configurations separate, but this increases the API surface
> area by a lot. It's much more straightforward to just say "this is
> everything that this particular operator needs" without worrying about what
> exactly you're specifying.
> 
> I suppose you could alternatively make Produced a field of Repartitioned,
> but I don't think we do this kind of composition elsewhere in Streams at
> the moment
> 
> On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze 
> wrote:
> 
>> Hi Bill,
>> 
>> Thanks a lot for the feedback.
>> Yes, that makes sense. I’ve updated KIP with `Repartitioned#partitioner`
>> configuration.
>> In the beginning, I wanted to introduce a class for topic level
>> configuration and keep topic level and producer level configurations (such
>> as Produced) separately (see my second email in this thread).
>> But while looking at the semantics of KStream interface, I couldn’t really
>> figure out good operation name for Topic level configuration class and just
>> introducing `Topic` config class was kinda breaking the semantics.
>> So I think having Repartitioned class which encapsulates topic and
>> producer level configurations for internal topics is viable thing to do.
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck  wrote:
>>> 
>>> Hi Levani,
>>> 
>>> Thanks for resurrecting this KIP.
>>> 
>>> I'm also a +1 for adding a partition option.  In addition to the reason
>>> provided by John, my reasoning is:
>>> 
>>>  1. Users may want to use something other than hash-based partitioning
>>>  2. Users may wish to partition on something different than the key
>>>  without having to change the key.  For example:
>>> 1. A combination of fields in the value in conjunction with the key
>>> 2. Something other than the key
>>>  3. We allow users to specify a partitioner on Produced, and hence in
>>>  KStream.to and KStream.through, so it makes sense for API consistency.
>>> 
>>> Just my  2 cents.
>>> 
>>> Thanks,
>>> Bill
>>> 
>>> 
>>> 
>>> On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze <
>> levani.co...@gmail.com>
>>> wrote:
>>> 
>>>> Hi John,
>>>> 
>>>> In my mind it makes sense.
>>>> If we add partitioner configuration to Repartitioned class, with the
>>>> combination of specifying number of partitions for internal topics, user
>>>> will have opportunity to ensure co-partitioning before join operation.
>>>> I think this can be quite powerful feature.
>>>> Wondering what others think about this?
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 18, 2019, at 1:20 AM, John Roesler  wrote:
>>>>> 
>>>>> Yes, I believe that's what I had in mind. Again, not totally sure it
>>>>> makes sense, but I believe something similar is the rationale for
>>>>> having the partitioner option in Produced.
>>>>> 
>>>>> Thanks,
>>>>> -John
>>>>> 
>>>>> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-19 Thread Levani Kokhreidze
Hi Bill,

Thanks a lot for the feedback.
Yes, that makes sense. I’ve updated the KIP with the `Repartitioned#partitioner` 
configuration.
In the beginning, I wanted to introduce a class for topic level configuration 
and keep topic level and producer level configurations (such as Produced) 
separate (see my second email in this thread).
But while looking at the semantics of the KStream interface, I couldn’t really 
figure out a good operation name for a topic level configuration class, and just 
introducing a `Topic` config class kind of broke the semantics.
So I think having a Repartitioned class which encapsulates topic and producer 
level configurations for internal topics is a viable thing to do.

Regards,
Levani

> On Jul 19, 2019, at 7:47 PM, Bill Bejeck  wrote:
> 
> Hi Levani,
> 
> Thanks for resurrecting this KIP.
> 
> I'm also a +1 for adding a partition option.  In addition to the reason
> provided by John, my reasoning is:
> 
>   1. Users may want to use something other than hash-based partitioning
>   2. Users may wish to partition on something different than the key
>   without having to change the key.  For example:
>  1. A combination of fields in the value in conjunction with the key
>  2. Something other than the key
>   3. We allow users to specify a partitioner on Produced, and hence in
>   KStream.to and KStream.through, so it makes sense for API consistency.
> 
> Just my  2 cents.
> 
> Thanks,
> Bill
> 
> 
> 
> On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze 
> wrote:
> 
>> Hi John,
>> 
>> In my mind it makes sense.
>> If we add partitioner configuration to Repartitioned class, with the
>> combination of specifying number of partitions for internal topics, user
>> will have opportunity to ensure co-partitioning before join operation.
>> I think this can be quite powerful feature.
>> Wondering what others think about this?
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 18, 2019, at 1:20 AM, John Roesler  wrote:
>>> 
>>> Yes, I believe that's what I had in mind. Again, not totally sure it
>>> makes sense, but I believe something similar is the rationale for
>>> having the partitioner option in Produced.
>>> 
>>> Thanks,
>>> -John
>>> 
>>> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze
>>>  wrote:
>>>> 
>>>> Hey John,
>>>> 
>>>> Oh that’s interesting use-case.
>>>> Do I understand this correctly, in your example I would first issue
>> repartition(Repartitioned) with proper partitioner that essentially would
>> be the same as the topic I want to join with and then do the KStream#join
>> with DSL?
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 17, 2019, at 11:11 PM, John Roesler  wrote:
>>>>> 
>>>>> Hey, all, just to chime in,
>>>>> 
>>>>> I think it might be useful to have an option to specify the
>>>>> partitioner. The case I have in mind is that some data may get
>>>>> repartitioned and then joined with an input topic. If the right-side
>>>>> input topic uses a custom partitioning strategy, then the
>>>>> repartitioned stream also needs to be partitioned with the same
>>>>> strategy.
>>>>> 
>>>>> Does that make sense, or did I maybe miss something important?
>>>>> 
>>>>> Thanks,
>>>>> -John
>>>>> 
>>>>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze
>>>>>  wrote:
>>>>>> 
>>>>>> Yes, I was thinking about it as well. To be honest I’m not sure about
>> it yet.
>>>>>> As Kafka Streams DSL user, I don’t really think I would need control
>> over partitioner for internal topics.
>>>>>> As a user, I would assume that Kafka Streams knows best how to
>> partition data for internal topics.
>>>>>> In this KIP I wrote that Produced should be used only for topics that
>> are created by the user in advance.
>>>>>> In those cases maybe it makes sense to have the possibility to specify the
>> partitioner.
>>>>>> I don’t have a clear answer on that yet, but I guess specifying the
>> partitioner can be added as well if there’s agreement on this.
>>>>>> 
>>>>>> Regards,
>>>>>> Levani
>>>>>> 
>>>>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <
>> sop...@confluent.io> wrote:
>>>>>>> 
>

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-19 Thread Levani Kokhreidze
Hi John,

In my mind it makes sense. 
If we add a partitioner configuration to the Repartitioned class, in combination 
with specifying the number of partitions for internal topics, the user will 
have the opportunity to ensure co-partitioning before a join operation. 
I think this can be quite a powerful feature.
Wondering what others think about this?

Regards,
Levani
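
A sketch of that co-partitioning flow (topic names, partitioner, and partition count are all hypothetical; API shape as it eventually shipped): repartition the re-keyed stream with the same partitioner and partition count as the other input topic, then join.

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamPartitioner;

public class CoPartitioningExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Assume "orders" was written with this custom strategy and has 8 partitions.
        final StreamPartitioner<String, String> customPartitioner =
                (topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions;
        final KStream<String, String> orders = builder.stream("orders");

        // Re-key "clicks", then repartition with the same partitioner and
        // partition count so both sides are co-partitioned before the join.
        final KStream<String, String> clicksByOrderId = builder
                .<String, String>stream("clicks")
                .selectKey((key, value) -> value)
                .repartition(Repartitioned.<String, String>streamPartitioner(customPartitioner)
                        .withNumberOfPartitions(8));

        clicksByOrderId.join(
                orders,
                (click, order) -> click + "/" + order,
                JoinWindows.of(Duration.ofMinutes(5)));
    }
}
{code}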

> On Jul 18, 2019, at 1:20 AM, John Roesler  wrote:
> 
> Yes, I believe that's what I had in mind. Again, not totally sure it
> makes sense, but I believe something similar is the rationale for
> having the partitioner option in Produced.
> 
> Thanks,
> -John
> 
> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze
>  wrote:
>> 
>> Hey John,
>> 
>> Oh that’s interesting use-case.
>> Do I understand this correctly, in your example I would first issue 
>> repartition(Repartitioned) with proper partitioner that essentially would be 
>> the same as the topic I want to join with and then do the KStream#join with 
>> DSL?
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 17, 2019, at 11:11 PM, John Roesler  wrote:
>>> 
>>> Hey, all, just to chime in,
>>> 
>>> I think it might be useful to have an option to specify the
>>> partitioner. The case I have in mind is that some data may get
>>> repartitioned and then joined with an input topic. If the right-side
>>> input topic uses a custom partitioning strategy, then the
>>> repartitioned stream also needs to be partitioned with the same
>>> strategy.
>>> 
>>> Does that make sense, or did I maybe miss something important?
>>> 
>>> Thanks,
>>> -John
>>> 
>>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze
>>>  wrote:
>>>> 
>>>> Yes, I was thinking about it as well. To be honest I’m not sure about it 
>>>> yet.
>>>> As Kafka Streams DSL user, I don’t really think I would need control over 
>>>> partitioner for internal topics.
>>>> As a user, I would assume that Kafka Streams knows best how to partition 
>>>> data for internal topics.
>>>> In this KIP I wrote that Produced should be used only for topics that are 
>>>> created by the user in advance.
>>>> In those cases maybe it makes sense to have the possibility to specify the 
>>>> partitioner.
>>>> I don’t have a clear answer on that yet, but I guess specifying the 
>>>> partitioner can be added as well if there’s agreement on this.
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman  
>>>>> wrote:
>>>>> 
>>>>> Thanks for clearing that up. I agree that Repartitioned would be a useful
>>>>> addition. I'm wondering if it might also need to have
>>>>> a withStreamPartitioner method/field, similar to Produced? I'm not sure 
>>>>> how
>>>>> widely this feature is really used, but seems it should be available for
>>>>> repartition topics.
>>>>> 
>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze 
>>>>> 
>>>>> wrote:
>>>>> 
>>>>>> Hey Sophie,
>>>>>> 
>>>>>> In both cases KStream#repartition and KStream#repartition(Repartitioned)
>>>>>> topic will be created and managed by Kafka Streams.
>>>>>> Idea of Repartitioned is to give user more control over the topic such as
>>>>>> num of partitions.
>>>>>> I feel like Repartitioned parameter is something that is missing in
>>>>>> current DSL design.
>>>>>> Essentially giving user control over parallelism by configuring num of
>>>>>> partitions for internal topics.
>>>>>> 
>>>>>> Hope this answers your question.
>>>>>> 
>>>>>> Regards,
>>>>>> Levani
>>>>>> 
>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman 
>>>>>> wrote:
>>>>>>> 
>>>>>>> Hey Levani,
>>>>>>> 
>>>>>>> Thanks for the KIP! Can you clarify one thing for me -- for the
>>>>>>> KStream#repartition signature taking a Repartitioned, will the topic be
>>>>>>> auto-created by Streams (which seems to be the case for the signature
>>>>>>> without a Repartitioned) or does it have to be pre-created? 

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-17 Thread Levani Kokhreidze
Hey John,

Oh, that’s an interesting use-case. 
Do I understand this correctly: in your example, I would first issue 
repartition(Repartitioned) with a partitioner that essentially matches the one 
used by the topic I want to join with, and then do the KStream#join with the DSL?

Regards,
Levani

> On Jul 17, 2019, at 11:11 PM, John Roesler  wrote:
> 
> Hey, all, just to chime in,
> 
> I think it might be useful to have an option to specify the
> partitioner. The case I have in mind is that some data may get
> repartitioned and then joined with an input topic. If the right-side
> input topic uses a custom partitioning strategy, then the
> repartitioned stream also needs to be partitioned with the same
> strategy.
> 
> Does that make sense, or did I maybe miss something important?
> 
> Thanks,
> -John
> 
> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze
>  wrote:
>> 
>> Yes, I was thinking about it as well. To be honest I’m not sure about it yet.
>> As Kafka Streams DSL user, I don’t really think I would need control over 
>> partitioner for internal topics.
>> As a user, I would assume that Kafka Streams knows best how to partition 
>> data for internal topics.
>> In this KIP I wrote that Produced should be used only for topics that are 
>> created by the user in advance.
>> In those cases maybe it makes sense to have the possibility to specify the 
>> partitioner.
>> I don’t have a clear answer on that yet, but I guess specifying the 
>> partitioner can be added as well if there’s agreement on this.
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman  
>>> wrote:
>>> 
>>> Thanks for clearing that up. I agree that Repartitioned would be a useful
>>> addition. I'm wondering if it might also need to have
>>> a withStreamPartitioner method/field, similar to Produced? I'm not sure how
>>> widely this feature is really used, but seems it should be available for
>>> repartition topics.
>>> 
>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze 
>>> wrote:
>>> 
>>>> Hey Sophie,
>>>> 
>>>> In both cases KStream#repartition and KStream#repartition(Repartitioned)
>>>> topic will be created and managed by Kafka Streams.
>>>> Idea of Repartitioned is to give user more control over the topic such as
>>>> num of partitions.
>>>> I feel like Repartitioned parameter is something that is missing in
>>>> current DSL design.
>>>> Essentially giving user control over parallelism by configuring num of
>>>> partitions for internal topics.
>>>> 
>>>> Hope this answers your question.
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman 
>>>> wrote:
>>>>> 
>>>>> Hey Levani,
>>>>> 
>>>>> Thanks for the KIP! Can you clarify one thing for me -- for the
>>>>> KStream#repartition signature taking a Repartitioned, will the topic be
>>>>> auto-created by Streams (which seems to be the case for the signature
>>>>> without a Repartitioned) or does it have to be pre-created? The wording
>>>> in
>>>>> the KIP makes it seem like one version of the method will auto-create
>>>>> topics while the other will not.
>>>>> 
>>>>> Cheers,
>>>>> Sophie
>>>>> 
>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <
>>>> levani.co...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> One more bump about KIP-221 (
>>>>>> 
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>>> <
>>>>>> 
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>> )
>>>>>> so it doesn’t get lost in the mailing list :)
>>>>>> Would love to hear the community’s opinions/concerns about this KIP.
>>>>>> 
>>>>>> Regards,
>>>>>> Levani
>>>>>> 
>>>>>> 
>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze >>>> 
>>>>>> wrote:
>>>>>>> 
>>>>>>> Hello,

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-17 Thread Levani Kokhreidze
Yes, I was thinking about it as well. To be honest I’m not sure about it yet.
As a Kafka Streams DSL user, I don’t really think I would need control over the 
partitioner for internal topics.
As a user, I would assume that Kafka Streams knows best how to partition data 
for internal topics.
In this KIP I wrote that Produced should be used only for topics that are 
created by the user in advance. 
In those cases maybe it makes sense to have the possibility to specify the 
partitioner.
I don’t have a clear answer on that yet, but I guess specifying the partitioner 
can be added as well if there’s agreement on this.

Regards,
Levani

> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman  wrote:
> 
> Thanks for clearing that up. I agree that Repartitioned would be a useful
> addition. I'm wondering if it might also need to have
> a withStreamPartitioner method/field, similar to Produced? I'm not sure how
> widely this feature is really used, but seems it should be available for
> repartition topics.
> 
> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze 
> wrote:
> 
>> Hey Sophie,
>> 
>> In both cases KStream#repartition and KStream#repartition(Repartitioned)
>> topic will be created and managed by Kafka Streams.
>> Idea of Repartitioned is to give user more control over the topic such as
>> num of partitions.
>> I feel like Repartitioned parameter is something that is missing in
>> current DSL design.
>> Essentially giving user control over parallelism by configuring num of
>> partitions for internal topics.
>> 
>> Hope this answers your question.
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman 
>> wrote:
>>> 
>>> Hey Levani,
>>> 
>>> Thanks for the KIP! Can you clarify one thing for me -- for the
>>> KStream#repartition signature taking a Repartitioned, will the topic be
>>> auto-created by Streams (which seems to be the case for the signature
>>> without a Repartitioned) or does it have to be pre-created? The wording
>> in
>>> the KIP makes it seem like one version of the method will auto-create
>>> topics while the other will not.
>>> 
>>> Cheers,
>>> Sophie
>>> 
>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <
>> levani.co...@gmail.com>
>>> wrote:
>>> 
>>>> Hello,
>>>> 
>>>> One more bump about KIP-221 (
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>> <
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>> )
>>>> so it doesn’t get lost in the mailing list :)
>>>> Would love to hear the community’s opinions/concerns about this KIP.
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>> 
>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze >> 
>>>> wrote:
>>>>> 
>>>>> Hello,
>>>>> 
>>>>> Kind reminder about this KIP:
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>> <
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>> 
>>>>> 
>>>>> Regards,
>>>>> Levani
>>>>> 
>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <
>> levani.co...@gmail.com
>>>> <mailto:levani.co...@gmail.com>> wrote:
>>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> In order to move this KIP forward, I’ve updated confluence page with
>>>> the new proposal
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>> <
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>>> 
>>>>>> I’ve also filled “Rejected Alternatives” section.
>>>>>> 
>>>>>> Looking forward to discuss this KIP :)
>>>>>> 
>>>>>> King regards,
>>>>>> Levani
>>>>>> 
>>>>>> 
>>>>>>> On Jul 3, 2019, at 1:0

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-17 Thread Levani Kokhreidze
Hey Sophie,

In both cases, KStream#repartition and KStream#repartition(Repartitioned), the 
topic will be created and managed by Kafka Streams. 
The idea of Repartitioned is to give the user more control over the topic, such 
as the num of partitions. 
I feel like the Repartitioned parameter is something that is missing in the 
current DSL design. 
Essentially, it gives the user control over parallelism by configuring the num of 
partitions for internal topics.

Hope this answers your question.

Regards,
Levani
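
A short sketch of the two variants described above, assuming the operator lands as proposed (which it eventually did, in Kafka 2.6):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class RepartitionOverloads {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream = builder.stream("input-topic");

        // Managed repartition topic; partition count derived from the source topic.
        final KStream<String, String> defaults = stream.repartition();

        // Managed repartition topic with an explicit partition count,
        // i.e. user-controlled downstream parallelism.
        final KStream<String, String> scaledOut =
                stream.repartition(Repartitioned.numberOfPartitions(24));
    }
}
{code}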

> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman  wrote:
> 
> Hey Levani,
> 
> Thanks for the KIP! Can you clarify one thing for me -- for the
> KStream#repartition signature taking a Repartitioned, will the topic be
> auto-created by Streams (which seems to be the case for the signature
> without a Repartitioned) or does it have to be pre-created? The wording in
> the KIP makes it seem like one version of the method will auto-create
> topics while the other will not.
> 
> Cheers,
> Sophie
> 
> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze 
> wrote:
> 
>> Hello,
>> 
>> One more bump about KIP-221 (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>)
>> so it doesn’t get lost in the mailing list :)
>> Would love to hear the community’s opinions/concerns about this KIP.
>> 
>> Regards,
>> Levani
>> 
>> 
>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze 
>> wrote:
>>> 
>>> Hello,
>>> 
>>> Kind reminder about this KIP:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>> 
>>> 
>>> Regards,
>>> Levani
>>> 
>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze > <mailto:levani.co...@gmail.com>> wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> In order to move this KIP forward, I’ve updated confluence page with
>> the new proposal
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>> 
>>>> I’ve also filled “Rejected Alternatives” section.
>>>> 
>>>> Looking forward to discuss this KIP :)
>>>> 
>>>> Kind regards,
>>>> Levani
>>>> 
>>>> 
>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze > <mailto:levani.co...@gmail.com>> wrote:
>>>>> 
>>>>> Hello Matthias,
>>>>> 
>>>>> Thanks for the feedback and ideas.
>>>>> I like the idea of introducing dedicated `Topic` class for topic
>> configuration for internal operators like `groupedBy`.
>>>>> Would be great to hear others opinion about this as well.
>>>>> 
>>>>> Kind regards,
>>>>> Levani
>>>>> 
>>>>> 
>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax > <mailto:matth...@confluent.io>> wrote:
>>>>>> 
>>>>>> Levani,
>>>>>> 
>>>>>> Thanks for picking up this KIP! And thanks for summarizing everything.
>>>>>> Even if some points may have been discussed already (can't really
>>>>>> remember), it's helpful to get a good summary to refresh the
>> discussion.
>>>>>> 
>>>>>> I think your reasoning makes sense. With regard to the distinction
>>>>>> between operators that manage topics and operators that use
>> user-created
>>>>>> topics: Following this argument, it might indicate that leaving
>>>>>> `through()` as-is (as an operator that uses user-defined topics) and
>>>>>> introducing a new `repartition()` operator (an operator that manages
>>>>>> topics itself) might be good. Otherwise, there is one operator
>>>>>> `through()` that sometimes manages topics but sometimes not; a
>> different
>>>>>> name, ie, new operator would make the distinction clearer.
>>>>>> 
>>>>>> About adding `n

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-17 Thread Levani Kokhreidze
Hello,

One more bump about KIP-221 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>)
 so it doesn’t get lost in the mailing list :)
Would love to hear the community’s opinions/concerns about this KIP.

Regards,
Levani


> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze  wrote:
> 
> Hello,
> 
> Kind reminder about this KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>  
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>
> 
> Regards,
> Levani
> 
>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze > <mailto:levani.co...@gmail.com>> wrote:
>> 
>> Hello,
>> 
>> In order to move this KIP forward, I’ve updated confluence page with the new 
>> proposal 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>  
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>
>> I’ve also filled “Rejected Alternatives” section. 
>> 
>> Looking forward to discuss this KIP :)
>> 
>> Kind regards,
>> Levani
>> 
>> 
>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze >> <mailto:levani.co...@gmail.com>> wrote:
>>> 
>>> Hello Matthias,
>>> 
>>> Thanks for the feedback and ideas. 
>>> I like the idea of introducing dedicated `Topic` class for topic 
>>> configuration for internal operators like `groupedBy`.
>>> Would be great to hear others opinion about this as well.
>>> 
>>> Kind regards,
>>> Levani 
>>> 
>>> 
>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax >>> <mailto:matth...@confluent.io>> wrote:
>>>> 
>>>> Levani,
>>>> 
>>>> Thanks for picking up this KIP! And thanks for summarizing everything.
>>>> Even if some points may have been discussed already (can't really
>>>> remember), it's helpful to get a good summary to refresh the discussion.
>>>> 
>>>> I think your reasoning makes sense. With regard to the distinction
>>>> between operators that manage topics and operators that use user-created
>>>> topics: Following this argument, it might indicate that leaving
>>>> `through()` as-is (as an operator that uses user-defined topics) and
>>>> introducing a new `repartition()` operator (an operator that manages
>>>> topics itself) might be good. Otherwise, there is one operator
>>>> `through()` that sometimes manages topics but sometimes not; a different
>>>> name, ie, new operator would make the distinction clearer.
>>>> 
>>>> About adding `numOfPartitions` to `Grouped`. I am wondering if the same
>>>> argument as for `Produced` does apply and adding it is semantically
>>>> questionable? Might be good to get opinions of others on this, too. I am
>>>> not sure myself what solution I prefer atm.
>>>> 
>>>> So far, KS uses configuration objects that allow to configure a certain
>>>> "entity" like a consumer, producer, store. If we assume that a topic is
>>>> a similar entity, I am wondering if we should have a
>>>> `Topic#withNumberOfPartitions()` class and method instead of a plain
>>>> integer? This would allow us to add other configs, like replication
>>>> factor, retention-time etc, easily, without the need to change the "main
>>>> API".
>>>> 
>>>> Just want to give some ideas. Not sure if I like them myself. :)
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> 
>>>> 
>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>>>>> Actually, giving it more thought - maybe enhancing Produced with num of 
>>>>> partitions configuration is not the best approach. Let me explain why:
>>>>> 
>>>>> 1) If we enhance Produced class with this configuration, this will also 
>>>>> affect KStream#to operation. Since KStream#to is the final sink of the 
>>>>> topology, for me, it seems to be reasonable assumption that user needs to 
>>>>> manually create sink topic in 

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-12 Thread Levani Kokhreidze
Hello,

Kind reminder about this KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>

Regards,
Levani

> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze  wrote:
> 
> Hello,
> 
> In order to move this KIP forward, I’ve updated confluence page with the new 
> proposal 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>  
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>
> I’ve also filled “Rejected Alternatives” section. 
> 
> Looking forward to discuss this KIP :)
> 
> Kind regards,
> Levani
> 
> 
>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze > <mailto:levani.co...@gmail.com>> wrote:
>> 
>> Hello Matthias,
>> 
>> Thanks for the feedback and ideas. 
>> I like the idea of introducing dedicated `Topic` class for topic 
>> configuration for internal operators like `groupedBy`.
>> Would be great to hear others opinion about this as well.
>> 
>> Kind regards,
>> Levani 
>> 
>> 
>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax >> <mailto:matth...@confluent.io>> wrote:
>>> 
>>> Levani,
>>> 
>>> Thanks for picking up this KIP! And thanks for summarizing everything.
>>> Even if some points may have been discussed already (can't really
>>> remember), it's helpful to get a good summary to refresh the discussion.
>>> 
>>> I think your reasoning makes sense. With regard to the distinction
>>> between operators that manage topics and operators that use user-created
>>> topics: Following this argument, it might indicate that leaving
>>> `through()` as-is (as an operator that uses user-defined topics) and
>>> introducing a new `repartition()` operator (an operator that manages
>>> topics itself) might be good. Otherwise, there is one operator
>>> `through()` that sometimes manages topics but sometimes not; a different
>>> name, ie, new operator would make the distinction clearer.
>>> 
>>> About adding `numOfPartitions` to `Grouped`. I am wondering if the same
>>> argument as for `Produced` does apply and adding it is semantically
>>> questionable? Might be good to get opinions of others on this, too. I am
>>> not sure myself what solution I prefer atm.
>>> 
>>> So far, KS uses configuration objects that allow to configure a certain
>>> "entity" like a consumer, producer, store. If we assume that a topic is
>>> a similar entity, I am wondering if we should have a
>>> `Topic#withNumberOfPartitions()` class and method instead of a plain
>>> integer? This would allow us to add other configs, like replication
>>> factor, retention-time etc, easily, without the need to change the "main
>>> API".
>>> 
>>> Just want to give some ideas. Not sure if I like them myself. :)
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> 
>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>>>> Actually, giving it more thought - maybe enhancing Produced with num of 
>>>> partitions configuration is not the best approach. Let me explain why:
>>>> 
>>>> 1) If we enhance Produced class with this configuration, this will also 
>>>> affect KStream#to operation. Since KStream#to is the final sink of the 
>>>> topology, for me, it seems to be reasonable assumption that user needs to 
>>>> manually create sink topic in advance. And in that case, having num of 
>>>> partitions configuration doesn’t make much sense. 
>>>> 
>>>> 2) Looking at Produced class, based on API contract, seems like Produced 
>>>> is designed to be something that is explicitly for producer (key 
>>>> serializer, value serializer, partitioner those all are producer specific 
>>>> configurations) and num of partitions is topic level configuration. And I 
>>>> don’t think mixing topic and producer level configurations together in one 
>>>> class is the good approach.
>>>> 
>>>> 3) Looking at KStream interface, seems like Produced parameter is for 
>>>> operations that work with non-internal (e.g topics created and managed 
>>>> internally by Kafka Streams) topics and I think we should leav

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-09 Thread Levani Kokhreidze
Hello,

In order to move this KIP forward, I’ve updated the Confluence page with the new 
proposal 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint>
I’ve also filled in the “Rejected Alternatives” section. 

Looking forward to discussing this KIP :)

Kind regards,
Levani


> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze  wrote:
> 
> Hello Matthias,
> 
> Thanks for the feedback and ideas. 
> I like the idea of introducing dedicated `Topic` class for topic 
> configuration for internal operators like `groupedBy`.
> Would be great to hear others opinion about this as well.
> 
> Kind regards,
> Levani 
> 
> 
>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax  wrote:
>> 
>> Levani,
>> 
>> Thanks for picking up this KIP! And thanks for summarizing everything.
>> Even if some points may have been discussed already (can't really
>> remember), it's helpful to get a good summary to refresh the discussion.
>> 
>> I think your reasoning makes sense. With regard to the distinction
>> between operators that manage topics and operators that use user-created
>> topics: Following this argument, it might indicate that leaving
>> `through()` as-is (as an operator that uses user-defined topics) and
>> introducing a new `repartition()` operator (an operator that manages
>> topics itself) might be good. Otherwise, there is one operator
>> `through()` that sometimes manages topics but sometimes not; a different
>> name, ie, new operator would make the distinction clearer.
>> 
>> About adding `numOfPartitions` to `Grouped`. I am wondering if the same
>> argument as for `Produced` does apply and adding it is semantically
>> questionable? Might be good to get opinions of others on this, too. I am
>> not sure myself what solution I prefer atm.
>> 
>> So far, KS uses configuration objects that allow to configure a certain
>> "entity" like a consumer, producer, store. If we assume that a topic is
>> a similar entity, I am wondering if we should have a
>> `Topic#withNumberOfPartitions()` class and method instead of a plain
>> integer? This would allow us to add other configs, like replication
>> factor, retention-time etc, easily, without the need to change the "main
>> API".
>> 
>> Just want to give some ideas. Not sure if I like them myself. :)
>> 
>> 
>> -Matthias
>> 
>> 
>> 
>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>>> Actually, giving it more thought - maybe enhancing Produced with num of 
>>> partitions configuration is not the best approach. Let me explain why:
>>> 
>>> 1) If we enhance Produced class with this configuration, this will also 
>>> affect KStream#to operation. Since KStream#to is the final sink of the 
>>> topology, for me, it seems to be reasonable assumption that user needs to 
>>> manually create sink topic in advance. And in that case, having num of 
>>> partitions configuration doesn’t make much sense. 
>>> 
>>> 2) Looking at Produced class, based on API contract, seems like Produced is 
>>> designed to be something that is explicitly for producer (key serializer, 
>>> value serializer, partitioner those all are producer specific 
>>> configurations) and num of partitions is topic level configuration. And I 
>>> don’t think mixing topic and producer level configurations together in one 
>>> class is the good approach.
>>> 
>>> 3) Looking at KStream interface, seems like Produced parameter is for 
>>> operations that work with non-internal (e.g topics created and managed 
>>> internally by Kafka Streams) topics and I think we should leave it as it is 
>>> in that case.
>>> 
>>> Taking all this things into account, I think we should distinguish between 
>>> DSL operations, where Kafka Streams should create and manage internal 
>>> topics (KStream#groupBy) vs topics that should be created in advance (e.g 
>>> KStream#to).
>>> 
>>> To sum it up, I think adding numPartitions configuration in Produced will 
>>> result in mixing topic and producer level configuration in one class and 
>>> it’s gonna break existing API semantics.
>>> 
>>> Regarding making topic name optional in KStream#through - I think the underlying 
>>> idea is very useful and giving users possibility to specify num of 
>>> partitions there is even more useful :) Considering argumen

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-03 Thread Levani Kokhreidze
Hello Matthias,

Thanks for the feedback and ideas. 
I like the idea of introducing a dedicated `Topic` class for topic configuration 
for internal operators like `groupBy`.
Would be great to hear others’ opinions about this as well.

Kind regards,
Levani 
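
To make the `Topic` idea concrete, a purely hypothetical sketch; no such class exists in Kafka Streams, it only illustrates how topic-level options could grow without touching the main API:

{code:java}
import java.time.Duration;

// Hypothetical only: the `Topic` config class floated in this thread.
public final class Topic {

    private final Integer numberOfPartitions;
    private final Short replicationFactor;
    private final Duration retention;

    private Topic(final Integer numberOfPartitions,
                  final Short replicationFactor,
                  final Duration retention) {
        this.numberOfPartitions = numberOfPartitions;
        this.replicationFactor = replicationFactor;
        this.retention = retention;
    }

    public static Topic withNumberOfPartitions(final int numberOfPartitions) {
        return new Topic(numberOfPartitions, null, null);
    }

    // Further topic-level options chain on without changing the "main API".
    public Topic withReplicationFactor(final short replicationFactor) {
        return new Topic(numberOfPartitions, replicationFactor, retention);
    }

    public Topic withRetention(final Duration retention) {
        return new Topic(numberOfPartitions, replicationFactor, retention);
    }
}
{code}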


> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax  wrote:
> 
> Levani,
> 
> Thanks for picking up this KIP! And thanks for summarizing everything.
> Even if some points may have been discussed already (can't really
> remember), it's helpful to get a good summary to refresh the discussion.
> 
> I think your reasoning makes sense. With regard to the distinction
> between operators that manage topics and operators that use user-created
> topics: Following this argument, it might indicate that leaving
> `through()` as-is (as an operator that uses user-defined topics) and
> introducing a new `repartition()` operator (an operator that manages
> topics itself) might be good. Otherwise, there is one operator
> `through()` that sometimes manages topics but sometimes not; a different
> name, ie, new operator would make the distinction clearer.
> 
> About adding `numOfPartitions` to `Grouped`. I am wondering if the same
> argument as for `Produced` does apply and adding it is semantically
> questionable? Might be good to get opinions of others on this, too. I am
> not sure myself what solution I prefer atm.
> 
> So far, KS uses configuration objects that allow to configure a certain
> "entity" like a consumer, producer, store. If we assume that a topic is
> a similar entity, I am wondering if we should have a
> `Topic#withNumberOfPartitions()` class and method instead of a plain
> integer? This would allow us to add other configs, like replication
> factor, retention-time etc, easily, without the need to change the "main
> API".
> 
> Just want to give some ideas. Not sure if I like them myself. :)
> 
> 
> -Matthias
> 
> 
> 
> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
>> Actually, giving it more thought - maybe enhancing Produced with num of 
>> partitions configuration is not the best approach. Let me explain why:
>> 
>> 1) If we enhance Produced class with this configuration, this will also 
>> affect KStream#to operation. Since KStream#to is the final sink of the 
>> topology, for me, it seems to be reasonable assumption that user needs to 
>> manually create sink topic in advance. And in that case, having num of 
>> partitions configuration doesn’t make much sense. 
>> 
>> 2) Looking at Produced class, based on API contract, seems like Produced is 
>> designed to be something that is explicitly for producer (key serializer, 
>> value serializer, partitioner those all are producer specific 
>> configurations) and num of partitions is topic level configuration. And I 
>> don’t think mixing topic and producer level configurations together in one 
>> class is the good approach.
>> 
>> 3) Looking at KStream interface, seems like Produced parameter is for 
>> operations that work with non-internal (e.g topics created and managed 
>> internally by Kafka Streams) topics and I think we should leave it as it is 
>> in that case.
>> 
>> Taking all this things into account, I think we should distinguish between 
>> DSL operations, where Kafka Streams should create and manage internal topics 
>> (KStream#groupBy) vs topics that should be created in advance (e.g 
>> KStream#to).
>> 
>> To sum it up, I think adding numPartitions configuration in Produced will 
>> result in mixing topic and producer level configuration in one class and 
>> it’s gonna break existing API semantics.
>> 
>> Regarding making topic name optional in KStream#through - I think the underlying 
>> idea is very useful and giving users possibility to specify num of 
>> partitions there is even more useful :) Considering arguments against adding 
>> num of partitions in Produced class, I see two options here:
>> 1) Add following method overloads
>>  * through() - topic will be auto-generated and num of partitions will 
>> be taken from source topic
>>  * through(final int numOfPartitions) - topic will be auto generated 
>> with specified num of partitions
>>  * through(final int numOfPartitions, final Produced produced) - 
>> topic will be generated with specified num of partitions and 
>> configuration taken from produced parameter.
>> 2) Leave KStream#through as it is and introduce new method - 
>> KStream#repartition (I think Matthias suggested this in one of the threads)
>> 
>> Considering all mentioned above I propose the following plan:
>> 
>> Option A:
>> 1) Leave Produce

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-01 Thread Levani Kokhreidze
Actually, giving it more thought - maybe enhancing Produced with num of 
partitions configuration is not the best approach. Let me explain why:

1) If we enhance the Produced class with this configuration, this will also affect 
the KStream#to operation. Since KStream#to is the final sink of the topology, for 
me, it seems to be a reasonable assumption that the user needs to manually create 
the sink topic in advance. And in that case, having a num of partitions configuration 
doesn’t make much sense. 

2) Looking at the Produced class, based on the API contract, it seems like Produced 
is designed to be something that is explicitly for the producer (key serializer, 
value serializer, and partitioner are all producer-specific configurations), while 
num of partitions is a topic level configuration. And I don’t think mixing topic and 
producer level configurations together in one class is a good approach.

3) Looking at the KStream interface, it seems like the Produced parameter is for 
operations that work with non-internal topics (i.e. topics that are not created and 
managed internally by Kafka Streams), and I think we should leave it as it is in 
that case.

Taking all these things into account, I think we should distinguish between DSL 
operations where Kafka Streams should create and manage internal topics 
(KStream#groupBy) vs topics that should be created in advance (e.g. KStream#to).

To sum it up, I think adding a numPartitions configuration to Produced will 
result in mixing topic and producer level configuration in one class, and it’s 
going to break the existing API semantics.

Regarding making the topic name optional in KStream#through - I think the underlying 
idea is very useful, and giving users the possibility to specify the num of partitions 
there is even more useful :) Considering the arguments against adding num of 
partitions to the Produced class, I see two options here:
1) Add following method overloads
* through() - topic will be auto-generated and num of partitions will 
be taken from source topic
* through(final int numOfPartitions) - topic will be auto generated 
with specified num of partitions
* through(final int numOfPartitions, final Produced produced) - 
topic will be generated with specified num of partitions and configuration 
taken from produced parameter.
2) Leave KStream#through as it is and introduce new method - 
KStream#repartition (I think Matthias suggested this in one of the threads)

Considering all mentioned above I propose the following plan:

Option A:
1) Leave Produced as it is
2) Add num of partitions configuration to Grouped class (as mentioned in the 
KIP)
3) Add following method overloads to KStream#through
* through() - topic will be auto-generated and num of partitions will 
be taken from source topic
* through(final int numOfPartitions) - topic will be auto generated 
with specified num of partitions
* through(final int numOfPartitions, final Produced produced) - 
topic will be generated with specified num of partitions and configuration 
taken from produced parameter.

Option B:
1) Leave Produced as it is
2) Add num of partitions configuration to Grouped class (as mentioned in the 
KIP)
3) Add new operator KStream#repartition for creating and managing internal 
repartition topics

P.S. I’m sorry if all of this was already discussed in the mailing list, but I 
kinda got lost in all the threads that were about this KIP :(

Kind regards,
Levani
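
For comparison, the two options as signature sketches (hypothetical; neither method existed on KStream at the time, and Option B is roughly the shape the KIP eventually settled on):

{code:java}
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical signature sketches of the two options above.
interface OptionA<K, V> {
    OptionA<K, V> through();                            // auto-generated topic, partitions from source
    OptionA<K, V> through(int numOfPartitions);         // auto-generated topic, explicit partitions
    OptionA<K, V> through(int numOfPartitions,
                          Produced<K, V> produced);     // plus producer configuration
}

interface OptionB<K, V> {
    OptionB<K, V> repartition();  // new operator: Streams creates and manages the topic
}
{code}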

> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze  wrote:
> 
> Hello,
> 
> I would like to resurrect discussion around KIP-221. Going through the 
> discussion thread, there seems to be agreement around the usefulness of this 
> feature. 
> Regarding the implementation, as far as I understood, the most optimal 
> solution for me seems the following:
> 
> 1) Add two method overloads to KStream#through method (essentially making 
> topic name optional)
> 2) Enhance Produced class with numOfPartitions configuration field.
> 
> Those two changes will allow DSL users to control parallelism and trigger 
> re-partition without doing stateful operations.
> 
> I will update KIP with interface changes around KStream#through if this 
> changes sound sensible.
> 
> Kind regards,
> Levani



Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-01 Thread Levani Kokhreidze
Hello,

I would like to resurrect discussion around KIP-221. Going through the 
discussion thread, there seems to be agreement around the usefulness of this 
feature. 
Regarding the implementation, as far as I understood, the most optimal solution 
to me seems to be the following:

1) Add two method overloads to KStream#through method (essentially making topic 
name optional)
2) Enhance Produced class with numOfPartitions configuration field.

Those two changes will allow DSL users to control parallelism and trigger 
re-partitioning without doing stateful operations.

I will update the KIP with interface changes around KStream#through if these changes 
sound sensible.

Kind regards,
Levani
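
A hypothetical sketch of change (2) as floated here; the real Produced class never gained this field, and the thread later moved the option into a dedicated Repartitioned object instead:

{code:java}
// Hypothetical only: Produced enhanced with a numOfPartitions field.
final class ProducedSketch<K, V> {

    private Integer numOfPartitions;  // null means "inherit from the source topic"

    static <K, V> ProducedSketch<K, V> withNumOfPartitions(final int numOfPartitions) {
        final ProducedSketch<K, V> produced = new ProducedSketch<>();
        produced.numOfPartitions = numOfPartitions;
        return produced;
    }
}
{code}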

[DISCUSS] KIP-485 Make topic optional when using through() operations in DSL

2019-06-28 Thread Levani Kokhreidze
Hello,

I would like to start a discussion about KIP-485: Make topic optional when using 
through() operations in DSL 


Regards,
Levani

[jira] [Created] (KAFKA-8611) Make topic optional when using through() operations in DSL

2019-06-28 Thread Levani Kokhreidze (JIRA)
Levani Kokhreidze created KAFKA-8611:


 Summary: Make topic optional when using through() operations in DSL
 Key: KAFKA-8611
 URL: https://issues.apache.org/jira/browse/KAFKA-8611
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze


When using the DSL in Kafka Streams, data re-partitioning happens only when a 
key-changing operation is followed by a stateful operation. On the other hand, in 
the DSL, stateful computation can happen using the _transform()_ operation as well. 
The problem with this approach is that, even if an upstream operation was 
key-changing before calling _transform()_, no auto-repartitioning is triggered. If 
repartitioning is required, a call to _through(String)_ should be performed 
before _transform()_. With the current implementation, the burden of managing and 
creating the topic falls on the user and introduces extra complexity in managing a 
Kafka Streams application.
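
An illustration of the workaround described above (hypothetical topic name; the Transformer body is reduced to a pass-through where the stateful logic would live):

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ManualRepartitionBeforeTransform {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("input-topic")
                // Key-changing operation: marks the stream for repartitioning...
                .selectKey((key, value) -> value)
                // ...but transform() does not trigger it automatically, so the
                // user has to route the data through a manually created topic.
                .through("manually-created-repartition-topic")
                .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                    @Override
                    public void init(final ProcessorContext context) { }

                    @Override
                    public KeyValue<String, String> transform(final String key, final String value) {
                        return KeyValue.pair(key, value);  // stateful logic would go here
                    }

                    @Override
                    public void close() { }
                });
    }
}
{code}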



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Permissions for creating KIP

2019-06-23 Thread Levani Kokhreidze
Hi,

Please give me permission for creating a KIP 
.
 You can find a link to my profile on Confluence here 
.

Kind regards,
Levani

[jira] [Created] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Levani Kokhreidze (JIRA)
Levani Kokhreidze created KAFKA-8413:


 Summary: Add possibility to do repartitioning on KStream
 Key: KAFKA-8413
 URL: https://issues.apache.org/jira/browse/KAFKA-8413
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze
 Attachments: topology-1.png, topology-2.png

Consider following code:

 
{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
   .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
   .selectKey((key, value) -> value);

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-1")
   );

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-2")
   );
{code}
 

This code will generate following topology:

 
{code:java}
Topologies:
 Sub-topology: 0
 Source: KSTREAM-SOURCE-00 (topics: [input-topic])
 --> KSTREAM-KEY-SELECT-01
 Processor: KSTREAM-KEY-SELECT-01 (stores: [])
 --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
 <-- KSTREAM-SOURCE-00
 Processor: KSTREAM-FILTER-04 (stores: [])
 --> KSTREAM-SINK-03
 <-- KSTREAM-KEY-SELECT-01
 Processor: KSTREAM-FILTER-08 (stores: [])
 --> KSTREAM-SINK-07
 <-- KSTREAM-KEY-SELECT-01
 Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
 <-- KSTREAM-FILTER-04
 Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
 <-- KSTREAM-FILTER-08
Sub-topology: 1
 Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
 --> KSTREAM-AGGREGATE-02
 Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
 --> none
 <-- KSTREAM-SOURCE-05
Sub-topology: 2
 Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
 --> KSTREAM-AGGREGATE-06
 Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
 --> none
 <-- KSTREAM-SOURCE-09
 
{code}

Kafka Streams creates a repartition topic for each `groupByKey` operation. 
In this example, two repartition topics are not really necessary and the processing 
can be done within one sub-topology.


A Kafka Streams user, in the DSL, may specify the repartition topic manually using 
the *KStream#through* method:

 
{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
   .stream("input-topic")
   .selectKey((key, value) -> value)
   .through("repartition-topic");

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-1")
   );

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-2")
   );
{code}
 

 
{code:java}
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-00 (topics: [input-topic])
--> KSTREAM-KEY-SELECT-01
Processor: KSTREAM-KEY-SELECT-01 (stores: [])
--> KSTREAM-SINK-02
<-- KSTREAM-SOURCE-00
Sink: KSTREAM-SINK-02 (topic: repartition-topic)
<-- KSTREAM-KEY-SELECT-01

Sub-topology: 1
Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
--> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
--> none
<-- KSTREAM-SOURCE-03
Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
--> none
<-- KSTREAM-SOURCE-03
{code}
 

 

While this gives the possibility to optimize a Kafka Streams application, the user 
still has to manually create the repartition topic with the correct number of 
partitions based on the input topic. It would be great if in the DSL we could have 
something like a *repartition()* operation on *KStream* which can generate the 
repartition topic at the user’s request.
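
For reference, with the operator this issue asks for (later added by KIP-221 in Kafka 2.6), the example above can be written without a manually created topic:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Repartitioned;

public class RepartitionOperatorExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // One managed repartition topic shared by both aggregations; no manual
        // topic creation, and the partition count is chosen by the user.
        final KStream<String, String> streamByProfileId = builder
                .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .selectKey((key, value) -> value)
                .repartition(Repartitioned.numberOfPartitions(10));

        streamByProfileId
                .groupByKey()
                .aggregate(() -> 0d, (key, value, aggregate) -> aggregate,
                        Materialized.as("store-1"));

        streamByProfileId
                .groupByKey()
                .aggregate(() -> 0d, (key, value, aggregate) -> aggregate,
                        Materialized.as("store-2"));
    }
}
{code}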



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)