[jira] [Assigned] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-11 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik reassigned KAFKA-7794:
-

Assignee: Kartik

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png
>
>
> For some timestamp inputs (other than -1 or -2), GetOffsetShell is not able 
> to retrieve the offset.
> For example, if _x_ is a timestamp in that non-working range and you execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> whereas an integer representing the offset is expected after the last ":".
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take, for example, the timestamp of the 10th oldest message, and verify that 
> GetOffsetShell is unable to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-11 Thread Kartik (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765684#comment-16765684
 ] 

Kartik commented on KAFKA-7794:
---

Hi [~huxi_2b], tagging you because you might know this.

Ideally, when the provided timestamp is greater than the timestamp of the latest 
committed record, should we return the latest offset, or should an error message 
be thrown? Based on your comment, I can work on this.
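
For context, a minimal sketch of the same lookup via the consumer API (broker 
address, topic, and timestamp are placeholders): `offsetsForTimes` returns null 
for a partition when no record has a timestamp at or after the queried one, 
which matches the empty output after the last ":" in GetOffsetShell.

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetForTimestampCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("MY_TOPIC", 0); // placeholder
            long ts = 1549900000000L; // placeholder timestamp in the "not working range"
            Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, ts));
            OffsetAndTimestamp oat = result.get(tp);
            // null here corresponds to the empty offset column in GetOffsetShell's output
            System.out.println(tp + ":" + (oat == null ? "" : oat.offset()));
        }
    }
}
{code}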

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png
>
>
> For some timestamp inputs (other than -1 or -2), GetOffsetShell is not able 
> to retrieve the offset.
> For example, if _x_ is a timestamp in that non-working range and you execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> whereas an integer representing the offset is expected after the last ":".
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take, for example, the timestamp of the 10th oldest message, and verify that 
> GetOffsetShell is unable to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7896) Add some Log4J Kafka Properties for Producing to Secured Brokers

2019-02-11 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7896.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> Add some Log4J Kafka Properties for Producing to Secured Brokers
> 
>
> Key: KAFKA-7896
> URL: https://issues.apache.org/jira/browse/KAFKA-7896
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rohan Desai
>Assignee: Rohan Desai
>Priority: Major
> Fix For: 2.3.0
>
>
> The existing Log4J Kafka appender supports producing to brokers that use the 
> GSSAPI (Kerberos) SASL mechanism, and it only supports configuring JAAS via a 
> JAAS config file. Filing this issue to cover extending it to the PLAIN 
> mechanism and to support configuring JAAS via an in-line configuration.
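
For illustration, an in-line JAAS configuration for the PLAIN mechanism looks 
like this on a plain producer (credentials and broker address are placeholders; 
the appender would expose equivalent properties):

{code:java}
import java.util.Properties;

public class InlineJaasExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9093"); // placeholder
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // in-line JAAS (KIP-85's sasl.jaas.config) instead of a JAAS config file:
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"alice\" password=\"alice-secret\";"); // placeholder credentials
        // props would then be passed to a producer, or the appender equivalent
    }
}
{code}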



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765620#comment-16765620
 ] 

Guozhang Wang commented on KAFKA-7917:
--

Another related issue is https://issues.apache.org/jira/browse/KAFKA-3229: when 
we call `init` on a wrapper, we need to remember the "outer-most wrapper" store 
so that it can be returned by `ProcessorContext#getStateStore`. So, supposing 
we've done 7917 and collapsed all the layers, we can remove the second `root` 
parameter and implement `init` as follows (i.e. we call context.register in the 
outer store):

```
super.init();

context.register(batchingStateRestoreCallback);
```

Then there are two cases:

1. the inner store is a Streams built-in, whose `init` does not call 
context.register().
2. the inner store is user-customized and may already call context.register(); 
in this case we just need to allow `ProcessorStateManager` to overwrite the 
inner store with the outer one when the outer store makes the call.
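
A rough sketch of what the collapsed `init` could look like (names are 
illustrative, not the actual implementation):

```
@Override
public void init(final ProcessorContext context) {
    wrapped().init(context);  // inner store no longer registers itself
    // the outer-most store registers, so ProcessorContext#getStateStore returns it
    context.register(this, batchingStateRestoreCallback);
}
```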

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread YeLiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YeLiang updated KAFKA-7918:
---
Description: 
Currently, the fundamental layer of stores in Streams is the "bytes store".

The easiest way to identify this is in `org.apache.kafka.streams.state.Stores`, 
all the `StoreBuilder`s require a `XXBytesStoreSupplier`. 

We provide several implementations of these bytes stores, typically an 
in-memory one and a persistent one (aka RocksDB).

Inside these bytes stores, the key is always `Bytes` and the value is always 
`byte[]` (serialization happens at a higher level). However, the store 
implementations are generically typed, just `K` and `V`.

This is good for flexibility, but it makes the code a little harder to 
understand. I think that we used to do serialization at a lower level, so the 
generics are a hold-over from that.

It would simplify the code if we just inlined the actual k/v types and maybe 
even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
`InMemoryKeyValueBytesStore`, and so forth.

 

  was:
Currently, the fundamental layer of stores in Streams is the "bytes store".

The easiest way to identify this is in `org.apache.kafka.streams.state.Stores`, 
all the `StoreBuilder`s require a `XXBytesStoreSupplier`. 

We provide several implementations of these bytes stores, typically an 
in-memory one and a persistent one (aka RocksDB).

Inside these bytes stores, the key is always `Bytes` and the value is always 
`byte[]` (serialization happens at a higher level). However, the store 
implementations are generically typed, just `K` and `V`.

This is good for flexibility, but it makes the code a little harder to 
understand. I think that we used to do serialization at a lower level, so the 
generics are a hold-over from that.

It would simplify the code if we just inlined the actual k/v types and maybe 
even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
`InMemoryKeyValueBytesStore`, and so forth.


> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1

2019-02-11 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765594#comment-16765594
 ] 

Jason Gustafson commented on KAFKA-7920:


[~dongjin] I don't suppose you have time to look at this?

> Do not permit zstd use until inter.broker.protocol.version is updated to 2.1
> 
>
> Key: KAFKA-7920
> URL: https://issues.apache.org/jira/browse/KAFKA-7920
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Jason Gustafson
>Priority: Major
>
> After brokers have been upgraded to 2.1, users can begin using zstd 
> compression. Regardless of the inter.broker.protocol.version, the broker will 
> happily accept zstd-compressed data as long as the right produce request 
> version is used. However, if the inter.broker.protocol.version is set to 2.0 
> or below, then followers will not be able to use the minimum required fetch 
> version, which will result in the following error:
> {code}
> [2019-02-11 17:42:47,116] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition foo-0 at offset 0 
> (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.UnsupportedCompressionTypeException: The 
> requesting client does not support the compression type of given partition.
> {code}
> We should make produce request validation consistent. Until the 
> inter.broker.protocol.version is at 2.1 or later, we should reject produce 
> requests with UNSUPPORTED_COMPRESSION_TYPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1

2019-02-11 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7920:
--

 Summary: Do not permit zstd use until 
inter.broker.protocol.version is updated to 2.1
 Key: KAFKA-7920
 URL: https://issues.apache.org/jira/browse/KAFKA-7920
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.1.0
Reporter: Jason Gustafson


After brokers have been upgraded to 2.1, users can begin using zstd 
compression. Regardless of the inter.broker.protocol.version, the broker will 
happily accept zstd-compressed data as long as the right produce request 
version is used. However, if the inter.broker.protocol.version is set to 2.0 or 
below, then followers will not be able to use the minimum required fetch 
version, which will result in the following error:

{code}
[2019-02-11 17:42:47,116] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
fetcherId=0] Error for partition foo-0 at offset 0 
(kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnsupportedCompressionTypeException: The 
requesting client does not support the compression type of given partition.
{code}

We should make produce request validation consistent. Until the 
inter.broker.protocol.version is at 2.1 or later, we should reject produce 
requests with UNSUPPORTED_COMPRESSION_TYPE.
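
A hedged sketch of the proposed check (the version test and method placement 
are illustrative, not the actual broker code):

{code:java}
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.record.CompressionType;

public class ZstdProduceValidation {
    // interBrokerProtocolAtLeast21 would be derived from inter.broker.protocol.version;
    // the real broker models API versions differently.
    static void validate(boolean interBrokerProtocolAtLeast21, CompressionType compressionType) {
        if (!interBrokerProtocolAtLeast21 && compressionType == CompressionType.ZSTD) {
            throw new UnsupportedCompressionTypeException(
                "zstd-compressed produce requests are not allowed until "
                    + "inter.broker.protocol.version is 2.1 or later");
        }
    }
}
{code}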



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765593#comment-16765593
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


A number of tests will need to be reworked, but I don't see any public APIs 
standing in the way 

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765587#comment-16765587
 ] 

Sophie Blee-Goldman edited comment on KAFKA-7918 at 2/12/19 1:43 AM:
-

For the LRUStore, there are two classes this should be applied to:

4)
 # MemoryLRUCache
 # MemoryNavigableLRUCache


was (Author: ableegoldman):
For the LRUStore, there are two classes this should be applied to:

5) MemoryLRUCache

6) MemoryNavigableLRUCache

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765587#comment-16765587
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


For the LRUStore, there are two classes this should be applied to:

5) MemoryLRUCache

6) MemoryNavigableLRUCache

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-7918:
---
Comment: was deleted

(was: The RocksDB key-value store is actually already implemented as a <Bytes, byte[]> store, so this applies to the following stores:

 
 * RocksDBWindowStore
 * RocksDBSessionStore
 * MemoryLRUCache
 * MemoryNavigableLRUCache
 * InMemoryKeyValueStore
 * (upcoming) InMemoryWindowStore)

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765585#comment-16765585
 ] 

Sophie Blee-Goldman commented on KAFKA-7918:


The RocksDB key-value store is actually already implemented as a <Bytes, byte[]> store, so this applies to the following stores:

 
 * RocksDBWindowStore
 * RocksDBSessionStore
 * MemoryLRUCache
 * MemoryNavigableLRUCache
 * InMemoryKeyValueStore
 * (upcoming) InMemoryWindowStore

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765583#comment-16765583
 ] 

Guozhang Wang commented on KAFKA-7918:
--

I made a quick pass over the current code base, and it seems at least the 
following classes can get rid of the generics, as they are all internal classes 
and there are no public APIs that would force them to keep a generic type (this 
needs another pair of eyes to validate):

1) RocksDBWindowStore
2) RocksDBSessionStore
3) InMemoryKeyValueStore
4) LRUStore
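
To illustrate what inlining would mean, a before/after sketch (class bodies 
elided; `KeyValueStore` from `org.apache.kafka.streams.state`, `Bytes` from 
`org.apache.kafka.common.utils`):

{code:java}
// before: generically typed, even though K is always Bytes and V is always byte[]
class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> { /* ... */ }

// after: generics inlined to the bytes world (and possibly renamed, per the ticket)
class InMemoryKeyValueBytesStore implements KeyValueStore<Bytes, byte[]> { /* ... */ }
{code}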

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7845) Kafka clients do not re-resolve ips when a broker is replaced.

2019-02-11 Thread Jennifer Thompson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765577#comment-16765577
 ] 

Jennifer Thompson edited comment on KAFKA-7845 at 2/12/19 1:07 AM:
---

Quite probably. I tried using the 2.1.1-rc1 snapshot jars in the clients, but 
that didn't fix the problem. I also tried different configs like 
"client.dns.lookup" but didn't get anywhere.

I suspect that they are still getting stale data from the other kafka brokers. 
I can't use 2.1.1 for our brokers because we are using the confluent 
distribution, which seems to require extra classes in the kafka-clients jar. I 
will try again when confluent 5.1.1 is available.


was (Author: jentho):
Quite probably. I tried using the 2.1.1-rc1 snapshot jars in the clients, but 
that didn't fix the problem. I also tried different configs like 
"client.dns.lookup" but didn't get anywhere.

I suspect that they are still getting stale data from the other kafka brokers. 
I can't our brokers with 2.1.1 because we are using the confluent distribution, 
which seems to require extra classes in the kafka-clients jar. I will try again 
when confluent 5.1.1 is available.

> Kafka clients do not re-resolve ips when a broker is replaced.
> --
>
> Key: KAFKA-7845
> URL: https://issues.apache.org/jira/browse/KAFKA-7845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Jennifer Thompson
>Priority: Major
>
> When one of our Kafka brokers dies and a new one replaces it (via an aws 
> ASG), the clients that publish to Kafka still try to publish to the old 
> brokers.
> We see errors like 
> {code:java}
> 2019-01-18 20:16:16 WARN NetworkClient:721 - [Producer clientId=producer-1] 
> Connection to node 2 (/10.130.98.111:9092) could not be established. Broker 
> may not be available.
> 2019-01-18 20:19:09 WARN Sender:596 - [Producer clientId=producer-1] Got 
> error produce response with correlation id 3414 on topic-partition aa.pga-2, 
> retrying (4 attempts left). Error: NOT_LEADER_FOR_PARTITION
> 2019-01-18 20:19:09 WARN Sender:641 - [Producer clientId=producer-1] Received 
> invalid metadata error in produce request on partition aa.pga-2 due to 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.. Going to request metadata update now
> 2019-01-18 20:21:19 WARN NetworkClient:721 - [Producer clientId=producer-1] 
> Connection to node 2 (/10.130.98.111:9092) could not be established. Broker 
> may not be available.
> 2019-01-18 20:21:50 ERROR ProducerBatch:233 - Error executing user-provided 
> callback on message for topic-partition 'aa.test-liz-0'{code}
> and
> {code:java}
> [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} 
> Failed to flush, timed out while waiting for producer to flush outstanding 27 
> messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} 
> Failed to commit offsets 
> (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
> {code}
> The ip address referenced is for the broker that died. We have Kafka Manager 
> running as well, and that picks up the new broker.
> We already set
> {code:java}
> networkaddress.cache.ttl=60{code}
> in
> {code:java}
> jre/lib/security/java.security{code}
> Our java version is "Java(TM) SE Runtime Environment (build 1.8.0_192-b12)"
> This started happening after we upgraded to 2.1. When we had Kafka 1.1, 
> brokers could fail over without a problem.
> One thing that might be considered unusual about our deployment is that we 
> reuse the same broker id and EBS volume for the new broker, so that 
> partitions do not have to be reassigned.
> In kafka-connect, the logs look like
> {code}
> [2019-01-28 22:11:02,364] WARN [Consumer clientId=consumer-1, 
> groupId=connect-cluster] Connection to node 3 (/10.130.153.120:9092) could 
> not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2019-01-28 22:11:02,365] INFO [Consumer clientId=consumer-1, 
> groupId=connect-cluster] Error sending fetch request (sessionId=201133590, 
> epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException. 
> (org.apache.kafka.clients.FetchSessionHandler)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7845) Kafka clients do not re-resolve ips when a broker is replaced.

2019-02-11 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765578#comment-16765578
 ] 

Ismael Juma commented on KAFKA-7845:


I think you may need the latest RC (rc2, I believe), which includes the fix for 
KAFKA-7890. That should do it.
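
For reference, a minimal sketch of the client-side settings discussed here 
(broker address is a placeholder; "use_all_dns_ips" is the KIP-302 option, and 
re-resolving a replaced broker's hostname additionally needs the KAFKA-7890 fix):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class DnsLookupConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9092"); // placeholder
        props.put("client.dns.lookup", "use_all_dns_ips");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // the producer will now consider every IP each bootstrap hostname resolves to
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send records as usual
        }
    }
}
{code}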

> Kafka clients do not re-resolve ips when a broker is replaced.
> --
>
> Key: KAFKA-7845
> URL: https://issues.apache.org/jira/browse/KAFKA-7845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Jennifer Thompson
>Priority: Major
>
> When one of our Kafka brokers dies and a new one replaces it (via an aws 
> ASG), the clients that publish to Kafka still try to publish to the old 
> brokers.
> We see errors like 
> {code:java}
> 2019-01-18 20:16:16 WARN NetworkClient:721 - [Producer clientId=producer-1] 
> Connection to node 2 (/10.130.98.111:9092) could not be established. Broker 
> may not be available.
> 2019-01-18 20:19:09 WARN Sender:596 - [Producer clientId=producer-1] Got 
> error produce response with correlation id 3414 on topic-partition aa.pga-2, 
> retrying (4 attempts left). Error: NOT_LEADER_FOR_PARTITION
> 2019-01-18 20:19:09 WARN Sender:641 - [Producer clientId=producer-1] Received 
> invalid metadata error in produce request on partition aa.pga-2 due to 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.. Going to request metadata update now
> 2019-01-18 20:21:19 WARN NetworkClient:721 - [Producer clientId=producer-1] 
> Connection to node 2 (/10.130.98.111:9092) could not be established. Broker 
> may not be available.
> 2019-01-18 20:21:50 ERROR ProducerBatch:233 - Error executing user-provided 
> callback on message for topic-partition 'aa.test-liz-0'{code}
> and
> {code:java}
> [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} 
> Failed to flush, timed out while waiting for producer to flush outstanding 27 
> messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} 
> Failed to commit offsets 
> (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
> {code}
> The ip address referenced is for the broker that died. We have Kafka Manager 
> running as well, and that picks up the new broker.
> We already set
> {code:java}
> networkaddress.cache.ttl=60{code}
> in
> {code:java}
> jre/lib/security/java.security{code}
> Our java version is "Java(TM) SE Runtime Environment (build 1.8.0_192-b12)"
> This started happening after we upgraded to 2.1. When we had Kafka 1.1, 
> brokers could fail over without a problem.
> One thing that might be considered unusual about our deployment is that we 
> reuse the same broker id and EBS volume for the new broker, so that 
> partitions do not have to be reassigned.
> In kafka-connect, the logs look like
> {code}
> [2019-01-28 22:11:02,364] WARN [Consumer clientId=consumer-1, 
> groupId=connect-cluster] Connection to node 3 (/10.130.153.120:9092) could 
> not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2019-01-28 22:11:02,365] INFO [Consumer clientId=consumer-1, 
> groupId=connect-cluster] Error sending fetch request (sessionId=201133590, 
> epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException. 
> (org.apache.kafka.clients.FetchSessionHandler)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-7918:
--

Assignee: Sophie Blee-Goldman

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7845) Kafka clients do not re-resolve ips when a broker is replaced.

2019-02-11 Thread Jennifer Thompson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765577#comment-16765577
 ] 

Jennifer Thompson commented on KAFKA-7845:
--

Quite probably. I tried using the 2.1.1-rc1 snapshot jars in the clients, but 
that didn't fix the problem. I also tried different configs like 
"client.dns.lookup" but didn't get anywhere.

I suspect that they are still getting stale data from the other kafka brokers. 
I can't use 2.1.1 for our brokers because we are using the confluent distribution, 
which seems to require extra classes in the kafka-clients jar. I will try again 
when confluent 5.1.1 is available.

> Kafka clients do not re-resolve ips when a broker is replaced.
> --
>
> Key: KAFKA-7845
> URL: https://issues.apache.org/jira/browse/KAFKA-7845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Jennifer Thompson
>Priority: Major
>
> When one of our Kafka brokers dies and a new one replaces it (via an aws 
> ASG), the clients that publish to Kafka still try to publish to the old 
> brokers.
> We see errors like 
> {code:java}
> 2019-01-18 20:16:16 WARN NetworkClient:721 - [Producer clientId=producer-1] 
> Connection to node 2 (/10.130.98.111:9092) could not be established. Broker 
> may not be available.
> 2019-01-18 20:19:09 WARN Sender:596 - [Producer clientId=producer-1] Got 
> error produce response with correlation id 3414 on topic-partition aa.pga-2, 
> retrying (4 attempts left). Error: NOT_LEADER_FOR_PARTITION
> 2019-01-18 20:19:09 WARN Sender:641 - [Producer clientId=producer-1] Received 
> invalid metadata error in produce request on partition aa.pga-2 due to 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.. Going to request metadata update now
> 2019-01-18 20:21:19 WARN NetworkClient:721 - [Producer clientId=producer-1] 
> Connection to node 2 (/10.130.98.111:9092) could not be established. Broker 
> may not be available.
> 2019-01-18 20:21:50 ERROR ProducerBatch:233 - Error executing user-provided 
> callback on message for topic-partition 'aa.test-liz-0'{code}
> and
> {code:java}
> [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} 
> Failed to flush, timed out while waiting for producer to flush outstanding 27 
> messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} 
> Failed to commit offsets 
> (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
> {code}
> The ip address referenced is for the broker that died. We have Kafka Manager 
> running as well, and that picks up the new broker.
> We already set
> {code:java}
> networkaddress.cache.ttl=60{code}
> in
> {code:java}
> jre/lib/security/java.security{code}
> Our java version is "Java(TM) SE Runtime Environment (build 1.8.0_192-b12)"
> This started happening after we upgraded to 2.1. When we had Kafka 1.1, 
> brokers could fail over without a problem.
> One thing that might be considered unusual about our deployment is that we 
> reuse the same broker id and EBS volume for the new broker, so that 
> partitions do not have to be reassigned.
> In kafka-connect, the logs look like
> {code}
> [2019-01-28 22:11:02,364] WARN [Consumer clientId=consumer-1, 
> groupId=connect-cluster] Connection to node 3 (/10.130.153.120:9092) could 
> not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2019-01-28 22:11:02,365] INFO [Consumer clientId=consumer-1, 
> groupId=connect-cluster] Error sending fetch request (sessionId=201133590, 
> epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException. 
> (org.apache.kafka.clients.FetchSessionHandler)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7236) Add --under-min-isr option to describe topics command

2019-02-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7236:
---
Fix Version/s: (was: 2.2.0)
   2.3.0

> Add --under-min-isr option to describe topics command
> -
>
> Key: KAFKA-7236
> URL: https://issues.apache.org/jira/browse/KAFKA-7236
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Minor
> Fix For: 2.3.0
>
>
> The "min.insync.replicas" configuration specifies the minimum number of 
> insync replicas required for a partition to accept messages from the 
> producer. If the insync replica count of a partition falls under the 
> specified "min.insync.replicas", then the broker will reject messages for 
> producers using acks=all. These producers will suffer unavailability as they 
> will see a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception.
> We currently have an UnderMinIsrPartitionCount metric which is useful for 
> identifying when partitions fall under "min.insync.replicas"; however, it is 
> still difficult to identify which topic partitions are affected and need 
> fixing.
> We can leverage the describe topics command in TopicCommand to add an option 
> "--under-minisr-partitions" to list out exactly which topic partitions are 
> below "min.insync.replicas".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7236) Add --under-min-isr option to describe topics command

2019-02-11 Thread Kevin Lu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Lu updated KAFKA-7236:

Fix Version/s: 2.2.0

> Add --under-min-isr option to describe topics command
> -
>
> Key: KAFKA-7236
> URL: https://issues.apache.org/jira/browse/KAFKA-7236
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Minor
> Fix For: 2.2.0
>
>
> The "min.insync.replicas" configuration specifies the minimum number of 
> insync replicas required for a partition to accept messages from the 
> producer. If the insync replica count of a partition falls under the 
> specified "min.insync.replicas", then the broker will reject messages for 
> producers using acks=all. These producers will suffer unavailability as they 
> will see a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception.
> We currently have an UnderMinIsrPartitionCount metric which is useful for 
> identifying when partitions fall under "min.insync.replicas"; however, it is 
> still difficult to identify which topic partitions are affected and need 
> fixing.
> We can leverage the describe topics command in TopicCommand to add an option 
> "--under-minisr-partitions" to list out exactly which topic partitions are 
> below "min.insync.replicas".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7904) KIP-427: Add AtMinIsr topic partition category (new metric & TopicCommand option)

2019-02-11 Thread Kevin Lu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Lu updated KAFKA-7904:

Summary: KIP-427: Add AtMinIsr topic partition category (new metric & 
TopicCommand option)  (was: Add AtMinIsr topic partition category)

> KIP-427: Add AtMinIsr topic partition category (new metric & TopicCommand 
> option)
> -
>
> Key: KAFKA-7904
> URL: https://issues.apache.org/jira/browse/KAFKA-7904
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Minor
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-427%3A+Add+AtMinIsr+topic+partition+category



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-11 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765551#comment-16765551
 ] 

Lee Dongjin commented on KAFKA-7917:


[~vvcephei] Thanks for creating the issue. I have also been thinking about it. 
Here are some suggestions:

1. How about creating an umbrella issue like 'Improve Streams Store 
implementations' and making the related issues (KAFKA-7916, KAFKA-7917, 
KAFKA-7918, KAFKA-7919) its subtasks?
2. Also, these issues need a discussion, as [~mjsax] commented. How about 
opening a discussion thread after grouping them under the umbrella issue?

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7904) Add AtMinIsr topic partition category

2019-02-11 Thread Kevin Lu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765521#comment-16765521
 ] 

Kevin Lu commented on KAFKA-7904:
-

Hi [~sliebau], I am close to finishing up the draft and will publish it this 
week! Thanks~

> Add AtMinIsr topic partition category
> -
>
> Key: KAFKA-7904
> URL: https://issues.apache.org/jira/browse/KAFKA-7904
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Minor
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-427%3A+Add+AtMinIsr+topic+partition+category



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7904) Add AtMinIsr topic partition category

2019-02-11 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765512#comment-16765512
 ] 

Sönke Liebau commented on KAFKA-7904:
-

Hi [~lu.kevin], could you please add some more detail to this ticket and the 
KIP? Both of them pretty much only consist of the headline.

> Add AtMinIsr topic partition category
> -
>
> Key: KAFKA-7904
> URL: https://issues.apache.org/jira/browse/KAFKA-7904
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Minor
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-427%3A+Add+AtMinIsr+topic+partition+category



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7919) Reorganize Stores builders

2019-02-11 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7919:

Labels: needs-discussion needs-kip  (was: needs-kip)

> Reorganize Stores builders
> --
>
> Key: KAFKA-7919
> URL: https://issues.apache.org/jira/browse/KAFKA-7919
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> I have heard complaints from a few people that they find the whole process of 
> using `Materialized`, `Stores`, `StoreBuilder`s, and `StoreSupplier`s 
> confusing.
> I think it would help if we separated Stores into separate StoreBuilders and 
> BytesStoreSuppliers factory classes. Or maybe even break the suppliers 
> factory down further into `KeyValueBytesStoreSuppliers`, etc. Then, the 
> javadocs in `Materialized.as` can point people to the right factory class to 
> use, and, when they get there, they'll only be confronted with options they 
> can make use of.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7236) Add --under-min-isr option to describe topics command

2019-02-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765456#comment-16765456
 ] 

ASF GitHub Bot commented on KAFKA-7236:
---

vahidhashemian commented on pull request #6224: KAFKA-7236: Add --under-min-isr 
option to describe topics command (KIP-351)
URL: https://github.com/apache/kafka/pull/6224

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add --under-min-isr option to describe topics command
> -
>
> Key: KAFKA-7236
> URL: https://issues.apache.org/jira/browse/KAFKA-7236
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Minor
>
> The "min.insync.replicas" configuration specifies the minimum number of 
> insync replicas required for a partition to accept messages from the 
> producer. If the insync replica count of a partition falls under the 
> specified "min.insync.replicas", then the broker will reject messages for 
> producers using acks=all. These producers will suffer unavailability as they 
> will see a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception.
> We currently have an UnderMinIsrPartitionCount metric which is useful for 
> identifying when partitions fall under "min.insync.replicas"; however, it is 
> still difficult to identify which topic partitions are affected and need 
> fixing.
> We can leverage the describe topics command in TopicCommand to add an option 
> "--under-minisr-partitions" to list out exactly which topic partitions are 
> below "min.insync.replicas".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7919) Reorganize Stores builders

2019-02-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7919:
---

 Summary: Reorganize Stores builders
 Key: KAFKA-7919
 URL: https://issues.apache.org/jira/browse/KAFKA-7919
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


I have heard complaints from a few people that they find the whole process of 
using `Materialized`, `Stores`, `StoreBuilder`s, and `StoreSupplier`s confusing.

I think it would help if we separated Stores into separate StoreBuilders and 
BytesStoreSuppliers factory classes. Or maybe even break the suppliers factory 
down further into `KeyValueBytesStoreSuppliers`, etc. Then, the javadocs in 
`Materialized.as` can point people to the right factory class to use, and, when 
they get there, they'll only be confronted with options they can make use of.
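
For concreteness, the current API flow that people find confusing looks roughly 
like this (store name and types are placeholders):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class MaterializedFlowExample {
    static Materialized<String, Long, KeyValueStore<Bytes, byte[]>> countsStore() {
        // Stores hands out a BytesStoreSupplier...
        KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore("counts-store"); // placeholder name
        // ...which Materialized.as wraps, together with the serdes
        return Materialized.<String, Long>as(supplier)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long());
    }
}
{code}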



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets

2019-02-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765451#comment-16765451
 ] 

Matthias J. Sax commented on KAFKA-7556:


[~ijuma] [~hachikuji] This is marked as "Critical" for the 2.2 release, but it 
seems nobody is working on it. Can you comment on this ticket? Is it critical? 
Is it relevant for 2.2?
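
For anyone picking this up, a minimal repro sketch contrasting the two calls 
from the description below (connection details and topic are placeholders):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class BeginningOffsetsRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test.topic", 0); // placeholder
            // reported to return 0 even after retention has deleted the head of the log:
            Map<TopicPartition, Long> beginning =
                consumer.beginningOffsets(Collections.singleton(tp));
            // reported to return the effective first offset:
            Map<TopicPartition, OffsetAndTimestamp> byTime =
                consumer.offsetsForTimes(Collections.singletonMap(tp, 0L));
            System.out.println("beginningOffsets: " + beginning.get(tp));
            System.out.println("offsetsForTimes(0L): " + byTime.get(tp));
        }
    }
}
{code}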

> KafkaConsumer.beginningOffsets does not return actual first offsets
> ---
>
> Key: KAFKA-7556
> URL: https://issues.apache.org/jira/browse/KAFKA-7556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Robert V
>Priority: Critical
>  Labels: documentation, usability
> Fix For: 2.2.0
>
>
> h2. Description of the problem
> The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` 
> claims in its Javadoc documentation that it would 'Get the first offset for 
> the given partitions.'.
> I used it with a compacted topic, and it always returned offset 0 for all 
> partitions.
>  Not sure if using a compacted topic actually matters, but I'm enclosing this 
> information anyway.
> Given a Kafka topic with retention set, and old log files being removed as a 
> result, the effective start offset of those partitions moves forward; it will 
> simply be greater than offset 0.
> However, calling the `beginningOffsets` method always returns offset 0 as the 
> first offset.
> In contrast, when the method 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called 
> with a timestamp of 0L (UNIX epoch, 1st Jan 1970), it correctly returns the 
> effective start offset for each partition.
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: 
> {code:java}
> {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, 
> test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, 
> test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, 
> test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, 
> test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, 
> test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, 
> test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, 
> test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, 
> test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, 
> test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, 
> test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, 
> test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, 
> test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, 
> test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, 
> test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, 
> test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, 
> test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, 
> test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, 
> test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, 
> test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, 
> test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, 
> test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, 
> test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, 
> test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, 
> test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0}
> {code}
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`:
> {code:java}
> {test.topic-87=(timestamp=1511264434285, offset=289), 
> test.topic-54=(timestamp=1511265134993, offset=45420), 
> test.topic-21=(timestamp=1511265534207, offset=63643), 
> test.topic-79=(timestamp=1511270338275, offset=380750), 
> test.topic-46=(timestamp=1511266883588, offset=266379), 
> test.topic-13=(timestamp=1511265900538, offset=98512), 
> test.topic-70=(timestamp=1511266972452, offset=118522), 
> test.topic-37=(timestamp=1511264396370, offset=763), 
> test.topic-12=(timestamp=1511265504886, offset=61108), 
> test.topic-95=(timestamp=1511289492800, offset=847647), 
> test.topic-62=(timestamp=1511265831298, offset=68299), 
> test.topic-29=(timestamp=1511278767417, offset=548361), 
> test.topic-4=(timestamp=1511269316679, offset=144855), 
> test.topic-88=(timestamp=1511265608468, offset=107831), 
> test.topic-55=(timestamp=1511267449288, offset=129241), 
> test.topic-22=(timestamp=1511283134114, offset=563095), 
> test.topic-80=(timestamp=1511277334877, offset=534859), 
> test.topic-47=(timestamp=1511265530689, offset=71608), 
> test.topic-14=(timestamp=1511266308829, offset=80962), 
> 

[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2019-02-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765449#comment-16765449
 ] 

Matthias J. Sax commented on KAFKA-7304:


[~rsivaram] This is marked as "Critical" for the 2.2 release, but it seems 
there has been no progress on the PR since September. What is the current 
status?

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.2.0, 2.0.2
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, 
> Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 
> PM.png
>
>
> We are testing secured writes to Kafka over SSL. At small scale, SSL writes 
> to Kafka were fine. However, when we enabled SSL writes at a larger scale 
> (>40k clients writing concurrently), the Kafka brokers soon hit an 
> OutOfMemory issue with a 4G heap setting. We tried increasing the heap size 
> to 10GB, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects. There 
> are two channel map fields in Selector. It seems that somehow the objects are 
> not deleted from the maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart or similar event in the cluster causes a 
> partition leadership change, the brokers may hit the OOM issue faster. 
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102 and have applied a TLS patch reducing the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2019-02-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765463#comment-16765463
 ] 

Matthias J. Sax commented on KAFKA-7481:


[~hachikuji] The corresponding KIP of this Jira has status "under discussion" – 
if you don't object, I will move this out of 2.2.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.2.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?
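For concreteness, option 1 amounts to pinning the protocol in server.properties 
until a downgrade path is no longer needed (the version value below is only 
illustrative):

{code}
# server.properties: keep brokers on the old inter-broker protocol so the
# new offset commit value schema is never written
inter.broker.protocol.version=2.0
{code}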



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7897) Invalid use of epoch cache with old message format versions

2019-02-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7897:
---
Priority: Blocker  (was: Major)

> Invalid use of epoch cache with old message format versions
> ---
>
> Key: KAFKA-7897
> URL: https://issues.apache.org/jira/browse/KAFKA-7897
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
>
> Message format downgrades are not supported, but they generally work as long 
> as brokers/clients can at least continue to parse both message formats. After 
> a downgrade, the truncation logic should revert to using the high watermark, 
> but currently we use the existence of any cached epoch as the sole 
> prerequisite in order to leverage OffsetsForLeaderEpoch. This has the effect 
> of causing a massive truncation after startup which causes re-replication.
> I think our options to fix this are to either 1) clear the cache when we 
> notice a downgrade, or 2) forbid downgrades and raise an error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7897) Invalid use of epoch cache with old message format versions

2019-02-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7897:
---
Fix Version/s: 2.2.0

> Invalid use of epoch cache with old message format versions
> ---
>
> Key: KAFKA-7897
> URL: https://issues.apache.org/jira/browse/KAFKA-7897
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.2.0
>
>
> Message format downgrades are not supported, but they generally work as long 
> as brokers/clients can at least continue to parse both message formats. After 
> a downgrade, the truncation logic should revert to using the high watermark, 
> but currently we use the existence of any cached epoch as the sole 
> prerequisite in order to leverage OffsetsForLeaderEpoch. This has the effect 
> of causing a massive truncation after startup which causes re-replication.
> I think our options to fix this are to either 1) clear the cache when we 
> notice a downgrade, or 2) forbid downgrades and raise an error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7915) SASL authentication failures may return sensitive data to client

2019-02-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765454#comment-16765454
 ] 

Matthias J. Sax commented on KAFKA-7915:


[~rsivaram] The corresponding PR was merged. Can this ticket be resolved?

> SASL authentication failures may return sensitive data to client
> 
>
> Key: KAFKA-7915
> URL: https://issues.apache.org/jira/browse/KAFKA-7915
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.2.0
>
>
> There was a regression from the commit 
> https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
>  which added the error message from SaslException thrown by the server during 
> authentication into the error response returned to clients. Since this 
> exception may contain sensitive data (e.g. indicating that a user exists but 
> password match failed), we should not return the error to clients. We have a 
> separate exception (`AuthenticationException`) for errors that are safe to 
> propagate to clients.
> The regression was not in any released version, the related commit will only 
> be in 2.2.0, so we just need to fix this before 2.2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7565) NPE in KafkaConsumer

2019-02-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765452#comment-16765452
 ] 

Matthias J. Sax commented on KAFKA-7565:


[~ijuma] [~hachikuji] This is marked as "Critical" for the 2.2 release, but it 
seems nobody has picked it up. Comments? Should we remove the fix version 
and/or deprioritize?

> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
> Fix For: 2.2.0
>
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> Couldn't find minimal reproducer, but it happens quite often in our system. 
> We use {{pause()}} and {{wakeup()}} methods quite extensively, maybe it is 
> somehow related.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7915) SASL authentication failures may return sensitive data to client

2019-02-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765438#comment-16765438
 ] 

ASF GitHub Bot commented on KAFKA-7915:
---

rajinisivaram commented on pull request #6252: KAFKA-7915: Don't return 
sensitive authentication errors to clients
URL: https://github.com/apache/kafka/pull/6252
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SASL authentication failures may return sensitive data to client
> 
>
> Key: KAFKA-7915
> URL: https://issues.apache.org/jira/browse/KAFKA-7915
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.2.0
>
>
> There was a regression from the commit 
> https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
>  which added the error message from SaslException thrown by the server during 
> authentication into the error response returned to clients. Since this 
> exception may contain sensitive data (e.g. indicating that a user exists but 
> password match failed), we should not return the error to clients. We have a 
> separate exception (`AuthenticationException`) for errors that are safe to 
> propagate to clients.
> The regression was not in any released version, the related commit will only 
> be in 2.2.0, so we just need to fix this before 2.2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2019-02-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765447#comment-16765447
 ] 

Matthias J. Sax commented on KAFKA-3955:


[~dhruvilshah] This is marked as "Critical" for the 2.2 release, but it seems 
there has been no progress on the PR since October. What is the current status?

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> 
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>Reporter: Tom Crayford
>Assignee: Dhruvil Shah
>Priority: Critical
>  Labels: reliability
> Fix For: 2.2.0
>
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four 
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that catches the issue, it truncates the 
> whole log (not just this segment): 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
>  to the start segment of the bad log segment.
> However, this code can't be hit on recovery because of the code paths in 
> {{loadSegments}} - they mean we'll never hit truncation here, as we always 
> throw this exception, which goes all the way to the top-level exception 
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code inside a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That code should return the number of {{truncatedBytes}}, like we do in 
> {{Log.recoverLog}}, and then truncate the log. The callers will have to be 
> notified to stop iterating over files in the directory, likely via a return 
> value of {{truncatedBytes}}, like {{Log.recoverLog}} does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and currently don't have good theories yet) as to how this 
> log segment got into this state with non-monotonic offsets. This segment uses 
> gzip compression and is under 0.9.0.1. The same bug with respect to recovery 
> exists in trunk, but I'm unsure whether the new handling of compressed 
> messages (KIP-31) means the bug where non-monotonic offsets get appended is 
> still present in trunk.
> As a production workaround, one can manually truncate that log folder 
> (delete all .index/.log files from the one with the bad offset onwards). 
> However, Kafka should (and can) handle this case well - with replication we 
> can truncate on broker startup.
> stacktrace and error message:
> {code}
> pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
> /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding 
> index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads 
> during logs loading: kafka.common.InvalidOffsetException: Attempt to append 
> an offset (15000337) to position 111719 no larger than the last offset 
> appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. 
> Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an 
> offset (15000337) to position 111719 no larger than the last offset appended 
> (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at 
> 

[jira] [Resolved] (KAFKA-7915) SASL authentication failures may return sensitive data to client

2019-02-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7915.
---
Resolution: Fixed

> SASL authentication failures may return sensitive data to client
> 
>
> Key: KAFKA-7915
> URL: https://issues.apache.org/jira/browse/KAFKA-7915
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.2.0
>
>
> There was a regression from the commit 
> https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
>  which added the error message from SaslException thrown by the server during 
> authentication into the error response returned to clients. Since this 
> exception may contain sensitive data (e.g. indicating that a user exists but 
> password match failed), we should not return the error to clients. We have a 
> separate exception (`AuthenticationException`) for errors that are safe to 
> propagate to clients.
> The regression was not in any released version, the related commit will only 
> be in 2.2.0, so we just need to fix this before 2.2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7918:
---

 Summary: Streams store cleanup: inline byte-store generic 
parameters
 Key: KAFKA-7918
 URL: https://issues.apache.org/jira/browse/KAFKA-7918
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, the fundamental layer of stores in Streams is the "bytes store".

The easiest way to see this is in `org.apache.kafka.streams.state.Stores`: 
all the `StoreBuilder`s require an `XXBytesStoreSupplier`. 

We provide several implementations of these bytes stores, typically an 
in-memory one and a persistent one (aka RocksDB).

Inside these bytes stores, the key is always `Bytes` and the value is always 
`byte[]` (serialization happens at a higher level). However, the store 
implementations are generically typed, just `K` and `V`.

This is good for flexibility, but it makes the code a little harder to 
understand. I think that we used to do serialization at a lower level, so the 
generics are a hold-over from that.

It would simplify the code if we just inlined the actual k/v types and maybe 
even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
`InMemoryKeyValueBytesStore`, and so forth.
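As a minimal standalone sketch (not the actual Streams class; the name follows 
the proposed renaming), the inlined form would fix the types at this layer:

{code:java}
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.kafka.common.utils.Bytes;

// Keys are always Bytes and values are always byte[]; no K/V generics needed.
public class InMemoryKeyValueBytesStore {

    private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();

    public void put(final Bytes key, final byte[] value) {
        map.put(key, value);
    }

    public byte[] get(final Bytes key) {
        return map.get(key);
    }
}
{code}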



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765430#comment-16765430
 ] 

Matthias J. Sax commented on KAFKA-7917:


[~vvcephei] Before we do this, we should quantify the overhead to see if it's 
worth doing. Atm, I would not expect this to have a major performance impact. 
To improve throughput, we should try to work on the "big chunks" first, before 
delving into optimizations like this.

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7916) Streams store cleanup: unify wrapping

2019-02-11 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7916:

Component/s: streams

> Streams store cleanup: unify wrapping
> -
>
> Key: KAFKA-7916
> URL: https://issues.apache.org/jira/browse/KAFKA-7916
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> The internal store handling in Streams has become quite complex, with many 
> layers of wrapping for different bookkeeping operations.
> The first thing we can do about this is to at least unify the wrapping 
> strategy, such that *all* store wrappers extend WrappedStateStore. This would 
> make the code easier to understand, since all wrappers would have the same 
> basic shape.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-11 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7917:

Component/s: streams

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7917:
---

 Summary: Streams store cleanup: collapse layers
 Key: KAFKA-7917
 URL: https://issues.apache.org/jira/browse/KAFKA-7917
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Following on KAFKA-7916, we can consider collapsing the "streams management 
layers" into one.

Right now, we have:
 * metering (also handles moving from pojo world to bytes world)
 * change-logging
 * caching

This is good compositional style, but we also have some runtime overhead of 
calling through all these layers, as well as some mental overhead of 
understanding how many and which layers we are going through.

Also, there are dependencies between the caching and change-logging layers.

I _think_ it would simplify the code if we collapsed these into one layer with 
boolean switches to turn on or off the different aspects. (rather than wrapping 
the store with the different layers or not depending on the same boolean 
conditions)
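To illustrate the idea (class and method bodies below are hypothetical, not a 
design proposal), a single layer with switches might look roughly like:

{code:java}
// One wrapper that conditionally applies each concern, instead of three
// nested wrapper classes.
public class UnifiedStoreLayer {

    private final boolean metered;
    private final boolean changeLogged;
    private final boolean cached;

    public UnifiedStoreLayer(final boolean metered,
                             final boolean changeLogged,
                             final boolean cached) {
        this.metered = metered;
        this.changeLogged = changeLogged;
        this.cached = cached;
    }

    public void put(final byte[] key, final byte[] value) {
        final long start = metered ? System.nanoTime() : 0L;
        if (cached) {
            // write into the cache (flushed to the inner store later)
        }
        if (changeLogged) {
            // also send the update to the changelog topic
        }
        // write through to the inner store, then record latency if metered
        if (metered) {
            final long elapsedNanos = System.nanoTime() - start;
            // record elapsedNanos in the store's metrics
        }
    }
}
{code}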



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7916) Streams store cleanup: unify wrapping

2019-02-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765301#comment-16765301
 ] 

ASF GitHub Bot commented on KAFKA-7916:
---

vvcephei commented on pull request #6255: KAFKA-7916: Unify store wrapping code 
for clarity
URL: https://github.com/apache/kafka/pull/6255
 
 
   Refactor internal store wrapping for improved maintainability.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams store cleanup: unify wrapping
> -
>
> Key: KAFKA-7916
> URL: https://issues.apache.org/jira/browse/KAFKA-7916
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> The internal store handling in Streams has become quite complex, with many 
> layers of wrapping for different bookkeeping operations.
> The first thing we can do about this is to at least unify the wrapping 
> strategy, such that *all* store wrappers extend WrappedStateStore. This would 
> make the code easier to understand, since all wrappers would have the same 
> basic shape.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7916) Streams store cleanup: unify wrapping

2019-02-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7916:
---

 Summary: Streams store cleanup: unify wrapping
 Key: KAFKA-7916
 URL: https://issues.apache.org/jira/browse/KAFKA-7916
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler
Assignee: John Roesler


The internal store handling in Streams has become quite complex, with many 
layers of wrapping for different bookkeeping operations.

The first thing we can do about this is to at least unify the wrapping 
strategy, such that *all* store wrappers extend WrappedStateStore. This would 
make the code easier to understand, since all wrappers would have the same 
basic shape.
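A minimal sketch of the unified shape (simplified; the real base class would 
also carry the StateStore plumbing):

{code:java}
// Every wrapper extends one common base class that holds its delegate, so
// all wrappers share the same basic shape.
public abstract class WrappedStateStore<S> {

    private final S wrapped;

    protected WrappedStateStore(final S wrapped) {
        this.wrapped = wrapped;
    }

    protected S wrapped() {
        return wrapped;
    }
}

// A concrete wrapper then only adds its single concern on top of the delegate:
class ChangeLoggingStore<S> extends WrappedStateStore<S> {
    ChangeLoggingStore(final S inner) {
        super(inner);
    }
    // change-logging logic would go here
}
{code}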



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7741) Bad dependency via SBT

2019-02-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765245#comment-16765245
 ] 

ASF GitHub Bot commented on KAFKA-7741:
---

guozhangwang commented on pull request #6207: KAFKA-7741: Reword Streams 
dependency workaround docs
URL: https://github.com/apache/kafka/pull/6207
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bad dependency via SBT
> --
>
> Key: KAFKA-7741
> URL: https://issues.apache.org/jira/browse/KAFKA-7741
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Environment: Windows 10 professional, IntelliJ IDEA 2017.1
>Reporter: sacha barber
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> I am using the Kafka-Streams-Scala 2.1.0 JAR.
> And if I create a new Scala project using SBT with these dependencies 
> {code}
> name := "ScalaKafkaStreamsDemo"
> version := "1.0"
> scalaVersion := "2.12.1"
> libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0"
> libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
> //TEST
> libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
> libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % 
> "2.0.0" % Test
> {code}
> I get this error
>  
> {code}
> SBT 'ScalaKafkaStreamsDemo' project refresh failed
> Error:Error while importing SBT project:...[info] Resolving 
> jline#jline;2.14.1 ...
> [warn] [FAILED ] 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}: (0ms)
> [warn]  local: tried
> [warn] 
> C:\Users\sacha\.ivy2\local\javax.ws.rs\javax.ws.rs-api\2.1.1\${packaging.type}s\javax.ws.rs-api.${packaging.type}
> [warn]  public: tried
> [warn] 
> https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.${packaging.type}
> [info] downloading 
> https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-test-utils/2.1.0/kafka-streams-test-utils-2.1.0.jar
>  ...
> [info] [SUCCESSFUL ] 
> org.apache.kafka#kafka-streams-test-utils;2.1.0!kafka-streams-test-utils.jar 
> (344ms)
> [warn] ::
> [warn] :: FAILED DOWNLOADS ::
> [warn] :: ^ see resolution messages for details ^ ::
> [warn] ::
> [warn] :: javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [warn] ::
> [trace] Stack trace suppressed: run 'last *:ssExtractDependencies' for the 
> full output.
> [trace] Stack trace suppressed: run 'last *:update' for the full output.
> [error] (*:ssExtractDependencies) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] (*:update) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] Total time: 8 s, completed 16-Dec-2018 19:27:21
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; 
> support was removed in 8.0. See complete log in 
> file:/C:/Users/sacha/.IdeaIC2017.1/system/log/sbt.last.log
> {code}
> This seems to be a common issue with a bad dependency from Kafka on 
> javax.ws.rs-api.
> If I drop the Kafka version down to 2.0.0 and add this line to my SBT file, 
> the error goes away:
> {code}
> libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" 
> artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
> {code}
>  
> However I would like to work with 2.1.0 version.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7741) Bad dependency via SBT

2019-02-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765241#comment-16765241
 ] 

ASF GitHub Bot commented on KAFKA-7741:
---

guozhangwang commented on pull request #6126: KAFKA-7741: streams-scala - 
document dependency workaround
URL: https://github.com/apache/kafka/pull/6126
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bad dependency via SBT
> --
>
> Key: KAFKA-7741
> URL: https://issues.apache.org/jira/browse/KAFKA-7741
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Environment: Windows 10 professional, IntelliJ IDEA 2017.1
>Reporter: sacha barber
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> I am using the Kafka-Streams-Scala 2.1.0 JAR.
> And if I create a new Scala project using SBT with these dependencies 
> {code}
> name := "ScalaKafkaStreamsDemo"
> version := "1.0"
> scalaVersion := "2.12.1"
> libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0"
> libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
> //TEST
> libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
> libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % 
> "2.0.0" % Test
> {code}
> I get this error
>  
> {code}
> SBT 'ScalaKafkaStreamsDemo' project refresh failed
> Error:Error while importing SBT project:...[info] Resolving 
> jline#jline;2.14.1 ...
> [warn] [FAILED ] 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}: (0ms)
> [warn]  local: tried
> [warn] 
> C:\Users\sacha\.ivy2\local\javax.ws.rs\javax.ws.rs-api\2.1.1\${packaging.type}s\javax.ws.rs-api.${packaging.type}
> [warn]  public: tried
> [warn] 
> https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.${packaging.type}
> [info] downloading 
> https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-test-utils/2.1.0/kafka-streams-test-utils-2.1.0.jar
>  ...
> [info] [SUCCESSFUL ] 
> org.apache.kafka#kafka-streams-test-utils;2.1.0!kafka-streams-test-utils.jar 
> (344ms)
> [warn] ::
> [warn] :: FAILED DOWNLOADS ::
> [warn] :: ^ see resolution messages for details ^ ::
> [warn] ::
> [warn] :: javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [warn] ::
> [trace] Stack trace suppressed: run 'last *:ssExtractDependencies' for the 
> full output.
> [trace] Stack trace suppressed: run 'last *:update' for the full output.
> [error] (*:ssExtractDependencies) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] (*:update) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] Total time: 8 s, completed 16-Dec-2018 19:27:21
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; 
> support was removed in 8.0. See complete log in 
> file:/C:/Users/sacha/.IdeaIC2017.1/system/log/sbt.last.log
> {code}
> This seems to be a common issue with a bad dependency from Kafka on 
> javax.ws.rs-api.
> If I drop the Kafka version down to 2.0.0 and add this line to my SBT file, 
> the error goes away:
> {code}
> libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" 
> artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
> {code}
>  
> However I would like to work with 2.1.0 version.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7869) Refactor RocksDBConfigSetter API to separate DBOptions and CFOptions

2019-02-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765236#comment-16765236
 ] 

Guozhang Wang commented on KAFKA-7869:
--

Whenever we upgrade to a new version of RocksDB that has new APIs on CFOptions 
and DBOptions, we need to make code changes to 
{{RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter}}. On the other 
hand, users who get to leverage {{RocksDBConfigSetter}} would be RocksDB 
experts of sorts, so they are aware of setting options at the CF / DB level 
anyway. So I feel it is okay to push some of this burden to the users so that 
we do not need to maintain the correspondence internally.

> Refactor RocksDBConfigSetter API to separate DBOptions and CFOptions
> 
>
> Key: KAFKA-7869
> URL: https://issues.apache.org/jira/browse/KAFKA-7869
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> Current RocksDBConfigSetter has the following API:
> {code}
> void setConfig(final String storeName, final Options options, final 
> Map<String, Object> configs);
> {code}
> Where `Options` contains both the db-level and the cf-level configurations 
> of RocksDB.
> As we move on to have multiple CFs following KIP-258, it's better to refactor 
> it into
> {code}
> void setConfig(final String storeName, final DBOptions dbOptions, final 
> ColumnFamilyOptions cfOptions, final Map<String, Object> configs);
> {code}
> And then inside the internal implementation, if only the default CF is used, 
> we can still use the other constructor of `Options` that takes both a 
> DBOptions and CFOptions object as parameters.
> This should be started only after KIP-258 is finished.
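For illustration, a user-side setter implementing the *proposed* (not yet 
existing) signature might look like this; the specific options set below are 
arbitrary examples:

{code:java}
import java.util.Map;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;

public class MyRocksDBConfigSetter {

    public void setConfig(final String storeName,
                          final DBOptions dbOptions,
                          final ColumnFamilyOptions cfOptions,
                          final Map<String, Object> configs) {
        dbOptions.setMaxOpenFiles(5000);                          // db-level
        cfOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);  // cf-level
    }
}
{code}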



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7897) Invalid use of epoch cache with old message format versions

2019-02-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765215#comment-16765215
 ] 

ASF GitHub Bot commented on KAFKA-7897:
---

hachikuji commented on pull request #6253: KAFKA-7897; Do not write epoch start 
offset for older message format versions
URL: https://github.com/apache/kafka/pull/6253
 
 
   When an older message format is in use, we should disable the leader epoch 
cache so that we resort to truncation by high watermark. Previously we updated 
the cache for all versions when a broker became leader for a partition. This 
can cause large and unnecessary truncations after leader changes because we 
relied on the presence of _any_ cached epoch in order to tell whether to use 
the improved truncation logic possible with the OffsetsForLeaderEpoch API.
   
   Note this is a simpler fix than the one merged to trunk in #6232, since 
the branches have diverged significantly. Rather than removing the epoch cache 
file, we guard usage of the cache with the record version.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Invalid use of epoch cache with old message format versions
> ---
>
> Key: KAFKA-7897
> URL: https://issues.apache.org/jira/browse/KAFKA-7897
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Message format downgrades are not supported, but they generally work as long 
> as brokers/clients can at least continue to parse both message formats. After 
> a downgrade, the truncation logic should revert to using the high watermark, 
> but currently we use the existence of any cached epoch as the sole 
> prerequisite in order to leverage OffsetsForLeaderEpoch. This has the effect 
> of causing a massive truncation after startup which causes re-replication.
> I think our options to fix this are to either 1) clear the cache when we 
> notice a downgrade, or 2) forbid downgrades and raise an error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-11 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765200#comment-16765200
 ] 

John Roesler commented on KAFKA-7882:
-

Aha, thanks, [~bbejeck]! That's probably the reason for the "stores frequently 
closed" behavior.

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches an aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on 
> KTable in the future, but my implementation is quite simple and imo should 
> work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1 even if I'm just saving to the store and 
> turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them a retention 
> time as long as possible for the debugging stage. I tried to hotfix this with 
> a retry in the transform method; the state stores do reopen eventually and 
> the `put` method works, but this approach is questionable and I am concerned 
> whether it actually works.
> Edit:
> May this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a 
> low value? If I understand correctly, spilling to disk then happens more 
> frequently; may that, in turn, cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-11 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765196#comment-16765196
 ] 

Bill Bejeck commented on KAFKA-7882:


[~nijo]

One other thing to consider: the transform method expects a TransformerSupplier, 
which implies a new Transformer instance per stream thread's topology. I 
haven't seen your code at all, but from the example above
{noformat}
builder.addStateStore(stateStore)
(...).transform(new MyTransformer(...), "MyStore"){noformat}
it looks to me like you may be supplying a single Transformer instance across 
all stream threads.

 

-Bill
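In other words (Java sketch; MyTransformer stands in for the reporter's class 
and is assumed to have a no-arg constructor here), the supplier should return 
a fresh instance on every call:

{code:java}
// The lambda is the TransformerSupplier: each invocation returns a brand-new
// Transformer, so every stream task gets its own instance and its own handle
// on "MyStore".
builder.addStateStore(stateStore);
input.transform(() -> new MyTransformer(), "MyStore");
{code}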

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches an aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on 
> KTable in the future, but my implementation is quite simple and imo should 
> work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1 even if I'm just saving to the store and 
> turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them a retention 
> time as long as possible for the debugging stage. I tried to hotfix this with 
> a retry in the transform method; the state stores do reopen eventually and 
> the `put` method works, but this approach is questionable and I am concerned 
> whether it actually works.
> Edit:
> May this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a 
> low value? If I understand correctly, spilling to disk then happens more 
> frequently; may that, in turn, cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7914) LDAP

2019-02-11 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765143#comment-16765143
 ] 

Sönke Liebau commented on KAFKA-7914:
-

Hi [~Thatboix45], you probably accidentally created this ticket while you were 
still in the process of adding information?

As it stands I am afraid it is a bit light on details ;)

> LDAP
> 
>
> Key: KAFKA-7914
> URL: https://issues.apache.org/jira/browse/KAFKA-7914
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chris Bogan
>Priority: Major
>
> Entry



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-11 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765139#comment-16765139
 ] 

John Roesler commented on KAFKA-7882:
-

Hi [~nijo],

 

Thanks for bumping the post. I'd missed your comments.

 

You shouldn't have access to the `BufferFullStrategy` enum. It's an internal 
class, which is why there's no javadoc. Are you using the public interface 
(`Suppressed`) builders? If you stick with creating the buffers using 
`Suppressed.BufferConfig`, there should be sufficient documentation. Please let me 
know if it needs improvement.

 

The spill-to-disk strategy is not currently implemented. With the current 
(in-memory) buffer, the app would have to stop if you run out of memory. The 
other option is to choose `Suppressed.BufferConfig#emitEarlyWhenFull`, in which 
case, if you do run out of configured buffer space, it'll emit the 
longest-buffered record to make room. I am currently queuing up the disk-based 
buffering implementation.
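For example (a windowed KTable named `counts` is assumed), the emit-early 
option is configured through the public builders like so:

{code:java}
import java.time.Duration;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;

// Buffer up to 10,000 records for up to 5 minutes; when the buffer is full,
// emit the longest-buffered record instead of shutting down.
counts.suppress(untilTimeLimit(Duration.ofMinutes(5),
    maxRecords(10_000).emitEarlyWhenFull()));
{code}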

 

Offhand, if you were previously using wall-clock time, you don't need 
semantically airtight suppression behavior, so emitting early when the buffer 
fills up should be fine.

 

For testing, you might take a look at 
`org.apache.kafka.streams.kstream.internals.SuppressScenarioTest`, which also 
uses the test driver. Basically, stream time is the highest timestamp yet seen 
in a record, so you can advance stream time by inserting "dummy" records with 
higher timestamps.

 

Regarding wall-clock time, someone has proposed a KIP to add support for it to 
suppressed. 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time]
 . Hopefully, it will be in the next release.

 

Thanks,

-John

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches an aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on 
> KTable in the future, but my implementation is quite simple and imo should 
> work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1 even if I'm just saving to the store and 
> turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them a retention 
> time as long as possible for the debugging stage. I tried to hotfix this with 
> a retry in the transform method; the state stores do reopen eventually and 
> the `put` method works, but this approach is questionable and I am concerned 
> whether it actually works.
> Edit:
> May this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a 
> low value? If I understand correctly, spilling to disk then happens more 
> frequently; may that, in turn, cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7915) SASL authentication failures may return sensitive data to client

2019-02-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7915:
--
Description: 
There was a regression from the commit 
https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
 which added the error message from SaslException thrown by the server during 
authentication into the error response returned to clients. Since this 
exception may contain sensitive data (e.g. indicating that a user exists but 
password match failed), we should not return the error to clients. We have a 
separate exception (`AuthenticationException`) for errors that are safe to 
propagate to clients.

The regression was not in any released version, the related commit will only be 
in 2.2.0, so we just need to fix this before 2.2.0.

  was:There was a regression from the commit 
https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
 which added the error message from SaslException thrown by the server during 
authentication into the error response returned to clients. Since this 
exception may contain sensitive data (e.g. indicating that a user exists but 
password match failed), we should not return the error to clients. We have a 
separate exception (`AuthenticationException`) for errors that are safe to 
propagate to clients.


> SASL authentication failures may return sensitive data to client
> 
>
> Key: KAFKA-7915
> URL: https://issues.apache.org/jira/browse/KAFKA-7915
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.2.0
>
>
> There was a regression from the commit 
> https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
>  which added the error message from SaslException thrown by the server during 
> authentication into the error response returned to clients. Since this 
> exception may contain sensitive data (e.g. indicating that a user exists but 
> password match failed), we should not return the error to clients. We have a 
> separate exception (`AuthenticationException`) for errors that are safe to 
> propagate to clients.
> The regression was not in any released version, the related commit will only 
> be in 2.2.0, so we just need to fix this before 2.2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7915) SASL authentication failures may return sensitive data to client

2019-02-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765089#comment-16765089
 ] 

ASF GitHub Bot commented on KAFKA-7915:
---

rajinisivaram commented on pull request #6252: KAFKA-7915: Don't return 
sensitive authentication errors to clients
URL: https://github.com/apache/kafka/pull/6252
 
 
   Don't return error messages from `SaslException` to clients. Error messages 
to be returned to clients to aid debugging must be thrown as 
AuthenticationExceptions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SASL authentication failures may return sensitive data to client
> 
>
> Key: KAFKA-7915
> URL: https://issues.apache.org/jira/browse/KAFKA-7915
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.2.0
>
>
> There was a regression from the commit 
> https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
>  which added the error message from SaslException thrown by the server during 
> authentication into the error response returned to clients. Since this 
> exception may contain sensitive data (e.g. indicating that a user exists but 
> password match failed), we should not return the error to clients. We have a 
> separate exception (`AuthenticationException`) for errors that are safe to 
> propagate to clients.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7915) SASL authentication failures may return sensitive data to client

2019-02-11 Thread Ron Dagostino (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765088#comment-16765088
 ] 

Ron Dagostino commented on KAFKA-7915:
--

Is this the offending line?

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L471



> SASL authentication failures may return sensitive data to client
> 
>
> Key: KAFKA-7915
> URL: https://issues.apache.org/jira/browse/KAFKA-7915
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.2.0
>
>
> There was a regression from the commit 
> https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
>  which added the error message from SaslException thrown by the server during 
> authentication into the error response returned to clients. Since this 
> exception may contain sensitive data (e.g. indicating that a user exists but 
> password match failed), we should not return the error to clients. We have a 
> separate exception (`AuthenticationException`) for errors that are safe to 
> propagate to clients.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7915) SASL authentication failures may return sensitive data to client

2019-02-11 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7915:
-

 Summary: SASL authentication failures may return sensitive data to 
client
 Key: KAFKA-7915
 URL: https://issues.apache.org/jira/browse/KAFKA-7915
 Project: Kafka
  Issue Type: Bug
  Components: security
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.2.0


There was a regression from the commit 
https://github.com/apache/kafka/commit/e8a3bc74254a8e4e4aaca41395177fa4a98b480c#diff-e4c812749f57c982e2570492657ea787
 which added the error message from SaslException thrown by the server during 
authentication into the error response returned to clients. Since this 
exception may contain sensitive data (e.g. indicating that a user exists but 
password match failed), we should not return the error to clients. We have a 
separate exception (`AuthenticationException`) for errors that are safe to 
propagate to clients.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-11 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik updated KAFKA-7794:
--
Attachment: image-2019-02-11-20-56-13-362.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png
>
>
> For some input for the timestamps (different from -1 or -2) the GetOffset is 
> not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-11 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik updated KAFKA-7794:
--
Attachment: image-2019-02-11-20-57-03-579.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png
>
>
> For some input for the timestamps (different from -1 or -2) the GetOffset is 
> not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-11 Thread Kartik (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765075#comment-16765075
 ] 

Kartik commented on KAFKA-7794:
---

[~audhumla] I tried your steps and was able to get the offset properly. The 
offset is not returned if the timestamp you pass is greater than the timestamp 
of the most recently added record.

!image-2019-02-11-20-51-07-805.png!

If I provide a timestamp greater than 1549897929598, such as 1549897929599, no 
offset is returned.

!image-2019-02-11-20-57-03-579.png!

When I provide a proper timestamp, I get the proper offset:

!image-2019-02-11-20-56-13-362.png!

Let me know if I am wrong.
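
For reference, the modern consumer API shows the same semantics. A minimal 
sketch (broker address and topic name are placeholders): 
`KafkaConsumer#offsetsForTimes` maps a partition to null when no record has a 
timestamp at or after the requested one, which is why nothing is printed after 
the last ":".

{code:java}
// Sketch only; broker address and topic name are placeholders.
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetsForTimesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
            props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        try {
            TopicPartition tp = new TopicPartition("MY_TOPIC", 0);
            // A timestamp newer than the latest record in the partition:
            long timestamp = System.currentTimeMillis();
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(Collections.singletonMap(tp, timestamp));
            // Prints "null": no record has a timestamp at or after `timestamp`.
            System.out.println(offsets.get(tp));
        } finally {
            consumer.close();
        }
    }
}
{code}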

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png
>
>
> For some input for the timestamps (different from -1 or -2) the GetOffset is 
> not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-11 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik updated KAFKA-7794:
--
Attachment: image-2019-02-11-20-51-07-805.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png
>
>
> For some input for the timestamps (different from -1 or -2) the GetOffset is 
> not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7913) Kafka broker halts and messes up the whole cluster

2019-02-11 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765007#comment-16765007
 ] 

Christoffer Hammarström commented on KAFKA-7913:


This is bug KAFKA-7697.

> Kafka broker halts and messes up the whole cluster
> --
>
> Key: KAFKA-7913
> URL: https://issues.apache.org/jira/browse/KAFKA-7913
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
> Environment: kafka_2.12-2.1.0, 
> openjdk version "11.0.1" 2018-10-16 LTS
> OpenJDK Runtime Environment 18.9 (build 11.0.1+13-LTS),
> CentOS Linux release 7.3.1611 (Core),
> linux 3.10.0-514.26.2.el7.x86_64
>Reporter: Andrej Urvantsev
>Priority: Major
>
> We upgraded the cluster recently and are running Kafka 2.1.0 on Java 11.
> For a while everything went OK, but then random brokers started to halt 
> from time to time.
> When it happens, the broker still looks alive to other brokers, but it stops 
> receiving network traffic. Other brokers then throw an IOException:
> {noformat}
> java.io.IOException: Connection to 36155 was disconnected before the response 
> was read
>     at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
>     at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
>     at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
>     at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:241)
>     at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
>     at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
>     at scala.Option.foreach(Option.scala:257)
>     at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>     at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {noformat}
> On the problematic broker all logging stops. No errors, no exceptions, 
> nothing.
> This also "breaks" the whole cluster: since clients and other brokers "think" 
> that the broker is still alive,
> they keep trying to connect to it, and it seems that leader election leaves 
> the problematic broker as a leader.
>  
> I would be glad to provide any further details if somebody could advise 
> what to investigate when it happens next time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-11 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764999#comment-16764999
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

Sorry for bumping this post, but any advice on creating a test case for a 
topology containing the `.suppress` method would be very helpful :)
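
One approach that should work, sketched with the 2.1 test-utils API (topic 
names and serdes below are placeholders): `suppress()` is driven by stream 
time, i.e. the timestamps of the records themselves, so `advanceWallClockTime` 
does not flush it; piping a record whose timestamp is past the window end plus 
grace period does.

{code:java}
// Sketch under stated assumptions; not taken from the reporter's code.
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class SuppressTestSketch {
    static void drive(Topology topology, Properties config) {
        TopologyTestDriver driver = new TopologyTestDriver(topology, config);
        try {
            ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
                "input-topic", new StringSerializer(), new StringSerializer());
            // A record inside the window under test:
            driver.pipeInput(factory.create("input-topic", "key", "v1", 0L));
            // A later record advances stream time past the window end + grace,
            // which lets suppress() emit the final window result:
            driver.pipeInput(factory.create("input-topic", "other", "v2", 60_000L));
            System.out.println(driver.readOutput(
                "output-topic", new StringDeserializer(), new StringDeserializer()));
        } finally {
            driver.close();
        }
    }
}
{code}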

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming upcoming records. To ensure only one record of the same key and 
> the window comes to an aggregate I have created a custom Transformer (I know 
> something similar is going to be introduced with suppress method on KTable in 
> the future, but my implementation is quite simple and imo should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1 even if I'm just saving to the store and 
> turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, I gave them retention time as 
> long as possible for the debugging stage, I tried to hotfix that with the 
> retry in the transform method and the state stores reopen at the end and the 
> `put` method works, but this approach is questionable and I am concerned if 
> it actually works.
> Edit:
> May this be because of the fact that the 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to low 
> value? If I understand correctly, spilling to disk is done therefore more 
> frequently, may it, therefore, cause closing the store?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled

2019-02-11 Thread Michal Turek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764997#comment-16764997
 ] 

Michal Turek commented on KAFKA-3450:
-

Hi [~ijuma], is the question for me as the bug creator?

We don't want to let client applications create topics at will, so we have 
disabled this possibility using auto.create.topics.enable=false. Creating the 
topic is not a solution for us because it's exactly the thing we want to 
prevent.

Kafka brokers know about this (their) configuration but are unable to 
propagate this information to the clients, so the clients get stuck for an 
extremely long time (a minute) in a loop of timeouts and failures. They block 
the caller's thread (an async but blocking API) and become fully unable to 
communicate once the thread pool is exhausted. The Kafka broker responds 
"topic doesn't exist" to the client, but it should return "topic doesn't exist 
and will never be available" to quickly break the unnecessary loop of retries.

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
>Reporter: Michal Turek
>Priority: Critical
>
> {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if 
> the destination topic doesn't exist and if their automatic creation is 
> disabled. Warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is 
> logged every 100 ms in a loop until the 60 seconds timeout expires, but the 
> operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
>  * Test of sending to a topic that does not exist while automatic creation of 
> topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
> private static final Logger LOGGER = 
> LoggerFactory.getLogger(NoSuchTopicTest.class);
> public static void main(String[] args) {
> Properties properties = new Properties();
> properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
> properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> NoSuchTopicTest.class.getSimpleName());
> properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); 
> // Default is 60 seconds
> try (Producer<String, String> producer = new 
> KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", 
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}", 
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", 
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}", 
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> }
> }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 0 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 1 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 2 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient 
> 

[jira] [Comment Edited] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-11 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761758#comment-16761758
 ] 

Mateusz Owczarek edited comment on KAFKA-7882 at 2/11/19 2:16 PM:
--

Also, I noticed that my regression tests, which check whether events are 
actually produced after the time window has finished, are not passing. They are 
based on the `TopologyTestDriver` with `advanceWallClockTime`, since my previous 
suppress implementation was based on wall-clock-time punctuation. How can I test 
my topology now? Basically, how can I test any topology containing the 
`.suppress` method from Kafka 2.1.0?


was (Author: nijo):
Also, I noticed my regression tests checking if events are actually produced 
after the time-window has finished are not passing. They are based on the 
`TopologyTestDriver` with the `advanceWallClockTime` since my previous suppress 
impl. was based on wall clock time punctuation. How can I test my topology now?

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming upcoming records. To ensure only one record of the same key and 
> the window comes to an aggregate I have created a custom Transformer (I know 
> something similar is going to be introduced with suppress method on KTable in 
> the future, but my implementation is quite simple and imo should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1 even if I'm just saving to the store and 
> turn off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, I gave them retention time as 
> long as possible for the debugging stage, I tried to hotfix that with the 
> retry in the transform method and the state stores reopen at the end and the 
> `put` method works, but this approach is questionable and I am concerned if 
> it actually works.
> Edit:
> May this be because of the fact that the 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to low 
> value? If I understand correctly, spilling to disk is done therefore more 
> frequently, may it, therefore, cause closing the store?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2019-02-11 Thread Chris Bogan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764927#comment-16764927
 ] 

Chris Bogan commented on KAFKA-7681:


I'm trying to join the group, sorry, a little confused.

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> type request is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time ms) / 1000, 
> but will be more direct. This would require a new KIP.
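
As a worked example of the quoted formula (the numbers are made up):

{code:java}
// (request rate) * (request local time ms) / 1000 with hypothetical numbers:
// 200 req/s at an average local time of 10 ms occupies 2.0 request threads.
public class RequestThreadUtilization {
    public static void main(String[] args) {
        double requestRatePerSec = 200.0; // hypothetical
        double localTimeMs = 10.0;        // hypothetical
        double threadsUsed = requestRatePerSec * localTimeMs / 1000.0;
        System.out.println("average request threads used = " + threadsUsed); // 2.0
    }
}
{code}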



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7914) LDAP

2019-02-11 Thread Chris Bogan (JIRA)
Chris Bogan created KAFKA-7914:
--

 Summary: LDAP
 Key: KAFKA-7914
 URL: https://issues.apache.org/jira/browse/KAFKA-7914
 Project: Kafka
  Issue Type: Improvement
Reporter: Chris Bogan


Entry



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled

2019-02-11 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764852#comment-16764852
 ] 

Ismael Juma commented on KAFKA-3450:


It would be useful to understand what the desired behaviour is here. Generally, 
if you don't expect the topic to be there, `AdminClient` can be used to create 
the topic if it doesn't exist.
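
A minimal sketch of that `AdminClient` approach (broker address, topic name, 
partition count, and replication factor are placeholders):

{code:java}
// Create the topic up front so producer.send() never blocks on metadata
// for a topic that will never be auto-created.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class EnsureTopicExists {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        AdminClient admin = AdminClient.create(props);
        try {
            if (!admin.listTopics().names().get().contains("ThisTopicDoesNotExist")) {
                admin.createTopics(Collections.singleton(
                    new NewTopic("ThisTopicDoesNotExist", 1, (short) 1))).all().get();
            }
        } finally {
            admin.close();
        }
    }
}
{code}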

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
>Reporter: Michal Turek
>Priority: Critical
>
> {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if 
> the destination topic doesn't exist and if their automatic creation is 
> disabled. Warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is 
> logged every 100 ms in a loop until the 60 seconds timeout expires, but the 
> operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
>  * Test of sending to a topic that does not exist while automatic creation of 
> topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
> private static final Logger LOGGER = 
> LoggerFactory.getLogger(NoSuchTopicTest.class);
> public static void main(String[] args) {
> Properties properties = new Properties();
> properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
> properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> NoSuchTopicTest.class.getSimpleName());
> properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); 
> // Default is 60 seconds
> try (Producer<String, String> producer = new 
> KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", 
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}", 
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", 
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}", 
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> }
> }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 0 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 1 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 2 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 3 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 4 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 5 : 
> 

[jira] [Commented] (KAFKA-3522) Consider adding version information into rocksDB storage format

2019-02-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764784#comment-16764784
 ] 

ASF GitHub Bot commented on KAFKA-3522:
---

mjsax commented on pull request #6222: KAFKA-3522: Remove TimestampedByteStore 
from public API
URL: https://github.com/apache/kafka/pull/6222
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consider adding version information into rocksDB storage format
> ---
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: architecture
>
> Kafka Streams does not introduce any modifications to the data format in the 
> underlying Kafka protocol, but it does use RocksDB for persistent state 
> storage, and currently its data format is fixed and hard-coded. We want to 
> consider the evolution path in the future we we change the data format, and 
> hence having some version info stored along with the storage file / directory 
> would be useful.
> And this information could be even out of the storage file; for example, we 
> can just use a small "version indicator" file in the rocksdb directory for 
> this purposes. Thoughts? [~enothereska] [~jkreps]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6304) The controller should allow updating the partition reassignment for the partitions being reassigned

2019-02-11 Thread Victor Corral (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764771#comment-16764771
 ] 

Victor Corral commented on KAFKA-6304:
--

I started building ESXi to be compatible with VMware, to work with my Hyper-V. 
All was well until I went to make the disk. Windows 10 doesn't work, so I'm 
downloading Windows 7.

> The controller should allow updating the partition reassignment for the 
> partitions being reassigned
> ---
>
> Key: KAFKA-6304
> URL: https://issues.apache.org/jira/browse/KAFKA-6304
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 2.2.0
>
>
> Currently the controller will not process the partition reassignment of a 
> partition if the partition is already being reassigned.
> The issue is that if there is a broker failure during the partition 
> reassignment, the partition reassignment may never finish. And the users may 
> want to cancel the partition reassignment. However, the controller will 
> refuse to do that unless user manually deletes the partition reassignment zk 
> path, force a controller switch and then issue the revert command. This is 
> pretty involved. It seems reasonable for the controller to replace the 
> ongoing stuck reassignment and replace it with the updated partition 
> assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6304) The controller should allow updating the partition reassignment for the partitions being reassigned

2019-02-11 Thread Victor Corral (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764773#comment-16764773
 ] 

Victor Corral commented on KAFKA-6304:
--

Then I noticed it: I typed in credit /set TESTMODE /?. I have a little 
configuring to do, so I moved it to something else to configure my domain. All 
day today I dedicated my day to finding or setting up my domain, and I ended up 
finding out that the T-Mobile cell spot router I was using was found in the 
trash, so I will have a new one in 3 days, and they even took care of the 
delivery for me, so I hope Hurricane Electric and Google understand.

> The controller should allow updating the partition reassignment for the 
> partitions being reassigned
> ---
>
> Key: KAFKA-6304
> URL: https://issues.apache.org/jira/browse/KAFKA-6304
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 2.2.0
>
>
> Currently the controller will not process the partition reassignment of a 
> partition if the partition is already being reassigned.
> The issue is that if there is a broker failure during the partition 
> reassignment, the partition reassignment may never finish. And the users may 
> want to cancel the partition reassignment. However, the controller will 
> refuse to do that unless user manually deletes the partition reassignment zk 
> path, force a controller switch and then issue the revert command. This is 
> pretty involved. It seems reasonable for the controller to replace the 
> ongoing stuck reassignment and replace it with the updated partition 
> assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)