Re: [DISCUSS] KIP-617: Allow Kafka Streams State Stores to be iterated backwards

2020-07-22 Thread Matthias J. Sax
Finally cycling back to this.

Overall I like the KIP.

Two comments:

 - I tried to figure out why the two InMemorySessionStore methods are
deprecated, and it seems those annotations have been there since the class was
added; as this seems to be a bug, and there are no backward
compatibility concerns, I just did a PR to remove those annotations:

https://github.com/apache/kafka/pull/9061


 - about moving the "read" methods from SessionStore to
ReadOnlySessionStore: while I agree that this makes sense, it is
strictly speaking backward incompatible if we don't add default
implementations. On the other hand, it seems that the likelihood that
one only implements ReadOnlySessionStore is basically zero, so I am not
sure if it's worth bothering?
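
For illustration only, here is a minimal sketch (hypothetical names and
signatures, not the actual org.apache.kafka.streams.state interfaces) of how
default implementations could keep such a move backward compatible for
existing implementers:

import java.util.Iterator;

// Read methods moved "down" into the read-only interface, with defaults so
// that classes written against the old interface split still compile.
interface ReadOnlySessionStoreSketch<K, AGG> {

    default Iterator<AGG> fetch(final K key) {
        throw new UnsupportedOperationException("fetch(key) is not implemented by this store");
    }

    // Hypothetical reverse-iteration method in the spirit of KIP-617.
    default Iterator<AGG> backwardFetch(final K key) {
        throw new UnsupportedOperationException("backwardFetch(key) is not implemented by this store");
    }
}

interface SessionStoreSketch<K, AGG> extends ReadOnlySessionStoreSketch<K, AGG> {
    void put(final K key, final AGG aggregate);
}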


-Matthias

On 7/4/20 7:02 AM, John Roesler wrote:
> Thanks Jorge,
> 
> This KIP looks good to me!
> 
> -John
> 
> On Fri, Jul 3, 2020, at 03:19, Jorge Esteban Quilcate Otoya wrote:
>> Hi John,
>>
>> Thanks for the feedback.
>>
>> I'd be happy to take the third option and consider moving methods to
>> ReadOnlySessionStore as part of the KIP.
>> Docs are updated to reflect these changes.
>>
>> Cheers,
>> Jorge.
>>
>> On Thu, Jul 2, 2020 at 10:06 PM John Roesler  wrote:
>>
>>> Hey Jorge,
>>>
>>> Thanks for the details. That sounds like a mistake to me on both counts.
>>>
>>> I don’t think you need to worry about those deprecations. If the
>>> interface methods aren’t deprecated, then the methods are not deprecated.
>>> We should remove the annotations, but it doesn’t need to be in the KIP.
>>>
>>> I think any query methods should have been in the ReadOnly interface. I
>>> guess it’s up to you whether you want to:
>>> 1. Add the reverse methods next to the existing methods (what you have in
>>> the kip right now)
>>> 2. Partially fix it by adding your new methods to the ReadOnly interface
>>> 3. Fully fix the problem by moving the existing methods as well as your
>>> new ones. Since  SessionStore extends ReadOnlySessionStore, it’s ok just to
>>> move the definitions.
>>>
>>> I’m ok with whatever you prefer.
>>>
>>> Thanks,
>>> John
>>>
>>> On Thu, Jul 2, 2020, at 11:29, Jorge Esteban Quilcate Otoya wrote:
 (sorry for the spam)

 Actually `findSessions` are only deprecated on `InMemorySessionStore`,
 which seems strange as RocksDB and interfaces haven't marked these
>>> methods
 as deprecated.

 Any hint on how to handle this?

 Cheers,

 On Thu, Jul 2, 2020 at 4:57 PM Jorge Esteban Quilcate Otoya <
 quilcate.jo...@gmail.com> wrote:

> @John: I can see there are some deprecations in there as well. Will
>>> update
> the KIP.
>
> Thanks,
> Jorge
>
>
> On Thu, Jul 2, 2020 at 3:29 PM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
>> Thanks John.
>>
>>> it looks like there’s a revision error in which two methods are
>> proposed for SessionStore, but seem like they should be in
>> ReadOnlySessionStore. Do I read that right?
>>
>> Yes, I've opted to keep the new methods alongside the existing ones.
>>> In
>> the case of SessionStore, `findSessions` are in `SessionStore`, and
>>> `fetch`
>> are in `ReadOnlySessionStore`. If it makes more sense, I can move all
>>> of
>> them to ReadOnlySessionStore.
>> Let me know what you think.
>>
>> Thanks,
>> Jorge.
>>
>> On Thu, Jul 2, 2020 at 2:36 PM John Roesler 
>>> wrote:
>>
>>> Hi Jorge,
>>>
>>> Thanks for the update. I think this is a good plan.
>>>
>>> I just took a look at the KIP again, and it looks like there’s a
>>> revision error in which two methods are proposed for SessionStore,
>>> but seem
>>> like they should be in ReadOnlySessionStore. Do I read that right?
>>>
>>> Otherwise, I’m happy with your proposal.
>>>
>>> Thanks,
>>> John
>>>
>>> On Wed, Jul 1, 2020, at 17:01, Jorge Esteban Quilcate Otoya wrote:
 Quick update: KIP is updated with latest changes now.
 Will leave it open this week while working on the PR.

 Hope to open a new vote thread over the next few days if no
>>> additional
 feedback is provided.

 Cheers,
 Jorge.

 On Mon, Jun 29, 2020 at 11:30 AM Jorge Esteban Quilcate Otoya <
 quilcate.jo...@gmail.com> wrote:

> Thanks, John!
>
> Makes sense to reconsider the current approach. I was heading in a
>>> similar
> direction while drafting the implementation. Metered, Caching,
>>> and
>>> other
> layers will also have to get duplicated to build up new methods
>>> in
>>> `Stores`
> factory, and class casting issues would appear on stores created
>>> by
>>> DSL.
>
> I will draft a proposal with new methods (move methods from
>>> proposed
> interfaces to existing ones) with default implementation in a KIP
>>> 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread John Roesler
Thanks for the reply, Sophie. 

That all sounds about right to me.

The Windows “interface”/algorithm is quite flexible, so it makes sense for it 
to be extensible. Different implementations can (and do) enumerate different 
windows to suit different use cases. 

On the other hand, I can’t think of any way to extend SessionWindows to do 
something different using the same algorithm, so it makes sense for it to stay 
final.

If we think SlidingWindows is similarly not usefully extensible, then we can 
make it final. It’s easy to remove final later, but adding it is not possible. 
Or we could go the other route and just make it an interface, on general 
principle. Both of these choices are safe API design.
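
To make the shape of that choice concrete, a small sketch (hypothetical names
and parameters, not a committed KIP-450 API) of a final window class with a
static factory; making the class final keeps the new algorithm from having to
handle unknown subclasses, and the two-argument factory also shows how a
mandatory grace period could be enforced:

import java.time.Duration;

public final class SlidingWindowsSketch {
    private final long timeDifferenceMs;
    private final long graceMs;

    private SlidingWindowsSketch(final long timeDifferenceMs, final long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    // Both parameters are required up front, so the grace period cannot be forgotten.
    public static SlidingWindowsSketch withTimeDifferenceAndGrace(final Duration timeDifference,
                                                                  final Duration grace) {
        return new SlidingWindowsSketch(timeDifference.toMillis(), grace.toMillis());
    }

    public long timeDifferenceMs() { return timeDifferenceMs; }
    public long gracePeriodMs()    { return graceMs; }
}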

Thanks again,
John

On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
> >
> > Users could pass in a custom `SessionWindows` as
> > long as the session algorithm works correctly for it.
> 
> 
> Well not really, SessionWindows is a final class. TimeWindows is also a
> final class, so neither of these can be extended/customized. For a given
> Windows then there would only be three (non-overlapping) possibilities:
> either it's TimeWindows, SlidingWindows, or a custom  Windows. I don't
> think there's any problem with determining what the user wants in this case
> -- 
> we would just check if it's a SlidingWindows and connect the new  processor,
> or else connect the existing hopping/tumbling window processor.
> 
> I'll admit that last sentence does leave a bad taste in my mouth. Part of
> that
> is probably the "leaking" API Matthias pointed out; we just assume the
> hopping/tumbling window implementation fits all custom windows. But I guess
> if you really needed to customize the algorithm any further you may as well
> stick in a transformer and do it all yourself.
> 
> Anyways, given what we have, it does seem weird to apply one algorithm
> for most Windows types and then swap in a different one for one specific
> extension of Windows. So adding a new #windowedBy(SlidingWindows)
> sounds reasonable to me.
> 
> I'm still not convinced that we need a whole new TimeWindowedKStream
> equivalent class for sliding windows though. It seems like the
> hopping/tumbling
> window implementation could benefit just as much from a subtractor as the
> sliding windows so the only future-proofing we get is the ability to be
> lazy and
> add the subtractor to one but not the other. Of course it would only be an
> optimization so we could just not apply it to one type and nothing would
> break.
> It does make me nervous to go against the "future-proof" direction, though.
> Are there any other examples of things we might want to add to one window
> type but not to another?
> 
> On another note, this actually brings up a new question: should
> SlidingWindows
> also be final? My take is "yes" since there's no reasonable customization of
> sliding windows, at least not that I can think of. Thoughts?
> 
> 
> On Wed, Jul 22, 2020 at 7:15 PM John Roesler  wrote:
> 
> > Thanks, all,
> >
> > I can see how my conclusion was kind of a leap.
> >
> > What Matthias said is indeed what I was thinking. When you provide a
> > window definition to the windowBy() method, you are selecting an algorithm
> > that will be used to compute the windows from the input data.
> >
> > I didn’t mean the code implementation  “algorithm”, but the high-level
> > algorithm that describes how the input stream will be transformed into a
> > sequence of windows.
> >
> > For example, the algorithm for Windows is something like “given the record
> > timestamp, include the record in each of the enumerated windows”. Note that
> > there can be a lot of variation in how the windows are enumerated, which is
> > why there are at least a couple of implementations of Windows, and we are
> > open to adding more (like for natural calendar boundaries).
> >
> > For SessionWindows, it’s more like “if any window is within the gap,
> > extend its boundaries to include this record and if two windows touch, then
> > merge them”.
> >
> > Clearly, the algorithm for SlidingWindows doesn’t fall into either
> > category, so it seems inappropriate to claim that it does in the API (by
> > implementing Windows with stubbed methods) and then cast internally to
> > execute a completely different algorithm.
> >
> > To your other observation, that the DSL object resulting from windowBy
> > would look the same for Windows and SessionWindows, maybe it makes sense
> > for windowBy(SessionWindows) also to return a TimeWindowedKStream.
> >
> > i.e.:
> > ===
> > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
> > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
> > ===
> >
> >  I can’t think of a reason this wouldn’t work. But then again, it would be
> > more future-proof to go ahead and specify a different return type now, if
> > we think we'll want to add subtractors and stuff later. I don't have a
> > strong feeling about 

[jira] [Created] (KAFKA-10300) fix flaky core/group_mode_transactions_test.py

2020-07-22 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10300:
--

 Summary: fix flaky core/group_mode_transactions_test.py
 Key: KAFKA-10300
 URL: https://issues.apache.org/jira/browse/KAFKA-10300
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


{quote}
test_id: kafkatest.tests.core.group_mode_transactions_test.GroupModeTransactionsTest.test_transactions.failure_mode=hard_bounce.bounce_target=brokers
status: FAIL
run time: 9 minutes 47.698 seconds

copier-0 - Failed to copy all messages in 240s.

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 134, in run
    data = self.run_test()
  File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 192, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 429, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/core/group_mode_transactions_test.py", line 271, in test_transactions
    num_messages_to_copy=self.num_seed_messages)
  File "/opt/kafka-dev/tests/kafkatest/tests/core/group_mode_transactions_test.py", line 230, in copy_messages_transactionally
    (copier.transactional_id, copier_timeout_sec))
  File "/usr/local/lib/python2.7/dist-packages/ducktape/utils/util.py", line 41, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
TimeoutError: copier-0 - Failed to copy all messages in 240s.
{quote}

 

This issue is the same as KAFKA-10274, so we can apply the same approach.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk8 #4735

2020-07-22 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Fix deprecation version for NotLeaderForPartitionException

[github] MINOR: Fix SslEngineFactory javadoc (#9055)


--
[...truncated 6.36 MB...]

[remainder of build log omitted: org.apache.kafka.streams.test.OutputVerifierTest cases listed as STARTED/PASSED]


Re: [DISCUSS] KIP-646 Serializer API should support ByteBuffer

2020-07-22 Thread Chia-Ping Tsai
Thanks for the quick feedback, Ismael!

> Are there options with lower impact that still help us achieve the goal for 
> those who need it?
> For example, it could be an opt-in thing instead of forcing the world to 
> change.

It seems to me there are two alternatives.

1. Introduce a new extended serializer (there is an existing but deprecated
one). If the serializer is of the extended type, we call the new method to get a
ByteBuffer. Users who have no interest in ByteBuffer keep using the standard
Serializer interface.
2. Don’t deprecate the existing serialize methods, so users are not under
pressure to migrate their API usage.
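
As a concrete illustration of alternative (1), an opt-in extended serializer
could look roughly like the following sketch (hypothetical interface name; only
the existing Serializer#serialize(String, T) contract is assumed):

import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Serializer;

// Users who want ByteBuffer opt in to the extended interface; everyone else
// keeps implementing the plain Serializer and nothing changes for them.
interface ByteBufferReturningSerializer<T> extends Serializer<T> {

    ByteBuffer serializeToByteBuffer(String topic, T data);

    // The existing byte[]-based contract is derived from the new method, so
    // extended implementations only need to write the ByteBuffer path.
    @Override
    default byte[] serialize(final String topic, final T data) {
        final ByteBuffer buffer = serializeToByteBuffer(topic, data);
        if (buffer == null) {
            return null;
        }
        final byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }
}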

--
Chia-Ping

On 2020/07/22 17:16:37, Ismael Juma  wrote: 
> Hi Chia-Ping,
> 
> Thanks for the KIP. It seems like the path chosen here would cause a
> massive impact to user code. Are there options with lower impact that still
> help us achieve the goal for those who need it? For example, it could be an
> opt-in thing instead of forcing the world to change.
> 
> Ismael
> 
> On Wed, Jul 22, 2020 at 9:43 AM Chia-Ping Tsai  wrote:
> 
> > hi folks,
> >
> > I would like to discuss KIP-646. The KIP plans to use ByteBuffer as the
> > return type of Serializer#serialize. It opens the door to managing memory
> > more effectively and flexibly.
> >
> > https://cwiki.apache.org/confluence/x/RiR4CQ
> >
> > The change involved in this KIP is huge, so it would be better to get
> > enough feedback before starting to work :)
> >
> > Cheers,
> > Chia-Ping
> >
> 


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Sophie Blee-Goldman
>
> Users could pass in a custom `SessionWindows` as
> long as the session algorithm works correctly for it.


Well not really, SessionWindows is a final class. TimeWindows is also a
final class, so neither of these can be extended/customized. For a given
Windows then there would only be three (non-overlapping) possibilities:
either it's TimeWindows, SlidingWindows, or a custom  Windows. I don't
think there's any problem with determining what the user wants in this case
-- 
we would just check if it's a SlidingWindows and connect the new  processor,
or else connect the existing hopping/tumbling window processor.

I'll admit that last sentence does leave a bad taste in my mouth. Part of
that
is probably the "leaking" API Matthias pointed out; we just assume the
hopping/tumbling window implementation fits all custom windows. But I guess
if you really needed to customize the algorithm any further you may as well
stick in a transformer and do it all yourself.
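
A rough sketch of the runtime check described above (hypothetical, simplified
stand-ins for the real window classes, not the actual Streams builder code):

final class WindowedByDispatchSketch {
    interface AnyWindows {}
    static final class SlidingWindows implements AnyWindows {}
    static final class TimeWindows implements AnyWindows {}

    static String processorFor(final AnyWindows windows) {
        if (windows instanceof SlidingWindows) {
            return "sliding-window-aggregate";           // connect the new processor
        }
        // TimeWindows and custom Windows fall through to the existing processor.
        return "hopping-tumbling-window-aggregate";
    }
}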

Anyways, given what we have, it does seem weird to apply one algorithm
for most Windows types and then swap in a different one for one specific
extension of Windows. So adding a new #windowedBy(SlidingWindows)
sounds reasonable to me.

I'm still not convinced that we need a whole new TimeWindowedKStream
equivalent class for sliding windows though. It seems like the
hopping/tumbling
window implementation could benefit just as much from a subtractor as the
sliding windows so the only future-proofing we get is the ability to be
lazy and
add the subtractor to one but not the other. Of course it would only be an
optimization so we could just not apply it to one type and nothing would
break.
It does make me nervous to go against the "future-proof" direction, though.
Are there any other examples of things we might want to add to one window
type but not to another?

On another note, this actually brings up a new question: should
SlidingWindows
also be final? My take is "yes" since there's no reasonable customization of
sliding windows, at least not that I can think of. Thoughts?


On Wed, Jul 22, 2020 at 7:15 PM John Roesler  wrote:

> Thanks, all,
>
> I can see how my conclusion was kind of a leap.
>
> What Matthias said is indeed what I was thinking. When you provide a
> window definition to the windowBy() method, you are selecting an algorithm
> that will be used to compute the windows from the input data.
>
> I didn’t mean the code implementation  “algorithm”, but the high-level
> algorithm that describes how the input stream will be transformed into a
> sequence of windows.
>
> For example, the algorithm for Windows is something like “given the record
> timestamp, include the record in each of the enumerated windows”. Note that
> there can be a lot of variation in how the windows are enumerated, which is
> why there are at least a couple of implementations of Windows, and we are
> open to adding more (like for natural calendar boundaries).
>
> For SessionWindows, it’s more like “if any window is within the gap,
> extend its boundaries to include this record and if two windows touch, then
> merge them”.
>
> Clearly, the algorithm for SlidingWindows doesn’t fall into either
> category, so it seems inappropriate to claim that it does in the API (by
> implementing Windows with stubbed methods) and then cast internally to
> execute a completely different algorithm.
>
> To your other observation, that the DSL object resulting from windowBy
> would look the same for Windows and SessionWindows, maybe it makes sense
> for windowBy(SessionWindows) also to return a TimeWindowedKStream.
>
> i.e.:
> ===
> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
> ===
>
>  I can’t think of a reason this wouldn’t work. But then again, it would be
> more future-proof to go ahead and specify a different return type now, if
> we think we'll want to add subtractors and stuff later. I don't have a
> strong feeling about that part of the API. It seems to be independent of
> whether SlidingWindows extends Windows or not.
>
> Thanks,
> -John
>
> On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
> > I think what John tries to say is the following:
> >
> > We have `windowedBy(Windows)` that accept hopping/tumbling windows but
> > also custom window and we use a specific algorithm. Note, that custom
> > windows must "work" based on the used algorithm.
> >
> > For session windows we have `windowedBy(SessionWindows)` and apply a
> > different algorithm. Users could pass in a custom `SessionWindows` as
> > long as the session algorithm works correctly for it.
> >
> > For the new sliding windows, we want to use a different algorithm
> > compare to hopping/tumbling windows. If we let sliding window extend
> > `Windows`, we can decide at runtime if we need to use the
> > hopping/tumbling window algorithm for hopping/tumbling windows or the
> > new sliding window algorithm for 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread John Roesler
Thanks, all,

I can see how my conclusion was kind of a leap.

What Matthias said is indeed what I was thinking. When you provide a window 
definition to the windowBy() method, you are selecting an algorithm that will 
be used to compute the windows from the input data.

I didn’t mean the code implementation  “algorithm”, but the high-level 
algorithm that describes how the input stream will be transformed into a 
sequence of windows.

For example, the algorithm for Windows is something like “given the record 
timestamp, include the record in each of the enumerated windows”. Note that 
there can be a lot of variation in how the windows are enumerated, which is why 
there are at least a couple of implementations of Windows, and we are open to 
adding more (like for natural calendar boundaries).

For SessionWindows, it’s more like “if any window is within the gap, extend its 
boundaries to include this record and if two windows touch, then merge them”.
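
A rough sketch of the two high-level algorithms being contrasted here
(hypothetical helper types, not Kafka Streams internals):

import java.util.ArrayList;
import java.util.List;

final class WindowingAlgorithmsSketch {

    static final class Window {
        final long startMs;
        final long endMs;
        Window(final long startMs, final long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    // "Windows" style: the windows a record falls into can be enumerated
    // directly from its timestamp (hopping windows of a given size/advance).
    static List<Window> enumerateHoppingWindows(final long timestampMs,
                                                final long sizeMs,
                                                final long advanceMs) {
        final List<Window> windows = new ArrayList<>();
        final long firstStart =
            Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = firstStart; start <= timestampMs; start += advanceMs) {
            windows.add(new Window(start, start + sizeMs));
        }
        return windows;
    }

    // "SessionWindows" style: a record within the inactivity gap extends the
    // session's boundaries; otherwise a new session is started (merging of
    // touching sessions is handled analogously).
    static Window extendSession(final Window session, final long timestampMs, final long gapMs) {
        if (timestampMs >= session.startMs - gapMs && timestampMs <= session.endMs + gapMs) {
            return new Window(Math.min(session.startMs, timestampMs),
                              Math.max(session.endMs, timestampMs));
        }
        return new Window(timestampMs, timestampMs);
    }
}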

Clearly, the algorithm for SlidingWindows doesn’t fall into either category, so 
it seems inappropriate to claim that it does in the API (by implementing 
Windows with stubbed methods) and then cast internally to execute a completely 
different algorithm.

To your other observation, that the DSL object resulting from windowBy would 
look the same for Windows and SessionWindows, maybe it makes sense for 
windowBy(SessionWindows) also to return a TimeWindowedKStream.

i.e.:
===
<W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
===

 I can’t think of a reason this wouldn’t work. But then again, it would be more 
future-proof to go ahead and specify a different return type now, if we think 
we'll want to add subtractors and stuff later. I don't have a strong feeling 
about that part of the API. It seems to be independent of whether 
SlidingWindows extends Windows or not.

Thanks,
-John

On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
> I think what John tries to say is the following:
> 
> We have `windowedBy(Windows)` that accepts hopping/tumbling windows but
> also custom windows, and we use a specific algorithm. Note that custom
> windows must "work" based on the used algorithm.
> 
> For session windows we have `windowedBy(SessionWindows)` and apply a
> different algorithm. Users could pass in a custom `SessionWindows` as
> long as the session algorithm works correctly for it.
> 
> For the new sliding windows, we want to use a different algorithm
> compared to hopping/tumbling windows. If we let sliding windows extend
> `Windows`, we can decide at runtime if we need to use the
> hopping/tumbling window algorithm for hopping/tumbling windows or the
> new sliding window algorithm for sliding windows. However, if we get a
> custom window, which algorithm do we pick now? The existing
> tumbling/hopping window algorithm or the new sliding window algorithm?
> Both a custom "time-window" and custom "sliding window" implement the
> generic `Windows` class and thus we cannot make a decision as we don't
> know the user's intent.
> 
> As a matter of fact, even if the user might not be aware of it, the
> algorithm we use does already leak into the API (if a user extends
> `Windows` it must work with our hopping/tumbling window algorithm and if
> a user extends `SessionWindows` it must work with our session algorithm)
> and it seems we need to preserve this property for sliding window.
> 
> 
> -Matthias
> 
> On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> > Hey John,
> > 
> > Just a few follow-up questions/comments about the whole Windows thing:
> > 
> > That's a good way of looking at things; in particular the point about
> > SessionWindows
> > for example requiring a Merger while other "statically enumerable" windows
> > require
> > only an adder seems to touch on the heart of the matter.
> > 
> >  It seems like what Time and Universal (and any other Windows
> >> implementation) have in common is that the windows are statically
> >> enumerable.
> >> As a consequence, they can all rely on an aggregation maintenance algorithm
> >> that involves enumerating each of the windows and updating it. That
> >> also means that their DSL object (TimeWindowedKStream) doesn't need
> >> "subtractors" or "mergers", but only "adders"; again, this is a consequence
> >> of the fact that the windows are enumerable.
> > 
> > 
> > Given that, I'm a bit confused why you conclude that sliding windows are
> > fundamentally
> > different from the "statically enumerable" windows -- sliding windows
> > require only an
> > adder too. I'm not sure it's a consequence of being enumerable, or that
> > being enumerable
> > is the fundamental property that unites all Windows (ignoring JoinWindows
> > here). Yes,  it
> > currently does apply to all Windows implementations, but we shouldn't
> > assume that it
> > *has *to be that way on the basis that it currently happens to be.
> > 
> > Also, 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Matthias J. Sax
I think what John tries to say is the following:

We have `windowedBy(Windows)` that accepts hopping/tumbling windows but
also custom windows, and we use a specific algorithm. Note that custom
windows must "work" based on the used algorithm.

For session windows we have `windowedBy(SessionWindows)` and apply a
different algorithm. Users could pass in a custom `SessionWindows` as
long as the session algorithm works correctly for it.

For the new sliding windows, we want to use a different algorithm
compared to hopping/tumbling windows. If we let sliding windows extend
`Windows`, we can decide at runtime if we need to use the
hopping/tumbling window algorithm for hopping/tumbling windows or the
new sliding window algorithm for sliding windows. However, if we get a
custom window, which algorithm do we pick now? The existing
tumbling/hopping window algorithm or the new sliding window algorithm?
Both a custom "time-window" and custom "sliding window" implement the
generic `Windows` class and thus we cannot make a decision as we don't
know the user's intent.

As a matter of fact, even if the user might not be aware of it, the
algorithm we use does already leak into the API (if a user extends
`Windows` it must work with our hopping/tumbling window algorithm and if
a user extends `SessionWindows` it must work with our session algorithm)
and it seems we need to preserve this property for sliding windows.


-Matthias

On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> Hey John,
> 
> Just a few follow-up questions/comments about the whole Windows thing:
> 
> That's a good way of looking at things; in particular the point about
> SessionWindows
> for example requiring a Merger while other "statically enumerable" windows
> require
> only an adder seems to touch on the heart of the matter.
> 
>  It seems like what Time and Universal (and any other Windows
>> implementation) have in common is that the windows are statically
>> enumerable.
> >> As a consequence, they can all rely on an aggregation maintenance algorithm
>> that involves enumerating each of the windows and updating it. That
>> also means that their DSL object (TimeWindowedKStream) doesn't need
>> "subtractors" or "mergers", but only "adders"; again, this is a consequence
>> of the fact that the windows are enumerable.
> 
> 
> Given that, I'm a bit confused why you conclude that sliding windows are
> fundamentally
> different from the "statically enumerable" windows -- sliding windows
> require only an
> adder too. I'm not sure it's a consequence of being enumerable, or that
> being enumerable
> is the fundamental property that unites all Windows (ignoring JoinWindows
> here). Yes,  it
> currently does apply to all Windows implementations, but we shouldn't
> assume that it
> *has *to be that way on the basis that it currently happens to be.
> 
> Also, the fact that they can all rely on the same aggregation algorithm
> seems like an
> implementation detail and it would be weird to force a separate/new DSL API
> just because
> under the covers we swap in a different processor.
> 
> To be fair, I don't think there's a strong reason *against* not extending
> Windows -- in the end
> it will just mean adding a new #windowedBy method and copy/pasting
> everything from
>  TimeWindowedKStream pretty much word for word. But anytime you find
> yourself
> copying over code almost exactly, there should probably be a good reason
> why :)
> 
> 
> On Wed, Jul 22, 2020 at 3:48 PM John Roesler  wrote:
> 
>> Thanks Leah!
>>
>> 5) Regarding the empty windows, I'm wondering if we should simply propose
>> that the windows should not be emitted downstream of the operator or
>> visible in IQ. Then, it'll be up to the implementation to make it happen.
>> I'm
>> personally not concerned about it, since it seems like there are multiple
>> ways to accomplish this.
>>
>> Note, the discrepancy Matthias pointed out is actually a design bug. The
>> windowed aggregation (like every operation in Streams) produces a "view",
>> which then forms the basis of downstream operations. When we pass the
>> Materialized option to the operation, all we're doing is saying to
>> "materialize"
>> the view (aka, actually store the computed view) and also make it
>> queriable.
>> It would be illegal for the "queriable, materialized view" to differ in
>> any way
>> from the "view". So, it seems we must either propose to emit the empty
>> windows AND make them visible in IQ, or propose NOT to emit the empty
>> windows AND NOT make them visible in IQ.
>>
>> 7) Regarding whether we can extend TimeWindows (or Windows):
>> I've been mulling this over more. I think it's worth asking the question of
>> what these classes even mean. For example, why is SessionWindows a
>> different thing from TimeWindows and UniversalWindows (which are both
>> Windows)?
>>
>> This conversation is extra complicated because of the incomplete and
>> mis-matched class hierarchy, but we can try to look past it for now.
>>
>> It seems like what 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Sophie Blee-Goldman
Hey John,

Just a few follow-up questions/comments about the whole Windows thing:

That's a good way of looking at things; in particular the point about
SessionWindows
for example requiring a Merger while other "statically enumerable" windows
require
only an adder seems to touch on the heart of the matter.

 It seems like what Time and Universal (and any other Windows
> implementation) have in common is that the windows are statically
> enumerable.
> As a consequence, they can all rely on an aggregation maintenance algorithm
> that involves enumerating each of the windows and updating it. That
> also means that their DSL object (TimeWindowedKStream) doesn't need
> "subtractors" or "mergers", but only "adders"; again, this is a consequence
> of the fact that the windows are enumerable.


Given that, I'm a bit confused why you conclude that sliding windows are
fundamentally
different from the "statically enumerable" windows -- sliding windows
require only an
adder too. I'm not sure it's a consequence of being enumerable, or that
being enumerable
is the fundamental property that unites all Windows (ignoring JoinWindows
here). Yes,  it
currently does apply to all Windows implementations, but we shouldn't
assume that it
*has *to be that way on the basis that it currently happens to be.

Also, the fact that they can all rely on the same aggregation algorithm
seems like an
implementation detail and it would be weird to force a separate/new DSL API
just because
under the covers we swap in a different processor.

To be fair, I don't think there's a strong reason *against* not extending
Windows -- in the end
it will just mean adding a new #windowedBy method and copy/pasting
everything from
 TimeWindowedKStream pretty much word for word. But anytime you find
yourself
copying over code almost exactly, there should probably be a good reason
why :)


On Wed, Jul 22, 2020 at 3:48 PM John Roesler  wrote:

> Thanks Leah!
>
> 5) Regarding the empty windows, I'm wondering if we should simply propose
> that the windows should not be emitted downstream of the operator or
> visible in IQ. Then, it'll be up to the implementation to make it happen.
> I'm
> personally not concerned about it, since it seems like there are multiple
> ways to accomplish this.
>
> Note, the discrepancy Matthias pointed out is actually a design bug. The
> windowed aggregation (like every operation in Streams) produces a "view",
> which then forms the basis of downstream operations. When we pass the
> Materialized option to the operation, all we're doing is saying to
> "materialize"
> the view (aka, actually store the computed view) and also make it
> queriable.
> It would be illegal for the "queriable, materialized view" to differ in
> any way
> from the "view". So, it seems we must either propose to emit the empty
> windows AND make them visible in IQ, or propose NOT to emit the empty
> windows AND NOT make them visible in IQ.
>
> 7) Regarding whether we can extend TimeWindows (or Windows):
> I've been mulling this over more. I think it's worth asking the question of
> what these classes even mean. For example, why is SessionWindows a
> different thing from TimeWindows and UniversalWindows (which are both
> Windows)?
>
> This conversation is extra complicated because of the incomplete and
> mis-matched class hierarchy, but we can try to look past it for now.
>
> It seems like what Time and Universal (and any other Windows
> implementation) have in common is that the windows are statically
> enumerable.
> As a consequence, they can all rely on an aggregation maintenance algorithm
> that involves enumerating each of the windows and updating it. That
> also means that their DSL object (TimeWindowedKStream) doesn't need
> "subtractors" or "mergers", but only "adders"; again, this is a consequence
> of the fact that the windows are enumerable.
>
> In contrast, session windows are data driven, so they are not statically
> enumerable. Their algorithm has to rely on scans, and to do the scans,
> it needs to know the "inactivity gap", which needs to be part of the window
> definition. Likewise, session windows have the property that they need
> to be merged, so their DSL object also requires mergers.
>
> It really seems like your new window definition doesn't fit into either
> category. It uses a different algorithm, which relies on scans, but it is
> also fixed in size, so it doesn't need mergers. In this situation, it seems
> like the safe bet is to just create SlidingWindows with no interface and
> add a separate set of DSL operations and objects. It's a little extra code,
> but it seems to keep everything tidier and more comprehensible, both
> for us and for users.
>
> What do you think?
> -John
>
>
>
> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
> > Hi Matthias,
> >
> > Thanks for the suggestions, I've updated the KIP and child page
> accordingly
> > and addressed some below.
> >
> > 1) For the mandatory grace period, we should use a static 

Jenkins build is back to normal : kafka-trunk-jdk8 #4734

2020-07-22 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk14 #309

2020-07-22 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread John Roesler
Thanks Leah!

5) Regarding the empty windows, I'm wondering if we should simply propose
that the windows should not be emitted downstream of the operator or
visible in IQ. Then, it'll be up to the implementation to make it happen. I'm
personally not concerned about it, since it seems like there are multiple
ways to accomplish this.

Note, the discrepancy Matthias pointed out is actually a design bug. The
windowed aggregation (like every operation in Streams) produces a "view",
which then forms the basis of downstream operations. When we pass the
Materialized option to the operation, all we're doing is saying to "materialize"
the view (aka, actually store the computed view) and also make it queriable.
It would be illegal for the "queriable, materialized view" to differ in any way
from the "view". So, it seems we must either propose to emit the empty
windows AND make them visible in IQ, or propose NOT to emit the empty
windows AND NOT make them visible in IQ.

7) Regarding whether we can extend TimeWindows (or Windows):
I've been mulling this over more. I think it's worth asking the question of
what these classes even mean. For example, why is SessionWindows a
different thing from TimeWindows and UniversalWindows (which are both
Windows)?

This conversation is extra complicated because of the incomplete and
mis-matched class hierarchy, but we can try to look past it for now.

It seems like what Time and Universal (and any other Windows 
implementation) have in common is that the windows are statically enumerable.
As a consequence, they can all rely on an aggregation maintenance algorithm
that involves enumerating each of the windows and updating it. That
also means that their DSL object (TimeWindowedKStream) doesn't need
"subtractors" or "mergers", but only "adders"; again, this is a consequence
of the fact that the windows are enumerable.

In contrast, session windows are data driven, so they are not statically
enumerable. Their algorithm has to rely on scans, and to do the scans,
it needs to know the "inactivity gap", which needs to be part of the window
definition. Likewise, session windows have the property that they need
to be merged, so their DSL object also requires mergers.
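
Roughly, the three callback "shapes" being distinguished in this discussion
(simplified, hypothetical interfaces rather than the exact Kafka Streams types):

@FunctionalInterface
interface Adder<K, V, A> {          // all that statically enumerable windows need
    A add(K key, V newValue, A currentAggregate);
}

@FunctionalInterface
interface Subtractor<K, V, A> {     // would let a window drop records that slid out of it
    A subtract(K key, V oldValue, A currentAggregate);
}

@FunctionalInterface
interface SessionMerger<K, A> {     // required when two session windows touch
    A merge(K key, A aggregateOne, A aggregateTwo);
}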

It really seems like your new window definition doesn't fit into either
category. It uses a different algorithm, which relies on scans, but it is
also fixed in size, so it doesn't need mergers. In this situation, it seems
like the safe bet is to just create SlidingWindows with no interface and
add a separate set of DSL operations and objects. It's a little extra code,
but it seems to keep everything tidier and more comprehensible, both
for us and for users.

What do you think?
-John



On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
> Hi Matthias,
> 
> Thanks for the suggestions, I've updated the KIP and child page accordingly
> and addressed some below.
> 
> 1) For the mandatory grace period, we should use a static builder method
> > that take two parameters.
> >
> 
>  That makes sense, I've changed that in the public API.
> 
> Btw: this implementation actually raises an issue for IQ: those empty
> > windows would be returned.
> 
> 
> This is a great point; with the current implementation plan, empty windows
> would be returned. I think creating a second window store would definitely
> work, but there would be more overhead in having two stores and switching
> windows between the stores, as well as doing scans in both stores to find
> existing windows. There might be a way to avoid emitting empty windows
> without creating a second window store, I'll look more into it.
> 
> Cheers,
> Leah
> 
> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax  wrote:
> 
> > Thanks for updating the KIP.
> >
> > Couple of follow up comments:
> >
> > 1) For the mandatory grace period, we should use a static builder method
> > that takes two parameters. This provides a better API as users cannot
> > forget to set the grace period. Throwing a runtime exception seems not
> > to be the best way to handle this case.
> >
> >
> >
> > 2) In Fig.2 you list 10 hopping windows. I believe it should actually be
> > more? The first hopping window would be [-6,-4[ and the last one would
> > be from [19,29[ -- hence, the cost savings are actually much higher.
> >
> >
> >
> > 3a) IQ: you are saying that the user needs to compute the start time as
> >
> > > windowSize+the time they're looking at
> >
> > Should this be "targetTime - windowSize" instead?
> >
> >
> >
> > 3b) IQ: in your example you say "window size of 10 minutes" with an
> > incident at 9:15.
> >
> > > they're looking for a window with the start time of 8:15.
> >
> > The example does not seem to add up?
> >
> >
> >
> > 4) For "Processing Windows": you describe a three step approach: I just
> > want to point out, that step (1) is not necessary for each input record,
> > because timestamps are not guaranteed to be unique and thus a previous
> > record with the same key and timestamp might have create 

Jenkins build is back to normal : kafka-2.6-jdk8 #93

2020-07-22 Thread Apache Jenkins Server
See 



Jenkins build is back to normal : kafka-trunk-jdk11 #1659

2020-07-22 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-10274) Transaction system test uses inconsistent timeouts

2020-07-22 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-10274.
-
Fix Version/s: 2.6.0
   Resolution: Fixed

Merged this to trunk and 2.6 branch.

> Transaction system test uses inconsistent timeouts
> --
>
> Key: KAFKA-10274
> URL: https://issues.apache.org/jira/browse/KAFKA-10274
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.6.0
>
>
> We've seen some failures in the transaction system test with errors like the 
> following:
> {code}
> copier-1 : Message copier didn't make enough progress in 30s. Current 
> progress: 0
> {code}
> Looking at the consumer logs, we see the following messages repeating over 
> and over:
> {code}
> [2020-07-14 06:50:21,466] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetching committed offsets for 
> partitions: [input-topic-1] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2020-07-14 06:50:21,468] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Failed to fetch offset for 
> partition input-topic-1: There are unstable offsets that need to be cleared. 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> {code}
> I think the problem is that the test implicitly depends on the transaction 
> timeout which has been configured to 40s even though it expects progress 
> after 30s.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10164) Implement Admin side changes

2020-07-22 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-10164.

Fix Version/s: 2.7.0
 Reviewer: Rajini Sivaram
 Assignee: David Jacot
   Resolution: Fixed

> Implement Admin side changes
> 
>
> Key: KAFKA-10164
> URL: https://issues.apache.org/jira/browse/KAFKA-10164
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk8 #4733

2020-07-22 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic


--
[...truncated 3.18 MB...]

[remainder of build log omitted: org.apache.kafka.streams.test.OutputVerifierTest and org.apache.kafka.streams.internals.WindowStoreFacadeTest cases listed as STARTED/PASSED]

Build failed in Jenkins: kafka-trunk-jdk14 #308

2020-07-22 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic


--
[...truncated 3.20 MB...]

[remainder of build log omitted: org.apache.kafka.streams.TopologyTestDriverTest cases listed as STARTED/PASSED]

Re: [DISCUSS] KIP-646 Serializer API should support ByteBuffer

2020-07-22 Thread Ismael Juma
Hi Chia-Ping,

Thanks for the KIP. It seems like the path chosen here would cause a
massive impact to user code. Are there options with lower impact that still
help us achieve the goal for those who need it? For example, it could be an
opt-in thing instead of forcing the world to change.

Ismael

On Wed, Jul 22, 2020 at 9:43 AM Chia-Ping Tsai  wrote:

> hi folks,
>
> I would like to discuss KIP-646. The KIP plans to use ByteBuffer as the
> return type of Serializer#serialize. It opens the door to managing memory
> more effectively and flexibly.
>
> https://cwiki.apache.org/confluence/x/RiR4CQ
>
> The change involved in this KIP is huge, so it would be better to get
> enough feedback before starting to work :)
>
> Cheers,
> Chia-Ping
>


Re: [VOTE] 2.6.0 RC1

2020-07-22 Thread Randall Hauch
Any thoughts, Rajini?

On Mon, Jul 20, 2020 at 9:55 PM Randall Hauch  wrote:

>
> When I was checking the documentation for RC1 after the tag was pushed, I
> noticed that the fix Rajini mentioned in the RC0 vote thread (
> https://github.com/apache/kafka/pull/8979
> )
> and merged to the `2.6` branch includes the following comment about being
> deprecated in 2.7:
> https://github.com/apache/kafka/pull/8979/files#diff-369f0debebfcda6709beeaf11612b34bR20-R21
> .
>
> Rajini, can you please check that the commits merged to the `2.6` branch do
> not have the reference to 2.7? Since these are JavaDocs, I'm assuming that
> we'll need to cut RC2.
>
> But it'd be good for everyone else to double check this release.
>
> Best regards,
>
> Randall Hauch
>
> On Mon, Jul 20, 2020 at 9:50 PM Randall Hauch  wrote:
>
>> Hello Kafka users, developers and client-developers,
>>
>> This is the second candidate for release of Apache Kafka 2.6.0. This is a
>> major release that includes many new features, including:
>>
>> * TLSv1.3 has been enabled by default for Java 11 or newer.
>> * Smooth scaling out of Kafka Streams applications
>> * Kafka Streams support for emit on change
>> * New metrics for better operational insight
>> * Kafka Connect can automatically create topics for source connectors
>> * Improved error reporting options for sink connectors in Kafka Connect
>> * New Filter and conditional SMTs in Kafka Connect
>> * The default value for the `client.dns.lookup` configuration is
>> now `use_all_dns_ips`
>> * Upgrade Zookeeper to 3.5.8
>>
>> This release also includes a few other features, 76 improvements, and 165
>> bug fixes.
>>
>> Release notes for the 2.6.0 release:
>> https://home.apache.org/~rhauch/kafka-2.6.0-rc1/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Monday, July 20, 9am PT
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> https://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> https://home.apache.org/~rhauch/kafka-2.6.0-rc1/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>>
>> * Javadoc:
>> https://home.apache.org/~rhauch/kafka-2.6.0-rc1/javadoc/
>>
>> * Tag to be voted upon (off 2.6 branch) is the 2.6.0 tag:
>> https://github.com/apache/kafka/releases/tag/2.6.0-rc1
>>
>> * Documentation:
>> https://kafka.apache.org/26/documentation.html
>>
>> * Protocol:
>> https://kafka.apache.org/26/protocol.html
>>
>> * Successful Jenkins builds for the 2.6 branch:
>> Unit/integration tests: https://builds.apache.org/job/kafka-2.6-jdk8/91/ (one
>> flaky test)
>> System tests: (link to follow)
>>
>> Thanks,
>> Randall Hauch
>>
>


[DISCUSS] KIP-646 Serializer API should support ByteBuffer

2020-07-22 Thread Chia-Ping Tsai
hi folks,

I would like to discuss KIP-646. The KIP plans to use ByteBuffer as the
return type of Serializer#serialize. It opens the door to managing memory
more effectively and flexibly.

https://cwiki.apache.org/confluence/x/RiR4CQ

The change involved in this KIP is large, so it would be better to get enough
feedback before starting the work :)

Cheers,
Chia-Ping
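
As a rough illustration of the shape of the change being discussed (simplified, not copied from the KIP page), a serializer whose serialize method returns a ByteBuffer instead of byte[] could look like this:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative stand-in for the proposed return-type change; the real interface
// lives in org.apache.kafka.common.serialization and has additional methods.
interface ByteBufferSerializer<T> {
    ByteBuffer serialize(String topic, T data);
}

class StringByteBufferSerializer implements ByteBufferSerializer<String> {
    @Override
    public ByteBuffer serialize(String topic, String data) {
        // Returning a ByteBuffer lets callers slice or reuse buffers instead of
        // allocating a fresh byte[] for every record.
        return data == null ? null : ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
    }
}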


[jira] [Resolved] (KAFKA-10163) Implement Broker side changes

2020-07-22 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-10163.

Fix Version/s: 2.7.0
 Reviewer: Rajini Sivaram
 Assignee: David Jacot
   Resolution: Fixed

> Implement Broker side changes
> -
>
> Key: KAFKA-10163
> URL: https://issues.apache.org/jira/browse/KAFKA-10163
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Confluent Docker Images

2020-07-22 Thread Nag Y
The Docker images for each Confluent component (ZooKeeper, Schema Registry,
etc.) are huge.

Is there any place I can download a single image that contains all the
components?

REPOSITORY                                  TAG           IMAGE ID       CREATED        SIZE
confluentinc/ksqldb-examples                5.5.1         88f3d11247f3   3 weeks ago    646MB
confluentinc/cp-ksqldb-server               5.5.1         d2f03e1e91d8   3 weeks ago    679MB
confluentinc/cp-ksqldb-cli                  5.5.1         c2768c7e4cc5   3 weeks ago    663MB
confluentinc/cp-enterprise-control-center   5.5.1         870dffa09a38   4 weeks ago    888MB
confluentinc/cp-server                      5.5.1         f9758c92d7b4   4 weeks ago    981MB
confluentinc/cp-schema-registry             5.5.1         f51e4f854dc1   4 weeks ago    1.19GB
confluentinc/cp-kafka-rest                  5.5.1         2632bb34f956   4 weeks ago    1.15GB
confluentinc/cp-zookeeper                   5.5.1         7149731cc563   4 weeks ago    598MB
cnfldemos/cp-server-connect-datagen         0.3.2-5.5.0   8b1a9577099c   2 months ago   1.53GB


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Leah Thomas
Hi Matthias,

Thanks for the suggestions, I've updated the KIP and child page accordingly
and addressed some below.

1) For the mandatory grace period, we should use a static builder method
> that takes two parameters.
>

 That makes sense, I've changed that in the public API.
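
(For readers following along, a minimal sketch of what a mandatory two-parameter builder could look like; the class shape and method name are illustrative, not the final KIP-450 API.)

import java.time.Duration;

public final class SlidingWindows {

    private final Duration timeDifference;
    private final Duration grace;

    private SlidingWindows(final Duration timeDifference, final Duration grace) {
        this.timeDifference = timeDifference;
        this.grace = grace;
    }

    // Both parameters are required, so callers cannot forget to set the grace period
    // and no runtime exception for a missing grace period is needed.
    public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference,
                                                            final Duration grace) {
        return new SlidingWindows(timeDifference, grace);
    }
}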

Btw: this implementation actually raises an issue for IQ: those empty
> windows would be returned.


This is a great point, with the current implementation plan empty windows
would be returned. I think creating a second window store would definitely
work, but there would be more overhead in having two stores and switching
windows between the stores, as well as doing scans in both stores to find
existing windows. There might be a way to avoid emitting empty windows
without creating a second window store; I'll look more into it.

Cheers,
Leah

On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax  wrote:

> Thanks for updating the KIP.
>
> Couple of follow up comments:
>
> 1) For the mandatory grace period, we should use a static builder method
> that takes two parameters. This provides a better API, as users cannot
> forget to set the grace period. Throwing a runtime exception seems not
> to be the best way to handle this case.
>
>
>
> 2) In Fig.2 you list 10 hopping windows. I believe it should actually be
> more? The first hopping window would be [-6,-4[ and the last one would
> be from [19,29[ -- hence, the cost savings are actually much higher.
>
>
>
> 3a) IQ: you are saying that the user needs to compute the start time as
>
> > windowSize+the time they're looking at
>
> Should this be "targetTime - windowSize" instead?
>
>
>
> 3b) IQ: in you example you say "window size of 10 minutes" with an
> incident at 9:15.
>
> > they're looking for a window with the start time of 8:15.
>
> The example does not seem to add up?
>
>
>
> 4) For "Processing Windows": you describe a three step approach: I just
> want to point out, that step (1) is not necessary for each input record,
> because timestamps are not guaranteed to be unique and thus a previous
> record with the same key and timestamp might have create the windows
> already.
>
> Nit: I am also not exactly sure what you mean by step (3) as you use the
> word "send". I guess you mean "put"?
>
> It seem there are actually more details in the sub-page:
>
> > A new record for SlidingWindows will always create two new windows. If
> either of those windows already exist in the windows store, their
> aggregation will simply be updated to include the new record, but no
> duplicate window will be added to the WindowStore.
>
> However, the first and second sentences contradict each other a little
> bit. I think the first sentence is not correct.
>
> Nit:
>
> > For in-order records, the left window will always be empty.
>
> This should be "right window" ?
>
>
>
> 5) "Emitting Results": it might be worth to point out, that a
> second/future window of a new record is create with no records, and
> thus, even if it's initialized it won't be emitted. Only if a
> consecutive record falls into the window, the window would be updates
> and the window result (for a window content of one record) would be sent
> downstream.
>
> Again, the sub-page contains these details. It might still be worth adding
> to the top-level page, too.
>
> Btw: this implementation actually raises an issue for IQ: those empty
> windows would be returned. Thus I am wondering if we need to use two
> stores internally? One store for actual windows and one store for empty
> windows? If an empty window is updated, it's moved to the other store?
> For IQ, we would only allow querying the non-empty-window store?
>
>
>
> 6) On the sub-page:
>
> > The left window of in-order records and both windows for out-of-order
> records need to be updated with the values of records that have already
> been processed.
>
> Why "both windows for out-of-order records"? IMHO, we don't know how
> many existing windows need to be updated when processing an
> out-of-order record. Of course, an out-of-order record could also fall
> into no existing window and create two new windows instead.
>
> >  Because each record creates one new window that includes itself and one
> window that does not
>
> As stated above, this does not seem to hold. I understand what you mean,
> but it would be good to be exact.
>
> Figure 2: You use the term "late" but you mean "out-of-order" I guess --
> a record is _late_ if it is no longer processed because the grace period
> has already passed.
>
> Figure 2: "Late" should be out-or-order. The example text say a window
> [16,26] should be created but the figure shows the green window as [15,20].
>
> About the blue window: maybe add a note that the blue window contains the
> aggregate we need for the green window, _before_ the new record `a` is
> added to the blue window.
>
>
>
> 7) I am not really happy to extend TimeWindows and I think the argument
> about JoinWindows is not the best (IMHO, JoinWindows do it already wrong
> and we just repeat the same 

Confluent Platform- KTable clarification

2020-07-22 Thread Nag Y
I understand that a KStream is an abstraction of a record stream and a KTable
is an abstraction of a changelog stream (updates or inserts), and the
semantics around them.

However, this is where some confusion arises. From the Confluent documentation:


To illustrate, let’s imagine the following two data records are being sent
to the stream:

("alice", 1) --> ("alice", 3)

*If your stream processing application were to sum the values per user*, it
would return 3 for alice. Why? Because the second data record would be
considered an update of the previous record. Compare this behavior of
KTable with the illustration for KStream above, which would return 4 for
alice.

Coming to the highlighted area, *if we were to sum the values*, it should
be 4, right? However, *if we were to look at the "updated" view of the
log*, yes, it is 3, as a KTable maintains either updates or inserts. Did I
get it right?
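
(A minimal Streams DSL sketch of the distinction being asked about; the topic name and serdes are illustrative.)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamVsTableExample {

    public static void main(String[] args) {
        // Record-stream view: ("alice", 1) and ("alice", 3) are two independent events,
        // so summing the values per user yields 4.
        StreamsBuilder streamBuilder = new StreamsBuilder();
        KStream<String, Integer> asStream =
            streamBuilder.stream("user-clicks", Consumed.with(Serdes.String(), Serdes.Integer()));
        KTable<String, Integer> sumPerUser =
            asStream.groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                    .reduce(Integer::sum);

        // Changelog view: the second record is an update that replaces the first,
        // so the latest value for "alice" is 3.
        StreamsBuilder tableBuilder = new StreamsBuilder();
        KTable<String, Integer> latestPerUser =
            tableBuilder.table("user-clicks", Consumed.with(Serdes.String(), Serdes.Integer()));
    }
}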


Confluent Kafka - Schema Registry on windows

2020-07-22 Thread Nag Y
I happened to see an example of how to run the Schema Registry on Windows
using "schema-registry-start.bat" in 5.0.1.

I didn't see the file in 5.5.0. Is the Schema Registry not supported on
Windows now? It seems the only way to run the Schema Registry on Windows is
through Docker. Can someone please confirm?


Kafka - Controller Broker

2020-07-22 Thread Nag Y
I came across this phrase in https://niqdev.github.io/devops/kafka/ and
https://livebook.manning.com/book/kafka-streams-in-action/chapter-2/109 (Kafka
Streams in Action):

The controller broker is responsible for setting up leader/follower
relationships for all partitions of a topic. If a Kafka node dies or is
unresponsive (to ZooKeeper heartbeats), all of its assigned partitions *(both
leader and follower)* are reassigned by the controller broker.

I think this is not correct with respect to reassigning follower partitions to
other brokers, as those partitions won't heal themselves unless the broker
comes back. I know this ONLY happens for the leader replica: if the broker
that holds the leader replica goes down, one of the brokers that holds a
follower will become the leader. But I don't think "reassignment" of followers
will happen automatically unless a reassignment is initiated manually. Please
share your inputs.


[jira] [Created] (KAFKA-10299) Add a Hash SMT transformer

2020-07-22 Thread Brandon Brown (Jira)
Brandon Brown created KAFKA-10299:
-

 Summary: Add a Hash SMT transformer
 Key: KAFKA-10299
 URL: https://issues.apache.org/jira/browse/KAFKA-10299
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Brandon Brown


A previous contribution to
[https://github.com/aiven/aiven-kafka-connect-transforms] was suggested by a
member of Confluent as being a nice addition to the out-of-the-box Kafka
Connect SMTs. The discussion is here:
[https://github.com/aiven/aiven-kafka-connect-transforms/issues/9#issuecomment-662378057]

This change would add a new SMT which allows hashing either a key or a
value using a configured hashing function.
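
For a sense of what such a transformer could look like, here is a minimal sketch that hashes the record value with SHA-256; the class name, the choice of algorithm, and the lack of configuration options are illustrative and not taken from the linked contribution:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

// Illustrative SMT that replaces the record value with its hex-encoded SHA-256 hash.
public class HashValue<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record;
        }
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(record.value().toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b & 0xff));
            }
            // Keep topic, partition, key, and timestamp; only the value (and its schema) change.
            return record.newRecord(record.topic(), record.kafkaPartition(),
                    record.keySchema(), record.key(),
                    Schema.STRING_SCHEMA, hex.toString(), record.timestamp());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    @Override
    public ConfigDef config() {
        // A real implementation would expose the hash function and target field here.
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}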



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka Streams

2020-07-22 Thread Bruno Cadonna

Thank you John for the proposal.

Indeed, I also did not like extending the RocksDBConfigSetter interface,
but couldn't find a way around it.


I will remove the interface extension from the KIP and try out your 
proposal. I need to look into the details but after a first glance, I 
think we need to subclass BlockBasedTableConfig instead of Options 
(actually, we already subclass Options). I also have to verify if this 
is the only place where the cache is passed along.


Best,
Bruno

On 21.07.20 17:43, John Roesler wrote:

Thanks for the update, Bruno!

In addition to Guozhang's feedback, I'm a little concerned
about the change to the RocksDBConfigSetter. If I understand
the proposal, people would have to separately supply
their Cache to the Options parameter in setConfig() and also
save it in a field so it can be returned in cache(). If they don't
return the same object, then the metrics won't be accurate,
but otherwise the mistake will be undetectable. Also, the
method is defaulted to return null, so existing implementations
would have no indication that they need to change, except that
users who want to read the new metrics would see inaccurate
values. They probably don't have a priori knowledge that would
let them identify that the reported metrics aren't accurate, and
even if they do notice something is wrong, it would probably take
quite a bit of effort to get all the way to the root cause.

I'm wondering if we can instead avoid the new method and pass
to the ConfigSetter our own subclass of Options (which is non-final
and has only public constructors) that would enable us to capture
and retrieve the Cache later. Or even just use reflection to get the
Cache out of the existing Options object after calling the ConfigSetter.

What do you think?
Thanks again for the update,
-John
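
(For illustration, a minimal sketch of the user-side pattern John describes, assuming the cache() method from the earlier KIP draft: the same Cache instance would have to be handed to RocksDB in setConfig() and returned from cache(), otherwise the reported metrics would silently be wrong.)

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class BoundedMemoryConfigSetter implements RocksDBConfigSetter {

    private final Cache cache = new LRUCache(16 * 1024 * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(cache);           // the cache is handed to RocksDB here ...
        options.setTableFormatConfig(tableConfig);
    }

    // ... and exactly the same instance would have to be returned here (hypothetical
    // method from the earlier KIP draft; returning a different Cache, or leaving the
    // null default, would make the reported memory metrics inaccurate).
    public Cache cache() {
        return cache;
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}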

On Mon, Jul 20, 2020, at 17:39, Guozhang Wang wrote:

Hello Bruno,

Thanks for the updated KIP. I made a pass and here are some comments:

1) What's the motivation of keeping it as INFO while KIP-471 metrics are
defined in DEBUG?

2) Some namings are a bit inconsistent with others and with KIP-471, for
example:

2.a) KIP-471 uses "memtable" while in this KIP we use "mem-table", also the
"memtable" is prefixed and then the metric name. I'd suggest we keep them
consistent. e.g. "num-immutable-mem-table" => "immutable-memtable-count",
"cur-size-active-mem-table" => "active-memable-bytes"

2.b) "immutable" are abbreviated as "imm" in some names but not in others,
I'd suggest we do not use abbreviations across all names,
e.g. "num-entries-imm-mem-tables" => "immutable-memtable-num-entries".

2.c) "-size" "-num" semantics is usually a bit unclear, and I'd suggest we
just use more concrete terms, e.g. "total-sst-files-size" =>
"total-sst-files-bytes", "num-live-versions" => "live-versions-count",
"background-errors" => "background-errors-count".

3) Some metrics are a bit confusing, e.g.

3.a) What's the difference between "cur-size-all-mem-tables" and
"size-all-mem-tables"?

3.b) And the explanation of "estimate-table-readers-mem" does not read very
clear to me either, does it refer to "estimate-sst-file-read-buffer-bytes"?

3.c) How does "estimate-oldest-key-time" help with memory usage debugging?

4) For my own education, does "estimate-pending-compaction-bytes" capture
all the memory usage for compaction buffers?

5) This is just of a nit comment to help readers better understand rocksDB:
maybe we can explain in the wiki doc which part of rocksDB uses memory
(block cache, OS cache, memtable, compaction buffer, read buffer), and
which of them are on-heap and which of them are off-heap, which can be hard
bounded and which can only be soft bounded and which cannot be bounded at
all, etc.


Guozhang


On Mon, Jul 20, 2020 at 11:00 AM Bruno Cadonna  wrote:


Hi,

During the implementation of this KIP and after some discussion about
RocksDB metrics, I decided to make some major modifications to this KIP
and kick off discussion again.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB

Best,
Bruno

On 15.05.20 17:11, Bill Bejeck wrote:

Thanks for the KIP, Bruno. Having sensible, easy to access RocksDB memory
reporting will be a welcomed addition.

FWIW I also agree to have the metrics reported on a store level. I'm glad
you changed the KIP to that effect.

-Bill



On Wed, May 13, 2020 at 6:24 PM Guozhang Wang 

wrote:



Hi Bruno,

Sounds good to me.

I think I'm just a bit more curious to see its impact on performance: as
long as we have one INFO level rocksDB metrics, then we'd have to turn on
the scheduled rocksdb metrics recorder whereas previously, we can decide to
not turn on the recorder at all if all are set as DEBUG and we configure at
INFO level in production. But this is an implementation detail anyways and
maybe the impact is negligible after all. We can check and re-discuss this
afterwards :)


Guozhang


On Wed, May 13, 2020 at 9:34 AM 

Re: [DISCUSS] KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka Streams

2020-07-22 Thread Bruno Cadonna

Hi Guozhang,

Thank you for your feedback!

I answered inline.

Best,
Bruno


On 21.07.20 00:39, Guozhang Wang wrote:

Hello Bruno,

Thanks for the updated KIP. I made a pass and here are some comments:

1) What's the motivation of keeping it as INFO while KIP-471 metrics are
defined in DEBUG?



The motivation was that the metrics in this KIP do not incur any 
performance overhead other than reading out the properties from RocksDB. 
For these metrics, RocksDB does not need to maintain anything 
additional. In contrast, for the metrics in KIP-471 RocksDB needs to 
maintain the statistics object we pass to it and we also need to switch 
on a certain statistics level. So, I thought the metrics in this KIP are 
suited to be used in production and therefore can be reported on INFO level.



2) Some namings are a bit inconsistent with others and with KIP-471, for
example:


I am aware of the inconsistencies. I took the names from this list in 
the RocksDB repo 
https://github.com/facebook/rocksdb/blob/b9a4a10659969c71e6f6eab4e4bae8c36ede919f/include/rocksdb/db.h#L654-L686 
(with prefix "rocksdb." ripped off). In this way users do not need to 
look up or memorize a mapping between our metrics and the RocksDB 
properties. To be clear, those are public RocksDB properties.
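
For context, a minimal standalone sketch of how such integer-valued properties are read through RocksDB's Java API; the database path and the two property names are just examples from that list:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDBPropertyExample {

    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-property-example")) {
            // Each property is a single long value; no statistics object or statistics
            // level is required, which is why reading them is cheap enough for INFO level.
            final long memtableBytes = db.getLongProperty("rocksdb.cur-size-all-mem-tables");
            final long sstFilesBytes = db.getLongProperty("rocksdb.total-sst-files-size");
            System.out.println("memtables: " + memtableBytes + " bytes, SST files: " + sstFilesBytes + " bytes");
        }
    }
}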




2.a) KIP-471 uses "memtable" while in this KIP we use "mem-table", also the
"memtable" is prefixed and then the metric name. I'd suggest we keep them
consistent. e.g. "num-immutable-mem-table" => "immutable-memtable-count",
"cur-size-active-mem-table" => "active-memable-bytes"

2.b) "immutable" are abbreviated as "imm" in some names but not in others,
I'd suggest we do not use abbreviations across all names,
e.g. "num-entries-imm-mem-tables" => "immutable-memtable-num-entries".

2.c) "-size" "-num" semantics is usually a bit unclear, and I'd suggest we
just use more concrete terms, e.g. "total-sst-files-size" =>
"total-sst-files-bytes", "num-live-versions" => "live-versions-count",
"background-errors" => "background-errors-count".

3) Some metrics are a bit confusing, e.g.

3.a) What's the difference between "cur-size-all-mem-tables" and
"size-all-mem-tables"?



cur-size-all-mem-tables records the approximate size of active and 
unflushed immutable memtables. Unflushed immutable memtables are 
memtables that are not yet flushed by the asynchronous flushing 
mechanism in RocksDB.


size-all-mem-tables records the sizes recorded in 
cur-size-all-mem-tables but additionally also records pinned immutable 
memtables that are kept in memory to maintain write history in memory.


As far as I understood those are memtables that are flushed but there 
are still table readers (e.g. iterators) that use those memtables.


I added a sentence to explain the difference.

I guess it is worthwhile to have both of these metrics because if 
size-all-mem-tables keeps increasing and cur-size-all-mem-tables does not, 
there may be an issue with the clean-up of table readers.



3.b) And the explanation of "estimate-table-readers-mem" does not read very
clear to me either, does it refer to "estimate-sst-file-read-buffer-bytes"?



No, this metric records the memory used by iterators as well as filters 
and indices if the filters and indices are not maintained in the block 
cache. Basically this metric reports the memory used outside the block 
cache to read data. I modified the description to make it clearer.



3.c) How does "estimate-oldest-key-time" help with memory usage debugging?


I do not consider this KIP to only help with monitoring of memory usage. 
My intention was to expose all RocksDB properties that return an integer and 
that make sense for Kafka Streams.
Admittedly, I did a bad job in the current KIP of explaining this in the 
motivation.





4) For my own education, does "estimate-pending-compaction-bytes" capture
all the memory usage for compaction buffers?



No, as far as I understand, this metric refers to bytes rewritten on 
disk. Basically, this metric relates to the write amplification for level 
compaction. I changed the description.



5) This is just of a nit comment to help readers better understand rocksDB:
maybe we can explain in the wiki doc which part of rocksDB uses memory
(block cache, OS cache, memtable, compaction buffer, read buffer), and
which of them are on-heap and which of them are off-heap, which can be hard
bounded and which can only be soft bounded and which cannot be bounded at
all, etc.



Good idea! Will look into it!



Guozhang


On Mon, Jul 20, 2020 at 11:00 AM Bruno Cadonna  wrote:


Hi,

During the implementation of this KIP and after some discussion about
RocksDB metrics, I decided to make some major modifications to this KIP
and kick off discussion again.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB

Best,
Bruno

On 15.05.20 17:11, Bill Bejeck wrote:

Thanks for the KIP, Bruno. Having sensible, easy to access RocksDB memory
reporting will be a 

[jira] [Reopened] (KAFKA-8098) Flaky Test AdminClientIntegrationTest#testConsumerGroups

2020-07-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reopened KAFKA-8098:
--

> Flaky Test AdminClientIntegrationTest#testConsumerGroups
> 
>
> Key: KAFKA-8098
> URL: https://issues.apache.org/jira/browse/KAFKA-8098
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3459/tests]
> {quote}java.lang.AssertionError: expected:<2> but was:<0>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:647)
> at org.junit.Assert.assertEquals(Assert.java:633)
> at 
> kafka.api.AdminClientIntegrationTest.testConsumerGroups(AdminClientIntegrationTest.scala:1194){quote}
> STDOUT
> {quote}2019-03-12 10:52:33,482] ERROR [ReplicaFetcher replicaId=2, 
> leaderId=1, fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:33,485] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:35,880] WARN Unable to read additional data from client 
> sessionid 0x104458575770003, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376)
> [2019-03-12 10:52:38,596] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition elect-preferred-leaders-topic-1-0 at offset 
> 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:38,797] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition elect-preferred-leaders-topic-2-0 at offset 
> 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:51,998] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:52,005] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,750] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition mytopic2-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,754] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition mytopic2-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,755] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition mytopic2-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition mytopic2-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,757] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition mytopic2-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,769] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition mytopic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,778]