[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767939#comment-16767939
 ] 

Murad M edited comment on KAFKA-7930 at 2/14/19 7:51 AM:
-

{quote}This should not have happened... Note that internal topics should not 
be read or written by any other application. If you want to share data, you 
should always create your own topics and write to them via `to()` or 
`through()`.
{quote}
That is the point: if there were an internal flag identifying the topic as 
internal, that would be fine. But currently the tool treats a topic as internal 
based on a naming pattern, hoping that the name was generated correctly in 
advance and that the user neither picked an arbitrary topic name that 
accidentally matches the pattern nor repurposed a previously auto-generated 
topic.

On the other hand, currently if you specify such a topic in `--input-topics` 
or `--intermediate-topics`, the tool will first treat it as specified (apply 
the offset reset to latest/earliest) and then, because it also matches 
`isInternalTopic`, delete it. That is somewhat weird and misleading.

Also, deletion is fast and irreversible... It should not be that easy, I think.
{quote}For this case, you could use `bin/kafka-consumer-groups.sh` instead of 
the streams reset tool. Thus, I am wondering if we should patch the reset tool 
at all, as there is an alternative tool you can use?
{quote}
Possible, and it was considered, but:
 a) we had to read the code of both tools to make sure that they do the same 
thing, because no documentation states that they are interchangeable;
 b) `kafka-consumer-groups.sh` works on a per-topic basis, while 
`kafka-streams-application-reset.sh` works on multiple topics, which is more 
convenient;
 c) `kafka-streams-application-reset.sh` is the intended tool for this, while 
`kafka-consumer-groups.sh` seems more "low level" and may work outside the 
context of Streams.

In the end, it was cheaper and faster to patch `StreamsResetter` than to use 
`kafka-consumer-groups.sh`.



> StreamsResetter makes "changelog" topic naming assumptions
> --
>
> Key: KAFKA-7930
> URL: https://issues.apache.org/jira/browse/KAFKA-7930
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Affects Versions: 2.1.0
>Reporter: Murad M
>Priority: Major
>  Labels: features, needs-kip, patch-available, usability
>
> StreamsResetter deletes the topics it considers internal. Currently it just 
> checks the naming as per 
> [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
>  If that assumption is wrong (either the topic prefix or the suffix), the 
> tool becomes useless if you are aware of this, and even dangerous if you are 
> not. Probably better either:
>  * make the naming assumption optional and supply internal topics via an 
> argument (--internal-topics)
>  * make deletion optional (--no-delete-internal)
>  * ignore topics which are included in the list of --input-topics
> Faced this when trying to reset applications with GlobalKTable topics named 
> as *-changelog. Such topics are sometimes not desirable to delete.

[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767936#comment-16767936
 ] 

Matthias J. Sax commented on KAFKA-7930:


{quote}Now they are being used for different purposes like cached lookups, 
which are populated from various other places. And they effectively became 
"input" topics represented as GlobalKTable.
{quote}
This should not have happened... Note that internal topics should not be read 
or written by any other application. If you want to share data, you should 
always create your own topics and write to them via `to()` or `through()`.
{quote}For application reset it is enough to shift offsets to earliest,
{quote}
For this case, you could use `bin/kafka-consumer-groups.sh` instead of the 
streams reset tool. Thus, I am wondering if we should patch the reset tool at 
all, as there is an alternative tool you can use?

[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767930#comment-16767930
 ] 

Matthias J. Sax commented on KAFKA-6460:


I did not think about how to make it work. But what you suggest SGTM – in 
fact, `TopologyTestDriver` uses `InternalTopologyBuilder` already.

Overall (from experience) this seems to be a tricky ticket – thus, I would 
like to have a PR in parallel to the KIP – otherwise, it will be hard to get 
right.

[~shung] How does this sound to you?

> Add mocks for state stores used in Streams unit testing
> ---
>
> Key: KAFKA-6460
> URL: https://issues.apache.org/jira/browse/KAFKA-6460
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: newbie++
>
> We'd like to use mocks for different types of state stores: kv, window, 
> session that can be used to record the number of expected put / get calls 
> used in the DSL operator unit testing. This involves implementing the two 
> interfaces {{StoreSupplier}} and {{StoreBuilder}} that can return an object 
> created from, say, EasyMock, and the object can then be set up with the 
> expected calls.
> In addition, we should also add a mock record collector which can be returned 
> from the mock processor context so that with logging enabled store, users can 
> also validate if the changes have been forwarded to the changelog as well.





[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767905#comment-16767905
 ] 

ASF GitHub Bot commented on KAFKA-7930:
---

muradm commented on pull request #6267: KAFKA-7930: topic is not internal if 
explicitly listed in args
URL: https://github.com/apache/kafka/pull/6267

Simplest fix: a topic is not internal if it is explicitly listed in 
--input-topics or --intermediate-topics.
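
A minimal sketch of the idea (hypothetical field names, not the literal 
patch; see the PR for the actual change):
{code:java}
// Topics explicitly listed via --input-topics or --intermediate-topics are
// collected into userSpecifiedTopics (hypothetical name) and are never
// treated as internal, even if their names match the naming pattern.
private boolean isInternalTopic(final String topicName) {
    return !userSpecifiedTopics.contains(topicName)
        && topicName.startsWith(applicationId + "-")
        && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
}
{code}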
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767751#comment-16767751
 ] 

Murad M edited comment on KAFKA-7930 at 2/14/19 6:36 AM:
-

Patch provided: https://github.com/apache/kafka/pull/6267


was (Author: muradm):
Patch provided: https://github.com/muradm/kafka/pull/1



[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767899#comment-16767899
 ] 

Guozhang Wang commented on KAFKA-6460:
--

Ah yes, that's a good point. 

Thinking about this again, how about injecting the mocking logic into 
{{TopologyTestDriver}} so that users do not need to make any code changes, 
while the cost is to push the code-complexity burden onto Streams (i.e. inside 
{{TopologyTestDriver}}). More specifically:

When {{TopologyTestDriver}} takes in the {{InternalTopologyBuilder}} as a 
parameter, loop over its {{stateFactories}} and {{globalStateBuilders}} maps, 
and replace each entry (the StateStoreFactory's embedded {{StoreBuilder}}, and 
the {{StoreBuilder}} itself, respectively) with a mock store builder chosen by 
type, e.g. {{KeyValueStoreBuilder}} -> {{MockKeyValueStoreBuilder}}, etc.

That way, for both the PAPI and the DSL, user code does not need to change at 
all:

1) PAPI users in production / testing code: 
{{Topology#addStateStore(Stores.keyValueStoreBuilder(...))}}.
2) DSL users in production code, without a materialization spec: {{aggregate()}}
3) DSL users in production code, with a materialization spec: 
{{aggregate(Materialized)}}

Then the only additional API we need is one that lets users check the number 
of function calls for a given store, e.g. 
{{TopologyTestDriver#putEntries(storeName)}}. WDYT?



[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767895#comment-16767895
 ] 

Murad M commented on KAFKA-7930:


Yes, if you create a GlobalKTable, it is created from the provided topic name. 
How we came up with `--changelog`, I don't remember; that was years back, in 
the 0.8.x-0.9.x times. Probably it was a KTable's generated topic.



[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767890#comment-16767890
 ] 

Guozhang Wang commented on KAFKA-7930:
--

{{GlobalKTable}} should not have changelog topics, since it always uses the 
defined source topic as the changelog to materialize itself, right?
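
For illustration, a minimal sketch (hypothetical topic and store names): the 
global store is restored from its source topic itself, so no separate 
"-changelog" topic is created for it.
{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// "lookup-store" is materialized directly from "lookup-topic"; Streams does
// not create a "<applicationId>-lookup-store-changelog" topic for it.
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, String> lookup = builder.globalTable(
    "lookup-topic",
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("lookup-store"));
{code}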



[jira] [Comment Edited] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767880#comment-16767880
 ] 

Murad M edited comment on KAFKA-7930 at 2/14/19 5:14 AM:
-

Historically those topics were auto-created following the GlobalKTable naming 
pattern. Now they are being used for different purposes like cached lookups, 
which are populated from various other places. And they effectively became 
"input" topics represented as GlobalKTable. While there are mechanisms to 
re-populate them, that does not necessarily mean it has to be done every time 
the application is reset. For an application reset it is enough to shift the 
offsets to earliest, so that the GlobalKTable will materialize again from the 
existing topic (see the sketch after this list). It is true that, from a 
replay point of view, we would achieve different results, but the same is true 
for any other topic with 'cleanup.policy=compact,delete'. Some use-cases:
- static / managed configuration data (routes, flags etc.)
- caches pulled from external systems
- caches based on data pushed from external systems
- a sequence of services, where the first service builds state represented by 
a GlobalKTable from an event stream, and a number of other applications 
leverage the built state. For the first application it is an "internal" 
deletable topic, while for the rest it is an "input" topic; but an attempt to 
reset any of the services using that topic as "input" ends in losing the 
topic, so all services in the sequence have to be reset.
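
A minimal sketch of that "shift offsets to earliest without deleting" step 
(hypothetical application id and topic name; this is essentially what the 
reset tool does for topics passed via --input-topics):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Commit the earliest offsets for "my-lookup-changelog" under the group id
// "my-streams-app" (the application.id); the topic itself is left untouched.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-streams-app");
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    final List<TopicPartition> partitions = new ArrayList<>();
    for (final PartitionInfo info : consumer.partitionsFor("my-lookup-changelog")) {
        partitions.add(new TopicPartition(info.topic(), info.partition()));
    }
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);
    for (final TopicPartition tp : partitions) {
        consumer.position(tp); // seekToBeginning is lazy; force offset resolution
    }
    consumer.commitSync(); // persist "earliest" as the group's committed offsets
}
{code}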

Once we hit that limitation, the first option was to get rid of the historical 
naming convention, but that is not an option either, as there is no such thing 
as "renaming" topics. It is possible to "copy" data with a tool like 
MirrorMaker, but that is a whole different story.





[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-13 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767814#comment-16767814
 ] 

huxihx commented on KAFKA-7794:
---

There is an inconsistency here: GetOffsetShell wants to seek to the largest 
offset before the given timestamp, but ListOffsetRequest retrieves the 
smallest offset after the timestamp (see the comments on the method 
`fetchOffsetByTimestamp` in Log.scala). That's why you get all the log start 
offsets if you specify a very old timestamp, but nothing when a future 
timestamp is given.

A naive solution is to correct the description of `--time` for GetOffsetShell :)
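
For illustration, the consumer API exposes the same ListOffset semantics; a 
small sketch (hypothetical topic, partition, and timestamp):
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// offsetsForTimes() returns the earliest offset whose timestamp is >= the
// requested one, or null when no such message exists -- which is why a
// future timestamp yields the empty "MY_TOPIC:8:" style output.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    TopicPartition tp = new TopicPartition("MY_TOPIC", 0);
    Map<TopicPartition, OffsetAndTimestamp> result =
        consumer.offsetsForTimes(Collections.singletonMap(tp, 1550000000000L));
    System.out.println(result.get(tp)); // null => no offset at/after that timestamp
}
{code}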

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, 
> image-2019-02-13-11-43-28-873.png, image-2019-02-13-11-44-18-736.png, 
> image-2019-02-13-11-45-21-459.png
>
>
> For some inputs for the timestamp (different from -1 or -2), GetOffsetShell 
> is not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see that 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  





[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767791#comment-16767791
 ] 

Matthias J. Sax commented on KAFKA-6460:


Internally, the DSL always uses the class `Stores` to create state stores. 
Thus, if one writes code like
{quote}{{stream.groupByKey().aggregate(...)}}
{quote}
and wants to test the aggregator by inspecting the store, one wants to inject 
a mocked store. However, to do this, the code must be rewritten to
{quote}{{stream.groupByKey().aggregate(..., 
Materialized.as(MockStoreFactory.mockedKeyValueStore(...));}}{quote}
This implies that production code and test code are not the same. For proper 
testing, it should be possible to test the original code directly, without 
rewriting it.

The current `TopologyTestDriver` also works that way – it takes an unmodified 
`Topology` that can either be handed to `KafkaStreams` to actually run it, or 
to the driver to test it. It's not necessary to rewrite the code that 
assembles the `Topology` in order to test it. For mocked stores, it should 
work the same way. Does that make sense?
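
For illustration, a minimal sketch of that workflow with the current 
test-utils API (hypothetical topic and store names; `builder` is the 
production {{StreamsBuilder}}):
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

// The very same Topology that would be handed to KafkaStreams in production
// is handed to the test driver instead -- no rewriting required.
Topology topology = builder.build();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
    ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
        "input-topic", new StringSerializer(), new StringSerializer());
    driver.pipeInput(factory.create("input-topic", "k", "1"));
    KeyValueStore<String, Long> store = driver.getKeyValueStore("agg-store");
    // assert on store.get("k") here
}
{code}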



[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767766#comment-16767766
 ] 

Guozhang Wang commented on KAFKA-6460:
--

[~mjsax] I'm not sure I can follow here, could you elaborate a bit more?



[jira] [Created] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers

2019-02-13 Thread Brian (JIRA)
Brian created KAFKA-7931:


 Summary: Java Client: if all ephemeral brokers fail, client can 
never reconnect to brokers
 Key: KAFKA-7931
 URL: https://issues.apache.org/jira/browse/KAFKA-7931
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.1.0
Reporter: Brian


Steps to reproduce:
 * Set up a Kafka cluster in GKE, with the bootstrap server address configured 
to point to a load balancer that exposes all GKE nodes
 * Run a producer that emits values into a partition with 3 replicas
 * Kill every broker in the cluster
 * Wait for the brokers to restart

Observed result:

The Java client cannot find any of the nodes even though they have all 
recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) could 
not be established. Broker may not be available.".

Note, this is *not* a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client version 
that contains the fix for https://issues.apache.org/jira/browse/KAFKA-7890.

Versions:

Kafka: version 2.1.0, using the confluentinc/cp-kafka:5.1.0 docker image

Client: trunk from a few days ago (git sha 
9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890

 





[jira] [Comment Edited] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767479#comment-16767479
 ] 

Guozhang Wang edited comment on KAFKA-6460 at 2/14/19 1:21 AM:
---

Hi [~shung], sorry for the late replies. Here are my thoughts (I'll also 
update the description of the ticket).

The main goal for these test utils is similar to KIP-267, i.e. to give users a 
smooth trial-and-error experience during their development cycles, where they 
do not necessarily need to bring up a full-fledged RocksDB instance, for 
example. So the scope of this task would be:

1) Provide a mock store interface that can be used for both 
{{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and 
{{Materialized#as(StoreSupplier>)}} (the DSL layer).

For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} 
inside the {{streams/test-utils}} artifact, which can generate either a 
{{KeyValueStoreBuilder}} / {{WindowStoreBuilder}} / {{SessionStoreBuilder}} 
(for the PAPI) or a {{StoreSupplier>}} (for the DSL), given a store name, and 
whose {{build}} and {{get}} calls will return a mock (this mock implementation 
only needs to be the inner-most store, i.e. it can still be wrapped with 
metered / caching / logging layers, and it should live inside the 
{{org.apache.kafka.streams.test.internals}} package so that it is not part of 
the public APIs).

2) The mock store implementations should expect it's {{init}} function to be 
called with a {{MockProcessorContext}} which includes recording all records 
forwarded via this context (e.g. changelogs). So the mock store implementation 
itself only need to keep track of its store function calls.

3) The mock {{StoreBuilder}} or {{StoreSupplier}}'s {{build}} / {{get}} calls 
are supposed to be only called once since they are expected to be used with 
{{TopologyTestDriver}} which does not create multiple topologies. Then users 
can get access the mocked store instance via {{TopologyTestDriver#getXXStore}} 
and then use their public APIs to query the store.

4) Additional to allow users to query the store directly, user's may want to 
also get how many function calls are triggered --  e.g. maybe the current store 
returns `2` for key `k`, but we also want to make sure it was because `put(k, 
1)` and `put(k, 2)` are called. This can be provided by a public API like 
{{MockStoreFactory#putEntries(storeName)}} that returns a list of entries that 
are called via {{put}}.

5) For Streams' own unit tests, we can then refactor them to use this new mock 
store factory. For example, we can remove the internal 
{{org.apache.kafka.streams.state.KeyValueStoreTestDriver}} and use the above to 
refactor any unit tests related to this class -- one logic that is not yet 
supported in {{TopologyTestDriver}} / {{MockStoreFactory}} is store 
restoration, i.e. streams library may wan to pipe-in some records to the 
corresponding changelog first before starting the test driver, which will then 
be used to bootstrap the (possibly mocked) stores. This is not of interest to 
users, but streams' own unit testing need to cover.

EDIT: regarding 5), one thing I realized that restoration logic should be 
considered to be tested in Streams' own unit testing, but not really user's 
unit testing focuses. For users own testing, if they want to pre-populate some 
stores they can do that today by first getting the stores from the 
{{TopologyTestDriver}}, and then use {{put}} calls to insert some data into the 
stores first, and then call {{driver.pipeInput}} to pipe in some data into 
source topics. So maybe we would not need to replace, for example 
{{KeyValueStoreTestDriver}} with {{MockStateFactory}}, but I still think we can 
get rid of this class with the {{MockProcessorContext}} who has functionalities 
to keep track of forward calls etc.
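
To make point 4) concrete, here is a minimal standalone sketch of the recording 
idea behind the proposed mock stores. It deliberately does not implement the 
real {{KeyValueStore}} interface (which has many more methods), and the class 
and method names are illustrative only, since none of this exists yet:

{code:java}
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecordingKeyValueStore<K, V> {
    private final Map<K, V> data = new HashMap<>();
    private final List<Map.Entry<K, V>> putCalls = new ArrayList<>();

    public void put(final K key, final V value) {
        putCalls.add(new AbstractMap.SimpleEntry<>(key, value)); // point 4: record every call
        data.put(key, value);                                    // point 2: the store function itself
    }

    public V get(final K key) {
        return data.get(key);
    }

    // What MockStoreFactory#putEntries(storeName) could return for this store.
    public List<Map.Entry<K, V>> putEntries() {
        return new ArrayList<>(putCalls);
    }
}
{code}

With this, `put(k, 1)` followed by `put(k, 2)` leaves `get(k) == 2`, while 
`putEntries()` still shows both calls -- exactly the verification described in 4).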



was (Author: guozhang):
Hi [~shung], sorry for the late replies. Here are my thoughts (I'll also update 
the description of the ticket).

The main goal for these test utils is similar to KIP-267, i.e. to give users a 
smooth trial-and-error experience during their development cycles, where they do 
not necessarily need to bring up a full-fledged RocksDB instance, for example. 
So the scope of this task would be:

1) Provide a mock store interface that can be used for both 
{{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and 
{{Materialized#as(StoreSupplier)}} (the DSL layer).

For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} 
inside the {{streams/test-utils}} artifact, which can generate a 
{{KeyValueStoreBuilder}}, {{WindowStoreBuilder}}, or {{SessionStoreBuilder}} 
(for PAPI) as well as a {{StoreSupplier}} (for DSL), given a store name, and 
whose {{build}} and {{get}} calls will return a mock (this mock implementation 
only needs to 

[jira] [Comment Edited] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767479#comment-16767479
 ] 

Guozhang Wang edited comment on KAFKA-6460 at 2/14/19 1:16 AM:
---

Hi [~shung], sorry for the late replies. Here are my thoughts (I'll also update 
the description of the ticket).

The main goal for these test utils is similar to KIP-267, i.e. to give users a 
smooth trial-and-error experience during their development cycles, where they do 
not necessarily need to bring up a full-fledged RocksDB instance, for example. 
So the scope of this task would be:

1) Provide a mock store interface that can be used for both 
{{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and 
{{Materialized#as(StoreSupplier)}} (the DSL layer).

For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} 
inside the {{streams/test-utils}} artifact, which can generate a 
{{KeyValueStoreBuilder}}, {{WindowStoreBuilder}}, or {{SessionStoreBuilder}} 
(for PAPI) as well as a {{StoreSupplier}} (for DSL), given a store name, and 
whose {{build}} and {{get}} calls will return a mock (this mock implementation 
only needs to be the inner-most store, i.e. it can still be wrapped with 
metered / caching / logging, and it should live inside the 
{{org.apache.kafka.streams.test.internals}} package so that it is not part of 
the public APIs).

2) The mock store implementations should expect their {{init}} function to be 
called with a {{MockProcessorContext}}, which records all records forwarded via 
this context (e.g. changelogs). So the mock store implementation itself only 
needs to keep track of its store function calls.

3) The mock {{StoreBuilder}}'s or {{StoreSupplier}}'s {{build}} / {{get}} calls 
are expected to be called only once, since they are meant to be used with 
{{TopologyTestDriver}}, which does not create multiple topologies. Users can 
then access the mocked store instance via {{TopologyTestDriver#getXXStore}} and 
use its public APIs to query the store.

4) In addition to allowing users to query the store directly, users may also 
want to know how many function calls were triggered -- e.g. maybe the current 
store returns `2` for key `k`, but we also want to make sure it was because 
`put(k, 1)` and `put(k, 2)` were called. This can be provided by a public API 
like {{MockStoreFactory#putEntries(storeName)}} that returns the list of 
entries recorded via {{put}}.

5) For Streams' own unit tests, we can then refactor them to use this new mock 
store factory. For example, we can remove the internal 
{{org.apache.kafka.streams.state.KeyValueStoreTestDriver}} and use the above to 
refactor any unit tests related to this class -- one piece of logic that is not 
yet supported in {{TopologyTestDriver}} / {{MockStoreFactory}} is store 
restoration, i.e. the Streams library may want to pipe some records into the 
corresponding changelog before starting the test driver, which will then be 
used to bootstrap the (possibly mocked) stores. This is not of interest to 
users, but Streams' own unit testing needs to cover it.



was (Author: guozhang):
Hi [~shung], sorry for the late replies. Here are my thoughts (I'll also update 
the description of the ticket).

The main goal for these test utils is similar to KIP-267, i.e. to give users a 
smooth trial-and-error experience during their development cycles, where they do 
not necessarily need to bring up a full-fledged RocksDB instance, for example. 
So the scope of this task would be:

1) Provide a mock store interface that can be used for both 
{{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and 
{{Materialized#as(StoreSupplier)}} (the DSL layer).

For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} 
inside the {{streams/test-utils}} artifact, which can generate a 
{{KeyValueStoreBuilder}}, {{WindowStoreBuilder}}, or {{SessionStoreBuilder}} 
(for PAPI) as well as a {{StoreSupplier}} (for DSL), given a store name, and 
whose {{build}} and {{get}} calls will return a mock (this mock implementation 
only needs to be the inner-most store, i.e. it can still be wrapped with 
metered / caching / logging, and it should live inside the 
{{org.apache.kafka.streams.test.internals}} package so that it is not part of 
the public APIs).

2) The mock store implementations should expect their {{init}} function to be 
called with a {{MockProcessorContext}}, which records all records forwarded via 
this context (e.g. changelogs). So the mock store implementation itself only 
needs to keep track of its store function calls.

3) The mock {{StoreBuilder}}'s or {{StoreSupplier}}'s {{build}} / {{get}} calls 
are expected to be called only once, since they are meant to be used with 
{{TopologyTestDriver}}, which does not create multiple 

[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767765#comment-16767765
 ] 

Matthias J. Sax commented on KAFKA-7918:


In the end it's a judgment call how much you put together into one PR -- PRs 
with more than 500 LOC get hard to review, and the turnaround time increases.

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  
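
As an illustration of the proposed change, a before/after sketch (store 
internals heavily simplified; this is not the real implementation):

{code:java}
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.kafka.common.utils.Bytes;

// Before (sketch): generically typed, even though in practice K = Bytes, V = byte[].
class InMemoryKeyValueStore<K, V> {
    private final NavigableMap<K, V> map = new TreeMap<>();
    void put(final K key, final V value) { map.put(key, value); }
    V get(final K key) { return map.get(key); }
}

// After (sketch): the actual types inlined, class renamed as suggested.
class InMemoryKeyValueBytesStore {
    private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
    void put(final Bytes key, final byte[] value) { map.put(key, value); }
    byte[] get(final Bytes key) { return map.get(key); }
}
{code}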





[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767760#comment-16767760
 ] 

Matthias J. Sax commented on KAFKA-7930:


Thanks for creating this ticket. I agree that the naming assumptions could be 
optional. However, why would you want to make topic deletion optional? The 
whole purpose of the tool is to delete internal topics. For the last point: I 
think it kind of makes sense; however, why would you have an input topic with a 
name pattern of `-someName-repartition` or 
`-someName-changelog`?
 
{quote}Faced this when trying to reset applications with GlobalKTable topics 
named *-changelog. Such topics are sometimes not desirable to delete.
{quote}
If it does not have the `-` prefix it won't be deleted. Can you 
elaborate?

> StreamsResetter makes "changelog" topic naming assumptions
> --
>
> Key: KAFKA-7930
> URL: https://issues.apache.org/jira/browse/KAFKA-7930
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Affects Versions: 2.1.0
>Reporter: Murad M
>Priority: Major
>  Labels: features, patch-available, usability
>
> StreamsResetter deletes the topics it considers internal. Currently it just 
> checks the naming as per 
> [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660]
>  (sketched below). If the assumption is wrong (either topic prefix or 
> suffix), the tool becomes useless if the user is aware of this, and even 
> dangerous if not. It would probably be better if either:
>  * the naming assumption were optional, with internal topics supplied via an 
> argument (--internal-topics)
>  * deletion were optional (--no-delete-internal)
>  * topics included in the --input-topics list were ignored
> Faced this when trying to reset applications with GlobalKTable topics named 
> *-changelog. Such topics are sometimes not desirable to delete.
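
The naming check the description links to boils down to a pattern test of 
roughly this shape (paraphrased, not the verbatim source; `applicationId` 
stands for the value of the --application-id argument):

{code:java}
// A topic is treated as internal purely by its name:
private boolean isInternalTopic(final String topicName) {
    return topicName.startsWith(applicationId + "-")
        && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
}
{code}

Any user-created topic that happens to match both the prefix and one of the 
suffixes is therefore treated as internal and deleted.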





[jira] [Updated] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7930:
---
Labels: features needs-kip patch-available usability  (was: features 
patch-available usability)

> StreamsResetter makes "changelog" topic naming assumptions
> --
>
> Key: KAFKA-7930
> URL: https://issues.apache.org/jira/browse/KAFKA-7930
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Affects Versions: 2.1.0
>Reporter: Murad M
>Priority: Major
>  Labels: features, needs-kip, patch-available, usability
>
> StreamsResetter deletes the topics it considers internal. Currently it just 
> checks the naming as per 
> [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
>  If the assumption is wrong (either topic prefix or suffix), the tool becomes 
> useless if the user is aware of this, and even dangerous if not. It would 
> probably be better if either:
>  * the naming assumption were optional, with internal topics supplied via an 
> argument (--internal-topics)
>  * deletion were optional (--no-delete-internal)
>  * topics included in the --input-topics list were ignored
> Faced this when trying to reset applications with GlobalKTable topics named 
> *-changelog. Such topics are sometimes not desirable to delete.





[jira] [Updated] (KAFKA-7928) Deprecate WindowStore.put(key, value)

2019-02-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7928:
---
Labels: beginner easy-fix needs-kip newbie  (was: needs-kip)

> Deprecate WindowStore.put(key, value)
> -
>
> Key: KAFKA-7928
> URL: https://issues.apache.org/jira/browse/KAFKA-7928
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, easy-fix, needs-kip, newbie
>
> Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)`
> This method is strange... A window store needs to have a timestamp associated 
> with the key, so if you do a put without a timestamp, it's up to the store to 
> just make one up.
> Even the javadoc on the method recommends not to use it, due to this 
> confusing behavior.
> We should just deprecate it.





[jira] [Commented] (KAFKA-7928) Deprecate WindowStore.put(key, value)

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767754#comment-16767754
 ] 

Matthias J. Sax commented on KAFKA-7928:


+1

> Deprecate WindowStore.put(key, value)
> -
>
> Key: KAFKA-7928
> URL: https://issues.apache.org/jira/browse/KAFKA-7928
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)`
> This method is strange... A window store needs to have a timestamp associated 
> with the key, so if you do a put without a timestamp, it's up to the store to 
> just make one up.
> Even the javadoc on the method recommends not to use it, due to this 
> confusing behavior.
> We should just deprecate it.





[jira] [Updated] (KAFKA-7928) Deprecate WindowStore.put(key, value)

2019-02-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7928:
---
Labels: needs-kip  (was: )

> Deprecate WindowStore.put(key, value)
> -
>
> Key: KAFKA-7928
> URL: https://issues.apache.org/jira/browse/KAFKA-7928
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)`
> This method is strange... A window store needs to have a timestamp associated 
> with the key, so if you do a put without a timestamp, it's up to the store to 
> just make one up.
> Even the javadoc on the method recommends not to use it, due to this 
> confusing behavior.
> We should just deprecate it.





[jira] [Updated] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Murad M updated KAFKA-7930:
---
Labels: features patch-available usability  (was: features usability)

> StreamsResetter makes "changelog" topic naming assumptions
> --
>
> Key: KAFKA-7930
> URL: https://issues.apache.org/jira/browse/KAFKA-7930
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Affects Versions: 2.1.0
>Reporter: Murad M
>Priority: Major
>  Labels: features, patch-available, usability
>
> StreamsResetter deletes the topics it considers internal. Currently it just 
> checks the naming as per 
> [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
>  If the assumption is wrong (either topic prefix or suffix), the tool becomes 
> useless if the user is aware of this, and even dangerous if not. It would 
> probably be better if either:
>  * the naming assumption were optional, with internal topics supplied via an 
> argument (--internal-topics)
>  * deletion were optional (--no-delete-internal)
>  * topics included in the --input-topics list were ignored
> Faced this when trying to reset applications with GlobalKTable topics named 
> *-changelog. Such topics are sometimes not desirable to delete.





[jira] [Commented] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767751#comment-16767751
 ] 

Murad M commented on KAFKA-7930:


Patch provided: https://github.com/muradm/kafka/pull/1

> StreamsResetter makes "changelog" topic naming assumptions
> --
>
> Key: KAFKA-7930
> URL: https://issues.apache.org/jira/browse/KAFKA-7930
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Affects Versions: 2.1.0
>Reporter: Murad M
>Priority: Major
>  Labels: features, usability
>
> StreamsResetter deletes the topics it considers internal. Currently it just 
> checks the naming as per 
> [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
>  If the assumption is wrong (either topic prefix or suffix), the tool becomes 
> useless if the user is aware of this, and even dangerous if not. It would 
> probably be better if either:
>  * the naming assumption were optional, with internal topics supplied via an 
> argument (--internal-topics)
>  * deletion were optional (--no-delete-internal)
>  * topics included in the --input-topics list were ignored
> Faced this when trying to reset applications with GlobalKTable topics named 
> *-changelog. Such topics are sometimes not desirable to delete.





[jira] [Updated] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Murad M updated KAFKA-7930:
---
Description: 
StreamsResetter deletes the topics it considers internal. Currently it just 
checks the naming as per 
[code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
 If the assumption is wrong (either topic prefix or suffix), the tool becomes 
useless if the user is aware of this, and even dangerous if not. It would 
probably be better if either:
 * the naming assumption were optional, with internal topics supplied via an 
argument (--internal-topics)
 * deletion were optional (--no-delete-internal)
 * topics included in the --input-topics list were ignored

Faced this when trying to reset applications with GlobalKTable topics named 
*-changelog. Such topics are sometimes not desirable to delete.

  was:
StreamsResetter deletes the topics it considers internal. Currently it just 
checks the naming as per 
[code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
 If the assumption is wrong (either topic prefix or suffix), the tool becomes 
useless if the user is aware of this, and even dangerous if not. It would 
probably be better if either:
 * the naming assumption were optional, with internal topics supplied via an 
argument (--internal-topics)
 * deletion were optional (--no-delete-internal)

Faced this when trying to reset applications with GlobalKTable topics named 
*-changelog. Such topics are sometimes not desirable to delete.


> StreamsResetter makes "changelog" topic naming assumptions
> --
>
> Key: KAFKA-7930
> URL: https://issues.apache.org/jira/browse/KAFKA-7930
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Affects Versions: 2.1.0
>Reporter: Murad M
>Priority: Major
>  Labels: features, usability
>
> StreamsResetter deletes the topics it considers internal. Currently it just 
> checks the naming as per 
> [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
>  If the assumption is wrong (either topic prefix or suffix), the tool becomes 
> useless if the user is aware of this, and even dangerous if not. It would 
> probably be better if either:
>  * the naming assumption were optional, with internal topics supplied via an 
> argument (--internal-topics)
>  * deletion were optional (--no-delete-internal)
>  * topics included in the --input-topics list were ignored
> Faced this when trying to reset applications with GlobalKTable topics named 
> *-changelog. Such topics are sometimes not desirable to delete.





[jira] [Updated] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Murad M updated KAFKA-7930:
---
Description: 
StreamsResetter deletes the topics it considers internal. Currently it just 
checks the naming as per 
[code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
 If the assumption is wrong (either topic prefix or suffix), the tool becomes 
useless if the user is aware of this, and even dangerous if not. It would 
probably be better if either:
 * the naming assumption were optional, with internal topics supplied via an 
argument (--internal-topics)
 * deletion were optional (--no-delete-internal)

Faced this when trying to reset applications with GlobalKTable topics named 
*-changelog. Such topics are sometimes not desirable to delete.

  was:
StreamsResetter deletes the topics it considers internal. Currently it just 
checks the naming as per 
[code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
 If the assumption is wrong (either topic prefix or suffix), the tool becomes 
useless if the user is aware of this, and even dangerous if not. It would 
probably be better if either:
 * the naming assumption were optional, with internal topics supplied via an 
argument (--internal-topics)
 * deletion were optional (--no-delete-internal)


> StreamsResetter makes "changelog" topic naming assumptions
> --
>
> Key: KAFKA-7930
> URL: https://issues.apache.org/jira/browse/KAFKA-7930
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Affects Versions: 2.1.0
>Reporter: Murad M
>Priority: Major
>  Labels: features, usability
>
> StreamsResetter deletes the topics it considers internal. Currently it just 
> checks the naming as per 
> [code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
>  If the assumption is wrong (either topic prefix or suffix), the tool becomes 
> useless if the user is aware of this, and even dangerous if not. It would 
> probably be better if either:
>  * the naming assumption were optional, with internal topics supplied via an 
> argument (--internal-topics)
>  * deletion were optional (--no-delete-internal)
> Faced this when trying to reset applications with GlobalKTable topics named 
> *-changelog. Such topics are sometimes not desirable to delete.





[jira] [Created] (KAFKA-7930) StreamsResetter makes "changelog" topic naming assumptions

2019-02-13 Thread Murad M (JIRA)
Murad M created KAFKA-7930:
--

 Summary: StreamsResetter makes "changelog" topic naming assumptions
 Key: KAFKA-7930
 URL: https://issues.apache.org/jira/browse/KAFKA-7930
 Project: Kafka
  Issue Type: Improvement
  Components: streams, tools
Affects Versions: 2.1.0
Reporter: Murad M


StreamsResetter deletes the topics it considers internal. Currently it just 
checks the naming as per 
[code|https://github.com/apache/kafka/blob/1aae604861068bb7337d4972c9dcc0c0a99c374d/core/src/main/scala/kafka/tools/StreamsResetter.java#L660].
 If the assumption is wrong (either topic prefix or suffix), the tool becomes 
useless if the user is aware of this, and even dangerous if not. It would 
probably be better if either:
 * the naming assumption were optional, with internal topics supplied via an 
argument (--internal-topics)
 * deletion were optional (--no-delete-internal)





[jira] [Created] (KAFKA-7929) RocksDB Window Store allows segments to expire out from under iterator

2019-02-13 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-7929:
--

 Summary: RocksDB Window Store allows segments to expire out from 
under iterator
 Key: KAFKA-7929
 URL: https://issues.apache.org/jira/browse/KAFKA-7929
 Project: Kafka
  Issue Type: Bug
Reporter: Sophie Blee-Goldman


While we provide no guarantees about returning a snapshot when fetching from 
persistent window stores, we should at least not allow old segments to expire 
while an iterator over them remains open. This can result in unexpected 
behavior as the number of records returned depends on how quickly the results 
are read from an iterator, and you might even end up reading records with a gap 
in the middle.

 

For example, you might fetch records between t1 and t3, then immediately read 
the first record (t1) and do some processing. If enough time passes before you 
read the second record from the iterator, record t2 may have expired, so the 
next record you read is t3. You would therefore conclude there were records at 
t1 and t3 but nothing at t2, which is incorrect.
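
A sketch of that interleaving using the existing fetch API; the store is 
assumed to be a persistent, segmented window store, and the names and timings 
are illustrative:

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

class IteratorExpirySketch {
    static void readWithGap(final WindowStore<String, Long> store,
                            final long t1, final long t3) throws InterruptedException {
        try (final WindowStoreIterator<Long> iter = store.fetch("key", t1, t3)) {
            final KeyValue<Long, Long> first = iter.next(); // record at t1
            Thread.sleep(60_000);                           // slow processing; segments may expire meanwhile
            final KeyValue<Long, Long> next = iter.next();  // may now return t3: t2 expired under the iterator
        }
    }
}
{code}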





[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767663#comment-16767663
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


Makes sense, I will consider the caching layer to be outside the immediate 
scope for now. Does it make sense to break up the PR into one for each type of 
store (i.e. kv, window, session)? The implementation changes will be rather 
minor, but the testing framework requires some overhaul since <Bytes, byte[]> 
is hardcoded into all the unit tests... (still relatively minor, but certainly 
many, many lines of code)

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  





[jira] [Created] (KAFKA-7928) Deprecate WindowStore.put(key, value)

2019-02-13 Thread John Roesler (JIRA)
John Roesler created KAFKA-7928:
---

 Summary: Deprecate WindowStore.put(key, value)
 Key: KAFKA-7928
 URL: https://issues.apache.org/jira/browse/KAFKA-7928
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)`

This method is strange... A window store needs to have a timestamp associated 
with the key, so if you do a put without a timestamp, it's up to the store to 
just make one up.

Even the javadoc on the method recommends not to use it, due to this confusing 
behavior.

We should just deprecate it.
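
For illustration, the two variants side by side; the explicit-timestamp 
overload already exists, so callers have a direct replacement (variable names 
are placeholders):

{code:java}
import org.apache.kafka.streams.state.WindowStore;

class PutVariantsSketch {
    static void example(final WindowStore<String, Long> windowStore, final long windowStartMs) {
        windowStore.put("k", 1L);                // timestamp left for the store to make up -- to be deprecated
        windowStore.put("k", 1L, windowStartMs); // explicit window start timestamp -- the preferred overload
    }
}
{code}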





[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767623#comment-16767623
 ] 

Matthias J. Sax commented on KAFKA-7918:


In general I think yes. However, it implies a slightly bigger refactoring. Atm, 
the caching layer is responsible for executing the callback on eviction, which 
we effectively use to forward messages to downstream processors (that's why the 
caching layer deserializes the bytes: downstream processors expect objects, not 
bytes -- the callback is defined on types, not bytes).

However, the current behavior is not a nice separation of concerns, because we 
put bytes into the caching store, and thus it seems reasonable to get bytes 
back in the caching callback (which is what we would need to do to refactor the 
stores and get rid of the generics). This implies that the metered store would 
need to take a custom callback that expects objects, and wrap it with a 
byte-based callback that is registered on the caching store. The 
deserialization must now happen in the wrapper, so as not to break the public 
API (i.e., we move deserialization from the caching layer up to the metered 
store for the callbacks). Does this make sense? If you want to tackle this, 
feel free to do so.

In any case, it's a lot of refactoring and thus, please do multiple smaller PRs 
instead of one gigantic PR to simplify reviewing :) Thanks.
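
A sketch of the wrapping being described, with illustrative interface names 
({{TypedFlushListener}} and {{BytesFlushListener}} are not the real internal 
types; {{StateSerdes}} is the existing serde holder):

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.StateSerdes;

interface TypedFlushListener<K, V> { void apply(K key, V newValue, V oldValue); }
interface BytesFlushListener { void apply(Bytes key, byte[] newValue, byte[] oldValue); }

class MeteredLayerSketch<K, V> {
    private final StateSerdes<K, V> serdes;

    MeteredLayerSketch(final StateSerdes<K, V> serdes) {
        this.serdes = serdes;
    }

    // The metered layer accepts the typed callback and hands the caching layer
    // a byte-based one; deserialization now happens here, not in the cache.
    BytesFlushListener wrap(final TypedFlushListener<K, V> typed) {
        return (rawKey, rawNew, rawOld) -> typed.apply(
            serdes.keyFrom(rawKey.get()),
            rawNew == null ? null : serdes.valueFrom(rawNew),
            rawOld == null ? null : serdes.valueFrom(rawOld));
    }
}
{code}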

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  





[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767611#comment-16767611
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


In light of the decision to close KAFKA-7917 should we include the three 
CachingXXStore layers in this as well?

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  





[jira] [Commented] (KAFKA-7758) When Naming a Repartition Topic with Aggregations Reuse Repartition Graph Node for Multiple Operations

2019-02-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767603#comment-16767603
 ] 

ASF GitHub Bot commented on KAFKA-7758:
---

bbejeck commented on pull request #6265: KAFKA-7758: Reuse 
KGroupedStream/KGroupedTable with named repartition topics
URL: https://github.com/apache/kafka/pull/6265
 
 
   This PR adds support for re-using a `KGroupedStream` or `KGroupedTable` 
object after executing an aggregation operation with a named repartition topic.
   
   `KGroupedStream` example:
   ```java
   final KGroupedStream<String, String> kGroupedStream =
       builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
   kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one");
   kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
   ```
   `KGroupedTable` example:
   ```java
   final KGroupedTable<String, String> kGroupedTable =
       builder.<String, String>table("topic").groupBy(KeyValue::pair, Grouped.as("grouping"));
   kGroupedTable.count().toStream().to("output-count");
   kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> v2).toStream().to("output-reduce");
   ```
   This approach will not cause any compatibility issues, for two reasons:
   
   1. Aggregations requiring repartitioning without naming the repartition 
topic maintain the same topology structure, which is the default mode today.  
So by not reusing the repartition graph node, the numbering and structure of 
the topology remain the same.
   2. For aggregations where the repartition topic _*is*_ named, it is not 
currently possible to re-use either the `KGroupedStream` or `KGroupedTable` 
object, as Kafka Streams throws an `InvalidTopologyException` when building the 
topology. Hence you can't even deploy the application.
   
   I've added unit tests for each case and ran our existing suite of streams 
tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> When Naming a Repartition Topic with Aggregations Reuse Repartition Graph 
> Node for Multiple Operations
> --
>
> Key: KAFKA-7758
> URL: https://issues.apache.org/jira/browse/KAFKA-7758
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.2.0
>
>
> When performing aggregations that require repartitioning, where the 
> repartition topic name is specified, and using the resulting 
> {{KGroupedStream}} for multiple operations, i.e.
>  
> {code:java}
> final KGroupedStream<String, String> kGroupedStream = builder.<String, 
> String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
> kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
> kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
> {code}
> If optimizations aren't enabled, Streams will attempt to build two 
> repartition topics with the same name, resulting in a failure when building 
> the topology.
>  
> However, we have enough information to re-use the existing repartition node 
> via the graph nodes used for building the intermediate representation of the 
> topology. This ticket will make the 
> behavior of reusing a {{KGroupedStream}} consistent regardless of whether 
> optimizations are turned on or not.





[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767580#comment-16767580
 ] 

Guozhang Wang commented on KAFKA-7918:
--

+1. [~ableegoldman] Thanks for looking into it.

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  





[jira] [Closed] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-13 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler closed KAFKA-7917.
---

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)
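
A sketch of the proposed shape versus today's wrapping, with all class names 
hypothetical:

{code:java}
// Hypothetical single management layer with aspect switches:
class UnifiedStoreLayer {
    interface InnerStore { void put(byte[] key, byte[] value); }

    private final InnerStore inner;       // the actual bytes store
    private final boolean caching;        // switches instead of wrapper classes
    private final boolean changeLogging;

    UnifiedStoreLayer(final InnerStore inner, final boolean caching, final boolean changeLogging) {
        this.inner = inner;
        this.caching = caching;
        this.changeLogging = changeLogging;
    }

    void put(final byte[] key, final byte[] value) {
        // metering would always happen here
        if (caching) { /* buffer in the cache and defer the write */ }
        if (changeLogging) { /* send the update to the changelog topic */ }
        inner.put(key, value);
    }
}
{code}

Today the same booleans instead decide which wrapper classes (metered, 
change-logging, caching) get layered around the inner store.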





[jira] [Resolved] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-13 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7917.
-
Resolution: Won't Fix

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)





[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-13 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767574#comment-16767574
 ] 

John Roesler commented on KAFKA-7917:
-

After messing with the code for a while, I'm not happy with the trade-offs.

Some parts combine cleanly, but others are tightly coupled with different 
components of Streams, and smashing them all together creates more of a mess.

I'm going to close this ticket, and we'll just take this one step at a time by 
seeing how the codebase looks after 
https://issues.apache.org/jira/browse/KAFKA-7918

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)





[jira] [Commented] (KAFKA-7921) Instable KafkaStreamsTest

2019-02-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767548#comment-16767548
 ] 

ASF GitHub Bot commented on KAFKA-7921:
---

guozhangwang commented on pull request #6262: KAFKA-7921: log at error level 
for missing source topic
URL: https://github.com/apache/kafka/pull/6262
 
 
   
 



> Instable KafkaStreamsTest
> -
>
> Key: KAFKA-7921
> URL: https://issues.apache.org/jira/browse/KAFKA-7921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>
> {{KafkaStreamsTest}} failed multiple times, e.g.,
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556){quote}
> or
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255){quote}
>  
> The preserved logs are as follows:
> {quote}[2019-02-12 07:02:17,198] INFO Kafka version: 2.3.0-SNAPSHOT 
> (org.apache.kafka.common.utils.AppInfoParser:109)
> [2019-02-12 07:02:17,198] INFO Kafka commitId: 08036fa4b1e5b822 
> (org.apache.kafka.common.utils.AppInfoParser:110)
> [2019-02-12 07:02:17,199] INFO stream-client [clientId] State transition from 
> CREATED to REBALANCING (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-client [clientId] State transition from 
> REBALANCING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-238] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Revoking 
> previously assigned partitions [] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Revoking 
> previously assigned partitions [] 
> 

[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767517#comment-16767517
 ] 

Matthias J. Sax commented on KAFKA-6460:


[~guozhang] I think it would also be important to provide a simple way to 
configure the store factory that is used (e.g., for DSL users) without the need 
to rewrite the code that should be tested. Thoughts?

> Add mocks for state stores used in Streams unit testing
> ---
>
> Key: KAFKA-6460
> URL: https://issues.apache.org/jira/browse/KAFKA-6460
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: newbie++
>
> We'd like to use mocks for different types of state stores: kv, window, and 
> session, that can be used to record the number of expected put / get calls 
> in DSL operator unit testing. This involves implementing the two 
> interfaces {{StoreSupplier}} and {{StoreBuilder}} so that they can return an 
> object created from, say, EasyMock, and the object can then be set up with 
> the expected calls.
> In addition, we should also add a mock record collector which can be returned 
> from the mock processor context so that, with a logging-enabled store, users 
> can also validate whether the changes have been forwarded to the changelog.





[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767479#comment-16767479
 ] 

Guozhang Wang commented on KAFKA-6460:
--

Hi [~shung], sorry for the late replies. Here are my thoughts (I'll also update 
the description of the ticket).

The main goal for these test utils is similar to KIP-267, i.e. to give users a 
smooth trial-and-error experience during their development cycles, where they do 
not necessarily need to bring up a full-fledged RocksDB instance, for example. 
So the scope of this task would be:

1) Provide a mock store interface that can be used for both 
{{Topology#addStateStore(StoreBuilder)}} (i.e. at the PAPI layer) and 
{{Materialized#as(StoreSupplier)}} (the DSL layer).

For this, I'd propose we add {{org.apache.kafka.streams.test.MockStoreFactory}} 
inside the {{streams/test-utils}} artifact, which can generate a 
{{KeyValueStoreBuilder}}, {{WindowStoreBuilder}}, or {{SessionStoreBuilder}} 
(for PAPI) as well as a {{StoreSupplier}} (for DSL), given a store name, and 
whose {{build}} and {{get}} calls will return a mock (this mock implementation 
only needs to be the inner-most store, i.e. it can still be wrapped with 
metered / caching / logging, and it should live inside the 
{{org.apache.kafka.streams.test.internals}} package so that it is not part of 
the public APIs).

2) The mock store implementations should expect their {{init}} function to be 
called with a {{MockProcessorContext}}, which records all records forwarded via 
this context (e.g. changelogs). So the mock store implementation itself only 
needs to keep track of its store function calls.

3) The mock {{StoreBuilder}}'s or {{StoreSupplier}}'s {{build}} / {{get}} calls 
are expected to be called only once, since they are meant to be used with 
{{TopologyTestDriver}}, which does not create multiple topologies. Users can 
then access the mocked store instance via {{TopologyTestDriver#getXXStore}} and 
use its public APIs to query the store.

4) In addition to allowing users to query the store directly, users may also 
want to know how many function calls were triggered -- e.g. maybe the current 
store returns `2` for key `k`, but we also want to make sure it was because 
`put(k, 1)` and `put(k, 2)` were called. This can be provided by a public API 
like {{MockStoreFactory#putEntries(storeName)}} that returns the list of 
entries recorded via {{put}}.

5) For Streams' own unit tests, we can then refactor them to use this new mock 
store factory. For example, we can remove the internal 
{{org.apache.kafka.streams.state.KeyValueStoreTestDriver}} and use the above to 
refactor any unit tests related to this class -- one piece of logic that is not 
yet supported in {{TopologyTestDriver}} / {{MockStoreFactory}} is store 
restoration, i.e. the Streams library may want to pipe some records into the 
corresponding changelog before starting the test driver, which will then be 
used to bootstrap the (possibly mocked) stores. This is not of interest to 
users, but Streams' own unit testing needs to cover it.


> Add mocks for state stores used in Streams unit testing
> ---
>
> Key: KAFKA-6460
> URL: https://issues.apache.org/jira/browse/KAFKA-6460
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: newbie++
>
> We'd like to use mocks for different types of state stores: kv, window, and 
> session, that can be used to record the number of expected put / get calls 
> in DSL operator unit testing. This involves implementing the two 
> interfaces {{StoreSupplier}} and {{StoreBuilder}} so that they can return an 
> object created from, say, EasyMock, and the object can then be set up with 
> the expected calls.
> In addition, we should also add a mock record collector which can be returned 
> from the mock processor context so that, with a logging-enabled store, users 
> can also validate whether the changes have been forwarded to the changelog.





[jira] [Commented] (KAFKA-7565) NPE in KafkaConsumer

2019-02-13 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767474#comment-16767474
 ] 

Rajini Sivaram commented on KAFKA-7565:
---

[~hachikuji] I am not sure if this is covered by your PR for KAFKA-7831. 

We have a check for whether the partitions returned in a fetch response are 
still in the fetch session and we return if that is not the case: 
https://github.com/apache/kafka/blob/b02b5b63a5d43ab552c6cec1237707b2edd1bb36/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L234

Further down in the code, we assume that the partitions do exist in the session 
and access the partitions without checking for null: 
https://github.com/apache/kafka/blob/b02b5b63a5d43ab552c6cec1237707b2edd1bb36/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L243

Presumably the fetch session partitions can change between lines 234 and 243 
since the response could be processed on the heartbeat thread. With the changes 
from your PR for KAFKA-7831, can we guarantee that the session partitions won't 
change?
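
To make the suspected race concrete, here is a minimal self-contained sketch 
of the check-then-act pattern (the map stands in for the fetch session's 
partition data; this is NOT the actual Fetcher code):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the race: the membership check and the later access happen at
// different times, and another thread may mutate the session in between.
public class CheckThenActRace {
    private static final Map<String, Long> sessionPartitions = new ConcurrentHashMap<>();

    static void process(String tp) {
        if (!sessionPartitions.containsKey(tp)) {
            return; // corresponds to the early return around line 234
        }
        // Another thread (e.g. the heartbeat thread) may remove tp right here.
        Long offset = sessionPartitions.get(tp); // corresponds to the access around line 243
        if (offset == null) {
            return; // without this null check, the unboxing below would throw an NPE
        }
        System.out.println(tp + " -> " + offset.longValue());
    }

    public static void main(String[] args) {
        sessionPartitions.put("foo-0", 0L);
        process("foo-0");
    }
}
{code}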

> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
> Fix For: 2.2.0
>
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> Couldn't find a minimal reproducer, but it happens quite often in our system. 
> We use {{pause()}} and {{wakeup()}} methods quite extensively, maybe it is 
> somehow related.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767446#comment-16767446
 ] 

Matthias J. Sax commented on KAFKA-7918:


SGTM.

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  
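
For illustration, a minimal sketch of the proposed inlining (class names are 
hypothetical, and `ByteBuffer` stands in for Kafka's `Bytes` wrapper to keep 
the snippet self-contained):

{code:java}
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Before: generically typed, even though every instantiation uses bytes.
class GenericInMemoryStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    V get(K key) { return map.get(key); }
    void put(K key, V value) { map.put(key, value); }
}

// After: the key/value types are inlined and the name says what the store is.
class InMemoryKeyValueBytesStoreSketch {
    private final Map<ByteBuffer, byte[]> map = new HashMap<>();
    byte[] get(ByteBuffer key) { return map.get(key); }
    void put(ByteBuffer key, byte[] value) { map.put(key, value); }

    public static void main(String[] args) {
        InMemoryKeyValueBytesStoreSketch store = new InMemoryKeyValueBytesStoreSketch();
        store.put(ByteBuffer.wrap(new byte[]{1}), new byte[]{42});
        System.out.println(store.get(ByteBuffer.wrap(new byte[]{1}))[0]); // 42
    }
}
{code}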



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767458#comment-16767458
 ] 

Matthias J. Sax commented on KAFKA-7304:


Thanks [~rsivaram] and [~yuyang08]!

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Major
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, 
> Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 
> PM.png
>
>
> We are testing secured writing to kafka through ssl. Testing at small scale, 
> ssl writing to kafka was fine. However, when we enabled ssl writing at a 
> larger scale (>40k clients write concurrently), the kafka brokers soon hit 
> OutOfMemory issue with 4G memory setting. We have tried with increasing the 
> heap size to 10Gb, but encountered the same issue. 
> We took a few heap dumps, and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects. There 
> are two channel map fields in Selector. It seems that somehow the objects 
> are not deleted from the maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If there is a broker restart etc. in the cluster that causes 
> a partition leadership change, the brokers may hit the OOM issue faster. 
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
> Please see the attached images and the following link for sample gc 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use java 1.8.0_102, and have applied a TLS patch reducing the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2019-02-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7481:
---
Fix Version/s: (was: 2.2.0)

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2019-02-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7481:
---
Priority: Critical  (was: Blocker)

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767449#comment-16767449
 ] 

Matthias J. Sax commented on KAFKA-7882:


Thanks for confirming. Then you are hitting 
https://issues.apache.org/jira/browse/KAFKA-7250 (it's fixed in 2.0.1 and 
2.1.0), and the provided transformer is indeed shared, which is the root cause 
of the issue. It seems we can close this as a duplicate?
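
For reference, a minimal Java sketch of the pattern that avoids the sharing 
(types here are illustrative only): the supplier returns a new Transformer per 
{{get()}} call, so each stream task gets its own instance:

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: a fresh Transformer per get() call means per-task state (like the
// ProcessorContext) is never shared across tasks/threads. Wrapping one
// pre-built Transformer instance -- as the 2.0.0 Scala wrapper above
// effectively does -- shares that state, which is the problem discussed here.
public class PerTaskTransformerSupplier
        implements TransformerSupplier<String, String, KeyValue<String, String>> {

    @Override
    public Transformer<String, String, KeyValue<String, String>> get() {
        return new Transformer<String, String, KeyValue<String, String>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                return KeyValue.pair(key, value);
            }

            @Override
            public void close() {}
        };
    }
}
{code}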

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches an aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on 
> KTable in the future, but my implementation is quite simple and imo should 
> work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1, even if I'm just saving to the store and 
> have turned off punctuation.
> If punctuation is present, however, I sometimes get -1 as the partition value 
> in the transform method. I'm familiar with the basic docs; however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them retention 
> time as long as possible for the debugging stage. I tried to hotfix this with 
> a retry in the transform method (the state store reopens eventually and the 
> `put` method works), but this approach is questionable and I am not sure it 
> actually works.
> Edit:
> May this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a 
> low value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767443#comment-16767443
 ] 

Matthias J. Sax commented on KAFKA-7917:


> but simplifying the code base.

Having a layered code base and nested stores seems simpler to me than 
thousands of if-else statements in the code... (it's very subjective of 
course). Looking forward to your PR.

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)
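
An illustration-only sketch of what "one layer with boolean switches" could 
look like (names are hypothetical and the layer bodies are stubbed; this is 
not the actual Streams code):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch: instead of wrapping an inner store as
// metered(changeLogging(caching(inner))), one class carries boolean switches.
class UnifiedStoreSketch {
    private final Map<String, byte[]> inner = new HashMap<>();
    private final boolean metered;
    private final boolean changeLogging;
    private final boolean caching;

    UnifiedStoreSketch(boolean metered, boolean changeLogging, boolean caching) {
        this.metered = metered;
        this.changeLogging = changeLogging;
        this.caching = caching;
    }

    void put(String key, byte[] value) {
        long start = metered ? System.nanoTime() : 0L;
        if (caching) {
            // would update the cache here, instead of a wrapper layer doing it
        }
        if (changeLogging) {
            // would append (key, value) to the changelog topic here
        }
        inner.put(key, value);
        if (metered) {
            System.out.println("put took " + (System.nanoTime() - start) + "ns");
        }
    }

    public static void main(String[] args) {
        new UnifiedStoreSketch(true, true, false).put("k", new byte[]{1});
    }
}
{code}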



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-02-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7652:
---
Labels: kip  (was: )

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
> Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> experience a severe performance degradation. The highest amount of CPU time 
> seems to be spent retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation reveals that it appears we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached logs of the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-02-13 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7652:
---
Description: 
I'm creating this issue in response to [~guozhang]'s request on the mailing 
list:

[https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]

We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
experience a severe performance degradation. The highest amount of CPU time 
seems to be spent retrieving from the local cache. Here's an example thread 
profile with 0.11.0.0:

[https://i.imgur.com/l5VEsC2.png]

When things are running smoothly we're gated by retrieving from the state store 
with acceptable performance. Here's an example thread profile with 0.10.2.1:

[https://i.imgur.com/IHxC2cZ.png]

Some investigation reveals that it appears we're performing about 3 orders of 
magnitude more lookups on the NamedCache over a comparable time period. I've 
attached logs of the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.

We're using session windows and have the app configured for commit.interval.ms 
= 30 * 1000 and cache.max.bytes.buffering = 10485760

I'm happy to share more details if they would be helpful. Also happy to run 
tests on our data.

I also found this issue, which seems like it may be related:

https://issues.apache.org/jira/browse/KAFKA-4904

 

KIP-420: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
 

 

  was:
I'm creating this issue in response to [~guozhang]'s request on the mailing 
list:

[https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]

We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
experience a severe performance degradation. The highest amount of CPU time 
seems to be spent retrieving from the local cache. Here's an example thread 
profile with 0.11.0.0:

[https://i.imgur.com/l5VEsC2.png]

When things are running smoothly we're gated by retrieving from the state store 
with acceptable performance. Here's an example thread profile with 0.10.2.1:

[https://i.imgur.com/IHxC2cZ.png]

Some investigation reveals that it appears we're performing about 3 orders of 
magnitude more lookups on the NamedCache over a comparable time period. I've 
attached logs of the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.

We're using session windows and have the app configured for commit.interval.ms 
= 30 * 1000 and cache.max.bytes.buffering = 10485760

I'm happy to share more details if they would be helpful. Also happy to run 
tests on our data.

I also found this issue, which seems like it may be related:

https://issues.apache.org/jira/browse/KAFKA-4904

 


> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
> Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> experience a severe performance degradation. The highest amount of CPU time 
> seems to be spent retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation reveals that it appears we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached logs of the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  



--
This message was sent by Atlassian JIRA

[jira] [Updated] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1

2019-02-13 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7920:
---
Fix Version/s: 2.2.0

> Do not permit zstd use until inter.broker.protocol.version is updated to 2.1
> 
>
> Key: KAFKA-7920
> URL: https://issues.apache.org/jira/browse/KAFKA-7920
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Jason Gustafson
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.2.0
>
>
> After brokers have been upgraded to 2.1, users can begin using zstd 
> compression. Regardless of the inter.broker.protocol.version, the broker will 
> happily accept zstd-compressed data as long as the right produce request 
> version is used. However, if the inter.broker.protocol.version is set to 2.0 
> or below, then followers will not be able to use the minimum required fetch 
> version, which will result in the following error:
> {code}
> [2019-02-11 17:42:47,116] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition foo-0 at offset 0 
> (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.UnsupportedCompressionTypeException: The 
> requesting client does not support the compression type of given partition.
> {code}
> We should make produce request validation consistent. Until the 
> inter.broker.protocol.version is at 2.1 or later, we should reject produce 
> requests with UNSUPPORTED_COMPRESSION_TYPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1

2019-02-13 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7920:
---
Priority: Blocker  (was: Major)

> Do not permit zstd use until inter.broker.protocol.version is updated to 2.1
> 
>
> Key: KAFKA-7920
> URL: https://issues.apache.org/jira/browse/KAFKA-7920
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Jason Gustafson
>Assignee: Lee Dongjin
>Priority: Blocker
> Fix For: 2.2.0
>
>
> After brokers have been upgraded to 2.1, users can begin using zstd 
> compression. Regardless of the inter.broker.protocol.version, the broker will 
> happily accept zstd-compressed data as long as the right produce request 
> version is used. However, if the inter.broker.protocol.version is set to 2.0 
> or below, then followers will not be able to use the minimum required fetch 
> version, which will result in the following error:
> {code}
> [2019-02-11 17:42:47,116] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition foo-0 at offset 0 
> (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.UnsupportedCompressionTypeException: The 
> requesting client does not support the compression type of given partition.
> {code}
> We should make produce request validation consistent. Until the 
> inter.broker.protocol.version is at 2.1 or later, we should reject produce 
> requests with UNSUPPORTED_COMPRESSION_TYPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7440) Use leader epoch in consumer fetch requests

2019-02-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767409#comment-16767409
 ] 

ASF GitHub Bot commented on KAFKA-7440:
---

hachikuji commented on pull request #6190: KAFKA-7440 Add leader epoch to fetch 
and list-offset request
URL: https://github.com/apache/kafka/pull/6190
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use leader epoch in consumer fetch requests
> ---
>
> Key: KAFKA-7440
> URL: https://issues.apache.org/jira/browse/KAFKA-7440
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: David Arthur
>Priority: Major
>  Labels: kip
>
> This patch adds support in the consumer to use the leader epoch obtained from 
> the metadata in fetch requests: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-13 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767389#comment-16767389
 ] 

John Roesler commented on KAFKA-7917:
-

Thanks for the comments, all.

 

The main motivation here is not a performance optimization (although there may 
be some), but simplifying the code base.

I agree it will require some discussion, so I've picked up the ticket, and I'm 
working on a WIP PR so we can have a concrete discussion about whether it 
results in a simpler system or not.

As I've been working on it, I do think that [~guozhang] is right, it seems to 
pave the way to needing that "root" store reference in init.

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-13 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767386#comment-16767386
 ] 

John Roesler commented on KAFKA-7918:
-

Should we also consider simplifying the StoreChangeLogger by inlining the 
generic types (which are again always `Bytes, byte[]`)?

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-13 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-7917:
---

Assignee: John Roesler

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7927) Read committed receives aborted events

2019-02-13 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated KAFKA-7927:
-
Description: 
When a Kafka client produces ~30k events and at the end aborts the 
transaction, a consumer can read part of the aborted messages when 
"isolation.level" is set to "READ_COMMITTED".

Kafka client version: 2.0.0
Kafka broker version: 1.0.0
Producer:
{code:java}
java -jar 
kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
{code}
See attached code.
Consumer:
{code:java}
kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
--from-beginning --isolation-level read_committed
{code}
The same behavior is seen when the consumer is re-implemented in Scala.

The whole application can be found here: 
https://github.com/gaborgsomogyi/kafka-semantics-tester

  was:
When a Kafka client produces ~30k events and at the end aborts the 
transaction, a consumer can read part of the aborted messages when 
"isolation.level" is set to "READ_COMMITTED".

Kafka client version: 2.0.0
Kafka broker version: 1.0.0
Producer:
{code:java}
java -jar 
kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
{code}
See attached code.
Consumer:
{code:java}
kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
--from-beginning --isolation-level read_committed
{code}
The same behavior is seen when the consumer is re-implemented in Scala.


> Read committed receives aborted events
> --
>
> Key: KAFKA-7927
> URL: https://issues.apache.org/jira/browse/KAFKA-7927
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 1.0.0
>Reporter: Gabor Somogyi
>Priority: Blocker
> Attachments: KafkaProducer.scala, consumer.log, producer.log.gz
>
>
> When a Kafka client produces ~30k events and at the end aborts the 
> transaction, a consumer can read part of the aborted messages when 
> "isolation.level" is set to "READ_COMMITTED".
> Kafka client version: 2.0.0
> Kafka broker version: 1.0.0
> Producer:
> {code:java}
> java -jar 
> kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
> gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
> {code}
> See attached code.
> Consumer:
> {code:java}
> kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
> --from-beginning --isolation-level read_committed
> {code}
> The same behavior is seen when the consumer is re-implemented in Scala.
> The whole application can be found here: 
> https://github.com/gaborgsomogyi/kafka-semantics-tester



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7927) Read committed receives aborted events

2019-02-13 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated KAFKA-7927:
-
Attachment: consumer.log

> Read committed receives aborted events
> --
>
> Key: KAFKA-7927
> URL: https://issues.apache.org/jira/browse/KAFKA-7927
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 1.0.0
>Reporter: Gabor Somogyi
>Priority: Blocker
> Attachments: KafkaProducer.scala, consumer.log, producer.log.gz
>
>
> When a Kafka client produces ~30k events and at the end aborts the 
> transaction, a consumer can read part of the aborted messages when 
> "isolation.level" is set to "READ_COMMITTED".
> Kafka client version: 2.0.0
> Kafka broker version: 1.0.0
> Producer:
> {code:java}
> java -jar 
> kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
> gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
> {code}
> See attached code.
> Consumer:
> {code:java}
> kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
> --from-beginning --isolation-level read_committed
> {code}
> The same behavior is seen when the consumer is re-implemented in Scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7927) Read committed receives aborted events

2019-02-13 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated KAFKA-7927:
-
Attachment: producer.log.gz

> Read committed receives aborted events
> --
>
> Key: KAFKA-7927
> URL: https://issues.apache.org/jira/browse/KAFKA-7927
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 1.0.0
>Reporter: Gabor Somogyi
>Priority: Blocker
> Attachments: KafkaProducer.scala, consumer.log, producer.log.gz
>
>
> When a Kafka client produces ~30k events and at the end aborts the 
> transaction, a consumer can read part of the aborted messages when 
> "isolation.level" is set to "READ_COMMITTED".
> Kafka client version: 2.0.0
> Kafka broker version: 1.0.0
> Producer:
> {code:java}
> java -jar 
> kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
> gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
> {code}
> See attached code.
> Consumer:
> {code:java}
> kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
> --from-beginning --isolation-level read_committed
> {code}
> The same behavior is seen when the consumer is re-implemented in Scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7927) Read committed receives aborted events

2019-02-13 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated KAFKA-7927:
-
Attachment: KafkaProducer.scala

> Read committed receives aborted events
> --
>
> Key: KAFKA-7927
> URL: https://issues.apache.org/jira/browse/KAFKA-7927
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 1.0.0
>Reporter: Gabor Somogyi
>Priority: Blocker
> Attachments: KafkaProducer.scala, consumer.log, producer.log.gz
>
>
> When a Kafka client produces ~30k events and at the end aborts the 
> transaction, a consumer can read part of the aborted messages when 
> "isolation.level" is set to "READ_COMMITTED".
> Kafka client version: 2.0.0
> Kafka broker version: 1.0.0
> Producer:
> {code:java}
> java -jar 
> kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
> gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
> {code}
> See attached code.
> Consumer:
> {code:java}
> kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
> --from-beginning --isolation-level read_committed
> {code}
> The same behavior is seen when the consumer is re-implemented in Scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7927) Read committed receives aborted events

2019-02-13 Thread Gabor Somogyi (JIRA)
Gabor Somogyi created KAFKA-7927:


 Summary: Read committed receives aborted events
 Key: KAFKA-7927
 URL: https://issues.apache.org/jira/browse/KAFKA-7927
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core, producer 
Affects Versions: 1.0.0
Reporter: Gabor Somogyi


When a Kafka client produces ~30k events and at the end aborts the 
transaction, a consumer can read part of the aborted messages when 
"isolation.level" is set to "READ_COMMITTED".

Kafka client version: 2.0.0
Kafka broker version: 1.0.0
Producer:
{code:java}
java -jar 
kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
{code}
See attached code.
Consumer:
{code:java}
kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
--from-beginning --isolation-level read_committed
{code}
The same behavior is seen when the consumer is re-implemented in Scala.
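
For anyone reproducing this without the linked project, a minimal sketch of 
the scenario using the standard producer API (broker address, topic, and 
transactional.id are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: produce ~30k records in one transaction, then abort it. With
// isolation.level=read_committed a consumer should see none of these records.
public class AbortedTxnProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "aborted-txn-test"); // placeholder
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 30_000; i++) {
                producer.send(new ProducerRecord<>("src-topic", Integer.toString(i), "value-" + i));
            }
            // None of the records above should be visible to read_committed consumers.
            producer.abortTransaction();
        }
    }
}
{code}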



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-13 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767006#comment-16767006
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

[~mjsax]
As I reported, I was using Kafka Streams 2.0.0 with Scala DSL API, where 
transform method accepts Transformer instance (not TransformerSupplier) as a 
parameter:
{code}
  def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)],
stateStoreNames: String*): KStream[K1, V1] = {
val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = new 
TransformerSupplier[K, V, KeyValue[K1, V1]] {
  override def get(): Transformer[K, V, KeyValue[K1, V1]] = {
new Transformer[K, V, KeyValue[K1, V1]] {
  override def transform(key: K, value: V): KeyValue[K1, V1] = {
transformer.transform(key, value) match {
  case (k1, v1) => KeyValue.pair(k1, v1)
  case _ => null
}
  }

  override def init(context: ProcessorContext): Unit = 
transformer.init(context)

  override def close(): Unit = transformer.close()
}
  }
}
inner.transform(transformerSupplierJ, stateStoreNames: _*)
  }
{code}
I believe the implementation changed now in 2.1.0 and does actually accept 
TransformerSupplier.

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming upcoming records. To ensure only one record of the same key and 
> the window comes to an aggregate I have created a custom Transformer (I know 
> something similar is going to be introduced with suppress method on KTable in 
> the future, but my implementation is quite simple and imo should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1 even if I'm just saving to the store and 
> turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, I gave them retention time as 
> long as possible for the debugging stage, I tried to hotfix that with the 
> retry in the transform method and the state stores reopen at the end and the 
> `put` method works, but this approach is questionable and I am concerned if 
> it actually works.
> Edit:
> May this be because of the fact that the 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to low 
> value? If I understand correctly, spilling to disk is done therefore more 
> frequently, may it, therefore, cause closing the store?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7926) Issue with Kafka Broker - UnknownServerException

2019-02-13 Thread Santosh S (JIRA)
Santosh S created KAFKA-7926:


 Summary: Issue with Kafka Broker - UnknownServerException
 Key: KAFKA-7926
 URL: https://issues.apache.org/jira/browse/KAFKA-7926
 Project: Kafka
  Issue Type: Bug
  Components: core
 Environment: production
Reporter: Santosh S


Our application uses `springBootVersion = 2.0.4.RELEASE` along with 
`compile('io.projectreactor.kafka:reactor-kafka:1.0.1.RELEASE')` dependency.

The Kafka Broker that we have is at version `1.0.1`.

Intermittently, when we send messages to Kafka by creating 
`reactor.kafka.sender.SenderRecord` and then look at 
`reactor.kafka.sender.SenderResult.exception()` in the response, we find

`java.lang.RuntimeException: 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request` populated in the exception.

Upon retrying a couple of times, the messages get through successfully.

In the broker logs, the error below is printed multiple times without any 
stacktrace:

`[2019-02-08 15:43:07,501] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition price-promotions-local-event-0 
(kafka.server.ReplicaManager)`

where `price-promotions-local-event` is our topic.

I have looked online, but there is no definitive resolution or way to triage 
this issue. Many thanks in advance for any help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7925) Constant 100% cpu usage by all kafka brokers

2019-02-13 Thread Abhi (JIRA)
Abhi created KAFKA-7925:
---

 Summary: Constant 100% cpu usage by all kafka brokers
 Key: KAFKA-7925
 URL: https://issues.apache.org/jira/browse/KAFKA-7925
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.0
 Environment: Java 11, Kafka v2.1.0
Reporter: Abhi
 Attachments: threadump20190212.txt

Hi,

I am seeing constant 100% cpu usage on all brokers in our kafka cluster even 
without any clients connected to any broker.

This is a bug that we have seen multiple times in our Kafka setup, which is 
not yet open to clients. It is becoming a blocker for our deployment now.

I am seeing a lot of connections to other brokers in CLOSE_WAIT state (see 
below). In thread usage, I am seeing the threads 
'kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-0,kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-1,kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-2'
taking up more than 90% of the cpu time in a 60s interval.

I have attached a thread dump of one of the brokers in the cluster.

*Java version:*

openjdk 11.0.2 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

*Kafka version:* v2.1.0

 

*connections:*

java 144319 kafkagod 88u IPv4 3063266 0t0 TCP *:35395 (LISTEN)
java 144319 kafkagod 89u IPv4 3063267 0t0 TCP *:9144 (LISTEN)
java 144319 kafkagod 104u IPv4 3064219 0t0 TCP 
mwkafka-prod-02.tbd:47292->mwkafka-zk-prod-05.tbd:2181 (ESTABLISHED)
java 144319 kafkagod 2003u IPv4 3055115 0t0 TCP *:9092 (LISTEN)
java 144319 kafkagod 2013u IPv4 7220110 0t0 TCP 
mwkafka-prod-02.tbd:60724->mwkafka-zk-prod-04.dr:2181 (ESTABLISHED)
java 144319 kafkagod 2020u IPv4 30012904 0t0 TCP 
mwkafka-prod-02.tbd:38988->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
java 144319 kafkagod 2021u IPv4 30012961 0t0 TCP 
mwkafka-prod-02.tbd:58420->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
java 144319 kafkagod 2027u IPv4 30015723 0t0 TCP 
mwkafka-prod-02.tbd:58398->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
java 144319 kafkagod 2028u IPv4 30015630 0t0 TCP 
mwkafka-prod-02.tbd:36248->mwkafka-prod-02.dr:9092 (ESTABLISHED)
java 144319 kafkagod 2030u IPv4 30015726 0t0 TCP 
mwkafka-prod-02.tbd:39012->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
java 144319 kafkagod 2031u IPv4 30013619 0t0 TCP 
mwkafka-prod-02.tbd:38986->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
java 144319 kafkagod 2032u IPv4 30015604 0t0 TCP 
mwkafka-prod-02.tbd:36246->mwkafka-prod-02.dr:9092 (ESTABLISHED)
java 144319 kafkagod 2033u IPv4 30012981 0t0 TCP 
mwkafka-prod-02.tbd:36924->mwkafka-prod-01.dr:9092 (ESTABLISHED)
java 144319 kafkagod 2034u IPv4 30012967 0t0 TCP 
mwkafka-prod-02.tbd:39036->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
java 144319 kafkagod 2035u IPv4 30012898 0t0 TCP 
mwkafka-prod-02.tbd:36866->mwkafka-prod-01.dr:9092 (FIN_WAIT2)
java 144319 kafkagod 2036u IPv4 30004729 0t0 TCP 
mwkafka-prod-02.tbd:36882->mwkafka-prod-01.dr:9092 (ESTABLISHED)
java 144319 kafkagod 2037u IPv4 30004914 0t0 TCP 
mwkafka-prod-02.tbd:58426->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
java 144319 kafkagod 2038u IPv4 30015651 0t0 TCP 
mwkafka-prod-02.tbd:36884->mwkafka-prod-01.dr:9092 (ESTABLISHED)
java 144319 kafkagod 2039u IPv4 30012966 0t0 TCP 
mwkafka-prod-02.tbd:58422->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
java 144319 kafkagod 2040u IPv4 30005643 0t0 TCP 
mwkafka-prod-02.tbd:36252->mwkafka-prod-02.dr:9092 (ESTABLISHED)
java 144319 kafkagod 2041u IPv4 30012944 0t0 TCP 
mwkafka-prod-02.tbd:36286->mwkafka-prod-02.dr:9092 (ESTABLISHED)
java 144319 kafkagod 2042u IPv4 30012973 0t0 TCP 
mwkafka-prod-02.tbd:9092->mwkafka-prod-01.nyc:51924 (ESTABLISHED)
java 144319 kafkagod 2043u sock 0,7 0t0 30012463 protocol: TCP
java 144319 kafkagod 2044u IPv4 30012979 0t0 TCP 
mwkafka-prod-02.tbd:9092->mwkafka-prod-01.dr:39994 (ESTABLISHED)
java 144319 kafkagod 2045u IPv4 30012899 0t0 TCP 
mwkafka-prod-02.tbd:9092->mwkafka-prod-02.nyc:34548 (ESTABLISHED)
java 144319 kafkagod 2046u sock 0,7 0t0 30003437 protocol: TCP
java 144319 kafkagod 2047u IPv4 30012980 0t0 TCP 
mwkafka-prod-02.tbd:9092->mwkafka-prod-02.dr:38120 (ESTABLISHED)
java 144319 kafkagod 2048u sock 0,7 0t0 30012546 protocol: TCP
java 144319 kafkagod 2049u IPv4 30005418 0t0 TCP 
mwkafka-prod-02.tbd:9092->mwkafka-prod-01.dr:39686 (CLOSE_WAIT)
java 144319 kafkagod 2050u IPv4 30009977 0t0 TCP 
mwkafka-prod-02.tbd:9092->mwkafka-prod-02.nyc:34552 (ESTABLISHED)
java 144319 kafkagod 2060u sock 0,7 0t0 30003439 protocol: TCP
java 144319 kafkagod 2061u IPv4 30012906 0t0 TCP 
mwkafka-prod-02.tbd:9092->mwkafka-prod-01.nyc:51862 (ESTABLISHED)
java 144319 kafkagod 2069u IPv4 30005642 0t0 TCP 
mwkafka-prod-02.tbd:9092->mwkafka-prod-02.nyc:34570 (ESTABLISHED)
java 144319 kafkagod 2073u sock 0,7 0t0 30003440 protocol: TCP
java 144319 kafkagod 2086u IPv4 30005644 0t0 TCP 

[jira] [Comment Edited] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-13 Thread Daniele Ascione (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766934#comment-16766934
 ] 

Daniele Ascione edited comment on KAFKA-7794 at 2/13/19 8:37 AM:
-

[~kartikvk1996] I've used Kafka 0.10.2.1

Tried, on the client side, with 0.10.2.1 and 0.10.2.2.


was (Author: dascione):
[~kartikvk1996] I've used Kafka 0.10.2.1

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, 
> image-2019-02-13-11-43-28-873.png, image-2019-02-13-11-44-18-736.png, 
> image-2019-02-13-11-45-21-459.png
>
>
> For some timestamp inputs (different from -1 or -2), GetOffsetShell is not 
> able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while an integer representing the offset is expected after the last ":".
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  
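
As a cross-check of the tool's output, the consumer API exposes a 
timestamp-to-offset lookup of its own; a minimal sketch follows (broker 
address and timestamp are placeholders). Note that {{offsetsForTimes}} 
returns null for a partition when no message has a timestamp at or after 
the requested one, which may correspond to the empty output above:

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Sketch: look up the earliest offset whose timestamp is >= the given one.
public class OffsetForTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "offset-lookup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("MY_TOPIC", 0);
            long timestamp = 1549999999000L; // placeholder timestamp
            Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, timestamp));
            System.out.println(tp + " -> " + result.get(tp)); // null if no such offset
        }
    }
}
{code}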



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-13 Thread Daniele Ascione (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766934#comment-16766934
 ] 

Daniele Ascione commented on KAFKA-7794:


[~kartikvk1996] I've used Kafka 0.10.2.1

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, 
> image-2019-02-13-11-43-28-873.png, image-2019-02-13-11-44-18-736.png, 
> image-2019-02-13-11-45-21-459.png
>
>
> For some timestamp inputs (different from -1 or -2), GetOffsetShell is not 
> able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while an integer representing the offset is expected after the last ":".
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)