[jira] [Assigned] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling

2018-12-12 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-7728:
--

Assignee: Mayuresh Gharat

> Add JoinReason to the join group request for better rebalance handling
> --
>
> Key: KAFKA-7728
> URL: https://issues.apache.org/jira/browse/KAFKA-7728
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: consumer, mirror-maker, needs-kip
>
> Recently [~mgharat] and I discussed the current rebalance logic for handling the 
> leader's join group request. So far we blindly trigger a rebalance whenever the 
> leader rejoins. The caveat is that KIP-345 does not cover this case, and if a 
> consumer group is not using sticky assignment but another strategy such as round 
> robin, the redundant rebalance can still shuffle topic partitions around between 
> consumers (for example, in a mirror maker application).
> I checked on broker side and here is what we currently do:
>  
> {code:java}
> if (group.isLeader(memberId) || !member.matches(protocols))
>   // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>   // The latter allows the leader to trigger rebalances for changes affecting assignment
>   // which do not affect the member metadata (such as topic metadata changes for the consumer)
> {code}
> Based on the broker logic, we only need to trigger a rebalance on leader rejoin 
> when a topic metadata change has happened. I also looked at the 
> ConsumerCoordinator code on the client side, and found the metadata monitoring 
> logic here:
> {code:java}
> public boolean rejoinNeededOrPending() {
> ...
> // we need to rejoin if we performed the assignment and metadata has changed
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.equals(metadataSnapshot))
>   return true;
> }{code}
> Instead of just returning true, we could introduce a new enum field called 
> JoinReason that indicates the purpose of the rejoin, so that we don't need to do 
> a full rebalance when the leader is just going through a rolling bounce. In other 
> words, add a JoinReason field to the join group request so the broker knows 
> whether the leader is rejoining because of a topic metadata change. If yes, we 
> trigger a rebalance; if not, we shouldn't.
>  
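As a rough illustration of the proposal (the JoinReason values below are a sketch, not an existing part of the JoinGroup protocol), the consumer could report why it needs to rejoin instead of a bare boolean, and the broker could then skip the full rebalance when a leader rejoins with unchanged metadata:

{code:java}
// Hypothetical reason codes a consumer could attach to its JoinGroup request;
// none of these exist in the protocol today.
public enum JoinReason {
    NEW_MEMBER,        // first join, or rejoin after losing group membership
    METADATA_CHANGED,  // subscribed topic metadata changed since the last assignment
    LEADER_REJOIN      // leader rejoining (e.g. a rolling bounce) with unchanged metadata
}
{code}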



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling

2018-12-12 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7728:
--

 Summary: Add JoinReason to the join group request for better 
rebalance handling
 Key: KAFKA-7728
 URL: https://issues.apache.org/jira/browse/KAFKA-7728
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


Recently [~mgharat] and I discussed the current rebalance logic for handling the 
leader's join group request. So far we blindly trigger a rebalance whenever the 
leader rejoins. The caveat is that KIP-345 does not cover this case, and if a 
consumer group is not using sticky assignment but another strategy such as round 
robin, the redundant rebalance can still shuffle topic partitions around between 
consumers (for example, in a mirror maker application).

I checked on broker side and here is what we currently do:

 
{code:java}
if (group.isLeader(memberId) || !member.matches(protocols))
  // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
  // The latter allows the leader to trigger rebalances for changes affecting assignment
  // which do not affect the member metadata (such as topic metadata changes for the consumer)
{code}
Based on the broker logic, we only need to trigger a rebalance on leader rejoin 
when a topic metadata change has happened. I also looked at the 
ConsumerCoordinator code on the client side, and found the metadata monitoring 
logic here:
{code:java}
public boolean rejoinNeededOrPending() {
...
// we need to rejoin if we performed the assignment and metadata has changed
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
  return true;
}{code}
Instead of just returning true, we could introduce a new enum field called 
JoinReason that indicates the purpose of the rejoin, so that we don't need to do a 
full rebalance when the leader is just going through a rolling bounce. In other 
words, add a JoinReason field to the join group request so the broker knows 
whether the leader is rejoining because of a topic metadata change. If yes, we 
trigger a rebalance; if not, we shouldn't.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-12-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719672#comment-16719672
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---

guozhangwang closed pull request #6024: KAFKA-7223: document suppression buffer 
metrics
URL: https://github.com/apache/kafka/pull/6024
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/ops.html b/docs/ops.html
index f48791181c2..64f71593d5d 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1441,7 +1441,7 @@ Streams Mo
 metrics.recording.level="info"
 
 Thread Metrics
-All the following metrics have a recording level of ``info``:
+All the following metrics have a recording level of info:
 
 
   
@@ -1563,7 +1563,7 @@ Task Metrics
-All the following metrics have a recording level of ``debug``:
+All the following metrics have a recording level of debug:
  
   
   
@@ -1605,7 +1605,7 @@ 
 
 
  Processor Node Metrics
- All the following metrics have a recording level of ``debug``:
+ All the following metrics have a recording level of debug:
  
   
   
@@ -1703,13 +1703,35 @@ 
 The total number of of records being forwarded downstream, from 
source nodes only. 
 
kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
   
+  
+suppression-emit-rate
+
+  The rate at which records that have been emitted downstream from 
suppression operation nodes.
+  Compare with the process-rate metric to determine how 
many updates are being suppressed.
+
+
kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
+  
+  
+suppression-emit-total
+
+  The total number of records that have been emitted downstream from 
suppression operation nodes.
+  Compare with the process-total metric to determine how 
many updates are being suppressed.
+
+
kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
+  
  
  
 
  State Store Metrics
- All the following metrics have a recording level of ``debug``. Note that the 
``store-scope`` value is specified in StoreSupplier#metricsScope() 
for user's customized
- state stores; for built-in state stores, currently we have 
in-memory-state, in-memory-lru-state, 
rocksdb-state (for RocksDB backed key-value store),
-  rocksdb-window-state (for RocksDB backed window store) and 
rocksdb-session-state (for RocksDB backed session store).
+ All the following metrics have a recording level of debug. Note 
that the store-scope value is specified in 
StoreSupplier#metricsScope() for user's customized
+ state stores; for built-in state stores, currently we have:
+  
+in-memory-state
+in-memory-lru-state
+rocksdb-state (for RocksDB backed key-value store)
+rocksdb-window-state (for RocksDB backed window 
store)
+rocksdb-session-state (for RocksDB backed session 
store)
+  
 
   
   
@@ -1902,7 +1924,7 @@ Record Cache Metrics
-  All the following metrics have a recording level of ``debug``:
+  All the following metrics have a recording level of debug:
 
   
   
@@ -1929,6 +1951,44 @@ Suppression Buffer Metrics
+  All the following metrics have a recording level of debug:
+
+  
+  
+  
+suppression-buffer-size-current
+The current total size, in bytes, of the buffered data.
+
kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
+  
+  
+suppression-buffer-size-avg
+The average total size, in bytes, of the buffered data over the 
sampling window.
+
kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
+  
+  
+suppression-buffer-size-max
+The maximum total size, in bytes, of the buffered data over the 
sampling window.
+
kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
+  
+  
+suppression-buffer-count-current
+The current number of records buffered.
+
kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
+  
+  
+suppression-buffer-size-avg
+The average number of records buffered over the sampling 
window.
+
kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
+  
+  
+suppression-buffer-size-max
+The maximum number of records buffered over the sampling 
window.
+   

[jira] [Assigned] (KAFKA-7715) Connect should have a parameter to disable WADL output for OPTIONS method

2018-12-12 Thread Oleksandr Diachenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Diachenko reassigned KAFKA-7715:
--

Assignee: Oleksandr Diachenko

> Connect should have a parameter to disable WADL output for OPTIONS method
> -
>
> Key: KAFKA-7715
> URL: https://issues.apache.org/jira/browse/KAFKA-7715
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, security
>Affects Versions: 2.1.0
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 2.1.1
>
>
> Currently, the Connect REST API exposes WADL output for the OPTIONS method:
> {code:bash}
> curl -i -X OPTIONS http://localhost:8083/connectors
> HTTP/1.1 200 OK
> Date: Fri, 07 Dec 2018 22:51:53 GMT
> Content-Type: application/vnd.sun.wadl+xml
> Allow: HEAD,POST,GET,OPTIONS
> Last-Modified: Fri, 07 Dec 2018 14:51:53 PST
> Content-Length: 1331
> Server: Jetty(9.4.12.v20180830)
> ... WADL application XML (generated by Jersey 2.27), with grammars at
> http://localhost:8083/application.wadl/xsd0.xsd and base resource
> http://localhost:8083/, describing the /connectors resource with its
> HEAD, POST, GET and OPTIONS methods and an optional boolean "forward"
> query parameter ...
> {code}
> This is a potential vulnerability, so it makes sense to have a configuration 
> parameter that disables WADL output.
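For what it's worth, here is a minimal sketch of how such a switch could be wired up, assuming a worker setting named something like rest.wadl.enable (that name is illustrative; no such config exists today). Jersey already has a standard property for suppressing WADL generation:

{code:java}
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;

public class RestServerWadlExample {
    // Build the Jersey ResourceConfig for the Connect REST server; when the
    // (hypothetical) worker config disables WADL, set Jersey's standard switch.
    public static ResourceConfig buildResourceConfig(boolean wadlEnabled) {
        ResourceConfig resourceConfig = new ResourceConfig();
        resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, !wadlEnabled);
        return resourceConfig;
    }
}
{code}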



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6833) KafkaProducer throws "Invalid partition given with record" exception

2018-12-12 Thread Magesh kumar Nandakumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719576#comment-16719576
 ] 

Magesh kumar Nandakumar commented on KAFKA-6833:


In my opinion, the behavior for a new topic and for a new partition should be no 
different. With that context, and based on [~ChrisEgerton]'s suggestion, I wonder 
whether there is any specific reason to handle a new topic differently from a new 
partition.

> KafkaProducer throws "Invalid partition given with record" exception
> 
>
> Key: KAFKA-6833
> URL: https://issues.apache.org/jira/browse/KAFKA-6833
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Arjun Satish
>Assignee: Bob Barrett
>Priority: Minor
>
> Currently, when creating topics via ZooKeeper, there is a small but definite 
> delay between creating the nodes in ZK and having the topics created in the 
> brokers. The KafkaProducer maintains a metadata cache about topics which gets 
> updated after the broker metadata is updated. If an application adds 
> partitions to a topic and immediately tries to produce records to a new 
> partition, a KafkaException is thrown with a message similar to the following:
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: Invalid partition given 
> with record: 12 is not in the range [0...1).
> {code}
> In this case, since the application has context that it created the topics, 
> it might be worthwhile to consider if a more specific exception can be thrown 
> instead of KafkaException. For example:
> {code:java}
> public class PartitionNotFoundException extends KafkaException {...}{code}
> This could allow the application to be able to interpret such an error, and 
> act accordingly.
> EDIT: Correct "create topics" to "adds partitions to a topic".
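To make the race concrete, here is a minimal, self-contained sketch (topic name and partition counts are illustrative; PartitionNotFoundException is the hypothetical exception proposed above and does not exist today):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class NewPartitionRaceExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props);
             KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            // Grow "my-topic" to 13 partitions (it must already exist with fewer partitions).
            admin.createPartitions(
                    Collections.singletonMap("my-topic", NewPartitions.increaseTo(13))).all().get();
            try {
                // Producing to the new partition right away can race with the producer's
                // cached metadata, which may not know about partition 12 yet.
                producer.send(new ProducerRecord<>("my-topic", 12, "key", "value")).get();
            } catch (KafkaException e) {
                // Today this surfaces as a generic KafkaException ("Invalid partition given with
                // record ..."); the ticket proposes a more specific PartitionNotFoundException here.
            }
        }
    }
}
{code}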



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2018-12-12 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5386.
--
   Resolution: Fixed
Fix Version/s: 1.0.0

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.0.0
>
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?
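As a quick worked example of the quoted naming rule (the application id and store name are illustrative):

{code:java}
// applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX ("-changelog")
String changelogTopic = "my.app.id" + "-" + "storename" + "-changelog";
// -> "my.app.id-storename-changelog", the hardcoded name this ticket wants to override
{code}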



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb

2018-12-12 Thread hitesh gollahalli bachanna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719527#comment-16719527
 ] 

hitesh gollahalli bachanna commented on KAFKA-7640:
---

[~vvcephei] sure, I have attached some screenshots that show how the data and the 
request end up on different hosts. I will work on gathering actual debug logs from 
the Kafka internal classes.

There are 3 files.

1. intital_data_host.jpeg – shows which consumer received the data; the last 
column (host) shows that. I used the IP addresses, which are routable, to make 
the actual REST calls.

2. rest_api_call_host.jpeg – after I sent the message, I make a REST call; the 
requests land on some random node, with the host name in the last column.

3. rest_redirect_host.jpeg – the code checks whether the host is localSelf; if 
not, it sends the request to that ip/host to fetch the result, which is a 
different host compared to what is shown in the intital_data_host.jpeg file.

 

Here is the REST call code for reference:

 
{code:java}
HostStoreInfo hostStoreInfo = metadataService.streamsMetadataForStoreAndKey(
        config.getMOVE_DATA_AGGS_STORE_NAME(),
        storeDpci,
        new JsonSerializer<>());
System.out.println("host_info :" + hostStoreInfo + " for_key :" + storeDpci);

MoveDataVo moveDataVo = null;
System.out.println("localSelf :" + localSelf.host() + " remote_host:" + hostStoreInfo.getHost());

if (localSelf.host().equals(hostStoreInfo.getHost())) {
    ReadOnlyKeyValueStore store = kafkaStreams.store(
            config.getMOVE_DATA_AGGS_STORE_NAME(), QueryableStoreTypes.keyValueStore());
    if (store != null) {
        moveDataVo = store.get(storeDpci);
    }
} else {
    String fooResourceUrl = "http://" + hostStoreInfo.getHost() + ":" + 8080
            + "/dataaggs/v1/get_state?storedpci=" + storeDpci;
    ResponseEntity<MoveDataVo> response = restTemplate.getForEntity(fooResourceUrl, MoveDataVo.class);
    moveDataVo = response.getBody();
    System.out.println("response_from_other: " + moveDataVo);
}
{code}
 

 

 

> Kafka stream interactive query not returning data when state is backed by 
> rocksdb
> -
>
> Key: KAFKA-7640
> URL: https://issues.apache.org/jira/browse/KAFKA-7640
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: hitesh gollahalli bachanna
>Priority: Major
> Attachments: intital_data_host.jpeg, rest_api_call_host.jpeg, 
> rest_redirect_host.jpeg
>
>
> I have a Kafka Streams app running with 36 different instances (one for each 
> partition). Each instance comes up one after the other, and I am building a REST 
> service on top of the state to access the data.
> Here is some code that I use:
> {code:java}
> // call this to find out which host has the key
> StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
> if (localSelf.host().equals(hostStoreInfo.getHost())) {
>     // get the key from the local store
> }
> else {
>     // call the remote host using restTemplate
> }{code}
> The problem now is that the `metadata` object returned points to one host/ip, but 
> the data is on a different node. I was able to see this using some application 
> logs I printed. This happens every time I start my application.
> The `allMetadata` method in the `KafkaStreams` class says the value will be 
> updated when the partitions get reassigned, but that is not happening in this 
> case.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb

2018-12-12 Thread hitesh gollahalli bachanna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hitesh gollahalli bachanna updated KAFKA-7640:
--
Attachment: intital_data_host.jpeg
rest_api_call_host.jpeg
rest_redirect_host.jpeg

> Kafka stream interactive query not returning data when state is backed by 
> rocksdb
> -
>
> Key: KAFKA-7640
> URL: https://issues.apache.org/jira/browse/KAFKA-7640
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: hitesh gollahalli bachanna
>Priority: Major
> Attachments: intital_data_host.jpeg, rest_api_call_host.jpeg, 
> rest_redirect_host.jpeg
>
>
> I have a Kafka Streams app running with 36 different instances (one for each 
> partition). Each instance comes up one after the other, and I am building a REST 
> service on top of the state to access the data.
> Here is some code that I use:
> {code:java}
> // call this to find out which host has the key
> StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
> if (localSelf.host().equals(hostStoreInfo.getHost())) {
>     // get the key from the local store
> }
> else {
>     // call the remote host using restTemplate
> }{code}
> The problem now is that the `metadata` object returned points to one host/ip, but 
> the data is on a different node. I was able to see this using some application 
> logs I printed. This happens every time I start my application.
> The `allMetadata` method in the `KafkaStreams` class says the value will be 
> updated when the partitions get reassigned, but that is not happening in this 
> case.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb

2018-12-12 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719404#comment-16719404
 ] 

John Roesler commented on KAFKA-7640:
-

Hi [~hiteshbachanna],

You mentioned that you've confirmed from the logs that the requests are going 
to the wrong host. Can you share this evidence?

 

I think what we need to do is try to start narrowing down the scope of the 
problem. Perhaps a good starting point is to make sure that the logs contain 
both the REST events and the Streams state transitions and metadata (DEBUG 
should be sufficient for this; you'll have to make sure you log the REST 
requests as well).

I'd start by identifying one request that incorrectly returned no data. We'd 
want to know what the metadata response was for this request (so that needs to 
be logged as well).

Then, you'd locate the corresponding REST request and response on the server 
side. Also, you'd want to identify the host that actually does have the key, 
and try to narrow down where in the system the wrong host is being reported.

 

Is Streams confused about which host owns which partitions (shouldn't be 
possible)? Or are the instances reporting their hostname incorrectly? Or 
something else...
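To help narrow this down, here is a minimal sketch (assuming the application already has somewhere to call it from, e.g. a state listener or the REST layer) that dumps the Streams view of host-to-partition ownership, so it can be compared against where metadataForKey() routes each request:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public final class StreamsMetadataDump {
    // Print every instance's advertised host:port, its state stores, and the
    // partitions it claims to own, according to this instance's metadata view.
    public static void dump(KafkaStreams streams) {
        for (StreamsMetadata metadata : streams.allMetadata()) {
            System.out.println("host=" + metadata.hostInfo()
                    + " stores=" + metadata.stateStoreNames()
                    + " partitions=" + metadata.topicPartitions());
        }
    }
}
{code}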

> Kafka stream interactive query not returning data when state is backed by 
> rocksdb
> -
>
> Key: KAFKA-7640
> URL: https://issues.apache.org/jira/browse/KAFKA-7640
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: hitesh gollahalli bachanna
>Priority: Major
>
> I have a Kafka Streams app running with 36 different instances (one for each 
> partition). Each instance comes up one after the other, and I am building a REST 
> service on top of the state to access the data.
> Here is some code that I use:
> {code:java}
> // call this to find out which host has the key
> StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
> if (localSelf.host().equals(hostStoreInfo.getHost())) {
>     // get the key from the local store
> }
> else {
>     // call the remote host using restTemplate
> }{code}
> The problem now is that the `metadata` object returned points to one host/ip, but 
> the data is on a different node. I was able to see this using some application 
> logs I printed. This happens every time I start my application.
> The `allMetadata` method in the `KafkaStreams` class says the value will be 
> updated when the partitions get reassigned, but that is not happening in this 
> case.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7712) Handle exceptions from immediately connected channels in Selector

2018-12-12 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7712.

Resolution: Fixed

> Handle exceptions from immediately connected channels in Selector
> -
>
> Key: KAFKA-7712
> URL: https://issues.apache.org/jira/browse/KAFKA-7712
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.2.0
>
>
> We try to handle all possible exceptions in Selector to ensure that channels 
> are always closed and their states kept consistent. For immediately connected 
> channels, we should ensure that any exception during connection results in 
> the channel being closed properly and removed from all maps. This is a very 
> unlikely scenario, but we do already handle the exception. We should clean up 
> properly in the catch block.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7712) Handle exceptions from immediately connected channels in Selector

2018-12-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719303#comment-16719303
 ] 

ASF GitHub Bot commented on KAFKA-7712:
---

hachikuji closed pull request #6023: KAFKA-7712: Remove channel from Selector 
before propagating exception
URL: https://github.com/apache/kafka/pull/6023
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 843d46dc736..8c46746d847 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -250,10 +250,11 @@ public Selector(long connectionMaxIdleMS, int 
failedAuthenticationDelayMs, Metri
 public void connect(String id, InetSocketAddress address, int 
sendBufferSize, int receiveBufferSize) throws IOException {
 ensureNotRegistered(id);
 SocketChannel socketChannel = SocketChannel.open();
+SelectionKey key = null;
 try {
 configureSocketChannel(socketChannel, sendBufferSize, 
receiveBufferSize);
 boolean connected = doConnect(socketChannel, address);
-SelectionKey key = registerChannel(id, socketChannel, 
SelectionKey.OP_CONNECT);
+key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
 
 if (connected) {
 // OP_CONNECT won't trigger for immediately connected channels
@@ -262,6 +263,9 @@ public void connect(String id, InetSocketAddress address, 
int sendBufferSize, in
 key.interestOps(0);
 }
 } catch (IOException | RuntimeException e) {
+if (key != null)
+immediatelyConnectedKeys.remove(key);
+channels.remove(id);
 socketChannel.close();
 throw e;
 }
@@ -316,7 +320,7 @@ private void ensureNotRegistered(String id) {
 throw new IllegalStateException("There is already a connection for 
id " + id + " that is still being closed");
 }
 
-private SelectionKey registerChannel(String id, SocketChannel 
socketChannel, int interestedOps) throws IOException {
+protected SelectionKey registerChannel(String id, SocketChannel 
socketChannel, int interestedOps) throws IOException {
 SelectionKey key = socketChannel.register(nioSelector, interestedOps);
 KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, 
key);
 this.channels.put(id, channel);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 6cf75861122..2da1cc68198 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -51,6 +51,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -380,16 +381,7 @@ public void testIdleExpiryWithoutReadyKeys() throws 
IOException {
 @Test
 public void testImmediatelyConnectedCleaned() throws Exception {
 Metrics metrics = new Metrics(); // new metrics object to avoid metric 
registration conflicts
-Selector selector = new Selector(5000, metrics, time, "MetricGroup", 
channelBuilder, new LogContext()) {
-@Override
-protected boolean doConnect(SocketChannel channel, 
InetSocketAddress address) throws IOException {
-// Use a blocking connect to trigger the immediately connected 
path
-channel.configureBlocking(true);
-boolean connected = super.doConnect(channel, address);
-channel.configureBlocking(false);
-return connected;
-}
-};
+Selector selector = new ImmediatelyConnectingSelector(5000, metrics, 
time, "MetricGroup", channelBuilder, new LogContext());
 
 try {
 testImmediatelyConnectedCleaned(selector, true);
@@ -400,6 +392,26 @@ protected boolean doConnect(SocketChannel channel, 
InetSocketAddress address) th
 }
 }
 
+private static class ImmediatelyConnectingSelector extends Selector {
+public ImmediatelyConnectingSelector(long connectionMaxIdleMS,
+ Metrics metrics,
+ Time time,
+ String metricGrpPrefix,
+  

[jira] [Comment Edited] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2018-12-12 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719147#comment-16719147
 ] 

Jœl Salmerón Viver edited comment on KAFKA-2729 at 12/12/18 5:31 PM:
-

[~junrao], this issue does not seem to be fixed yet. We, like others here, are 
experiencing the same looping replication of partitions when trying to delete 
topics via the bin/kafka-topics command using 1.1 brokers.

If it is fixed as you say, could someone note the exact broker version in which it 
is fixed?

I am torn about upgrading the brokers to 2.2, as this bug report does not reflect 
what you imply by your "We fixed another issue" - that is, assuming the issue is 
fixed - comment above.


was (Author: murri71):
[~junrao], this issue does not seem to be fixed yet. We, like others here, are 
experiencing the same looping replication of partitions when trying to delete 
topics via the bin/kafka-topics command using 1.1 brokers.

If it is fixed as you say, could someone note the exact broker version in which it 
is fixed?

I am torn about upgrading the brokers to 2.2, as this bug report does not reflect 
what you imply by your "We fixed another issue" comment above.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Major
> Fix For: 1.1.0
>
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the affected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient

2018-12-12 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719228#comment-16719228
 ] 

Jun Rao commented on KAFKA-5692:


[~tombentley], that's good to know. Thanks for your help.

> Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
> -
>
> Key: KAFKA-5692
> URL: https://issues.apache.org/jira/browse/KAFKA-5692
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: kip, patch-available
> Fix For: 2.2.0
>
>
> The PreferredReplicaLeaderElectionCommand currently uses a direct connection 
> to zookeeper. The zookeeper dependency should be deprecated and an 
> AdminClient API created to be used instead. 
> This change will require a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient

2018-12-12 Thread Tom Bentley (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719212#comment-16719212
 ] 

Tom Bentley commented on KAFKA-5692:


[~junrao] I should have time in the next few weeks to work on this PR again.

> Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
> -
>
> Key: KAFKA-5692
> URL: https://issues.apache.org/jira/browse/KAFKA-5692
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: kip, patch-available
> Fix For: 2.2.0
>
>
> The PreferredReplicaLeaderElectionCommand currently uses a direct connection 
> to zookeeper. The zookeeper dependency should be deprecated and an 
> AdminClient API created to be used instead. 
> This change will require a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-12-12 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-7622:
-

Assignee: Satish Duggana

> Add findSessions functionality to ReadOnlySessionStore
> --
>
> Key: KAFKA-7622
> URL: https://issues.apache.org/jira/browse/KAFKA-7622
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Di Campo
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip, user-experience
>
> When creating a session store from the DSL, you get a {{ReadOnlySessionStore}}: 
> you can fetch by key, but not by key and time as in a {{SessionStore}}, even 
> though the key type is a {{Windowed}} one. So you would have to iterate through 
> it to find the time-related entries, which should be less efficient than 
> querying by time.
> So the purpose of this ticket is to be able to query the store with (key, 
> time).
> The proposal is to add {{SessionStore}}'s {{findSessions}}-like methods (i.e. 
> time-bound access) to {{ReadOnlySessionStore}}.
>   
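To make the proposal concrete, here is a minimal sketch of the kind of signature that could move up to the read-only interface; the interface name below is illustrative, and the findSessions signature mirrors the one that already exists on SessionStore:

{code:java}
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;

// Illustrative only: what ReadOnlySessionStore could additionally expose.
public interface TimeBoundReadOnlySessionStore<K, AGG> {

    // Existing read-only access: all sessions for the given key.
    KeyValueIterator<Windowed<K>, AGG> fetch(K key);

    // Proposed time-bound access, mirroring SessionStore#findSessions: sessions for the
    // key whose end time is >= earliestSessionEndTime and start time is <= latestSessionStartTime.
    KeyValueIterator<Windowed<K>, AGG> findSessions(K key,
                                                    long earliestSessionEndTime,
                                                    long latestSessionStartTime);
}
{code}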



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7705) Update javadoc for the values of delivery.timeout.ms or linger.ms

2018-12-12 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7705.

   Resolution: Fixed
Fix Version/s: 2.1.1

> Update javadoc for the values of delivery.timeout.ms or linger.ms
> -
>
> Key: KAFKA-7705
> URL: https://issues.apache.org/jira/browse/KAFKA-7705
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, documentation, producer 
>Affects Versions: 2.1.0
>Reporter: huxihx
>Priority: Minor
>  Labels: newbie
> Fix For: 2.1.1
>
>
> In 
> [https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]
> the sample producer code fails to run due to the ConfigException thrown: 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms
> The given value for delivery.timeout.ms or linger.ms on that page should be 
> updated accordingly.
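For reference, one set of values that satisfies the constraint above (the numbers are illustrative, not necessarily the ones chosen for the javadoc fix):

{code:java}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// The producer validates: delivery.timeout.ms >= linger.ms + request.timeout.ms
props.put("linger.ms", 5);
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 120000); // 120000 >= 5 + 30000, so no ConfigException
{code}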



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7705) Update javadoc for the values of delivery.timeout.ms or linger.ms

2018-12-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719162#comment-16719162
 ] 

ASF GitHub Bot commented on KAFKA-7705:
---

hachikuji closed pull request #6000: MINOR KAFKA-7705 : update java doc for 
delivery.timeout.ms
URL: https://github.com/apache/kafka/pull/6000
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c9052e3b51f..85ed9f87a1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -96,10 +96,6 @@
  * Properties props = new Properties();
  * props.put("bootstrap.servers", "localhost:9092");
  * props.put("acks", "all");
- * props.put("delivery.timeout.ms", 3);
- * props.put("batch.size", 16384);
- * props.put("linger.ms", 1);
- * props.put("buffer.memory", 33554432);
  * props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
  * props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
  *


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update javadoc for the values of delivery.timeout.ms or linger.ms
> -
>
> Key: KAFKA-7705
> URL: https://issues.apache.org/jira/browse/KAFKA-7705
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, documentation, producer 
>Affects Versions: 2.1.0
>Reporter: huxihx
>Priority: Minor
>  Labels: newbie
>
> In 
> [https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]
> the sample producer code fails to run due to the ConfigException thrown: 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms
> The given value for delivery.timeout.ms or linger.ms on that page should be 
> updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2018-12-12 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719147#comment-16719147
 ] 

Jœl Salmerón Viver commented on KAFKA-2729:
---

[~junrao], this issue is not yet fixed it seems.   We, as others here, are 
experiencing the same loop replication of partitions when trying to delete 
topics via the bin/kakfa-topics command using 1.1 brokers.

If fixed as you say, could someone update the exact broker version where it is 
fixed?

If am torn to upgrade to 2.2 on the brokers, as this bug report does not 
reflect what you imply by your "We fixed another issue" comment above.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Major
> Fix For: 1.1.0
>
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the affected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value

2018-12-12 Thread Renato Mefi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719142#comment-16719142
 ] 

Renato Mefi commented on KAFKA-3832:


Indeed, I've attempted to fix it: [https://github.com/apache/kafka/pull/6027]
It doesn't seem to be a BCC, but I'm unsure as well.

> Kafka Connect's JSON Converter never outputs a null value
> -
>
> Key: KAFKA-3832
> URL: https://issues.apache.org/jira/browse/KAFKA-3832
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Prasanna Subburaj
>Priority: Major
>  Labels: newbie
>
> Kafka Connect's JSON Converter will never output a null value when 
> {{enableSchemas=true}}. This means that when a connector outputs a 
> {{SourceRecord}} with a null value, the JSON Converter will always produce a 
> message value with:
> {code:javascript}
> {
>   "schema": null,
>   "payload": null
> }
> {code}
> And, this means that while Kafka log compaction will always be able to remove 
> earlier messages with the same key, log compaction will never remove _all_ of 
> the messages with the same key. 
> The JSON Connector's {{fromConnectData(...)}} should always return null when 
> it is supplied a null value.
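A minimal sketch of the behavior the description asks for, written as a wrapper around the shipped JsonConverter (the wrapper class itself is illustrative, not an existing Connect class):

{code:java}
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

// Illustrative wrapper: delegate to JsonConverter, but map a null Connect value to a
// null byte[] so the resulting Kafka record is a genuine tombstone for log compaction.
public class TombstoneAwareJsonConverter implements Converter {
    private final JsonConverter delegate = new JsonConverter();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        return value == null ? null : delegate.fromConnectData(topic, schema, value);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return delegate.toConnectData(topic, value);
    }
}
{code}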



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value

2018-12-12 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719137#comment-16719137
 ] 

Gunnar Morling commented on KAFKA-3832:
---

What would be the right fix to this issue then? Could it simply be changed to 
return a null value in this case? That should be simple enough, but I'm 
wondering whether there'll be a problem if some records have a schema 
(non-tombstones) and some others don't (tombstones).

> Kafka Connect's JSON Converter never outputs a null value
> -
>
> Key: KAFKA-3832
> URL: https://issues.apache.org/jira/browse/KAFKA-3832
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Prasanna Subburaj
>Priority: Major
>  Labels: newbie
>
> Kafka Connect's JSON Converter will never output a null value when 
> {{enableSchemas=true}}. This means that when a connector outputs a 
> {{SourceRecord}} with a null value, the JSON Converter will always produce a 
> message value with:
> {code:javascript}
> {
>   "schema": null,
>   "payload": null
> }
> {code}
> And, this means that while Kafka log compaction will always be able to remove 
> earlier messages with the same key, log compaction will never remove _all_ of 
> the messages with the same key. 
> The JSON Connector's {{fromConnectData(...)}} should always return null when 
> it is supplied a null value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop

2018-12-12 Thread Andrea (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719006#comment-16719006
 ] 

Andrea commented on KAFKA-4740:
---

Adding more information to the SerializationException is more than welcome. 
However, a configurable SerializationExceptionHandler (interface) would be a better 
solution. There could be a provided SeekAfterSerializationExceptionHandler 
implementation that logs the topic, partition and offset at WARN level before 
seeking past the bad record.

Any plan to provide a solution? 
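For illustration only, here is a rough sketch of what such a handler contract could look like; the names below are hypothetical and nothing like this exists in the consumer API today:

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

// Hypothetical pluggable handler: the consumer would call it when deserialization of a
// record fails, and the handler decides whether to seek past the bad record or fail.
public interface SerializationExceptionHandler {

    enum Response { SEEK_PAST_AND_CONTINUE, FAIL }

    Response handle(TopicPartition partition, long offset, SerializationException cause);
}
{code}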

> Using new consumer API with a Deserializer that throws SerializationException 
> can lead to infinite loop
> ---
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
> the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Sébastien Launay
>Assignee: Sébastien Launay
>Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws 
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and 
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients version < 0.10.0.1, such an 
> exception is swallowed by the {{NetworkClient}} class and results in an 
> infinite loop which the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
> for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
> for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on Github \[2\] for most 
> released kafka-clients versions):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(
>         consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                             record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (third record has an invalid Integer 
> value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
> value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
> value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed 

[jira] [Updated] (KAFKA-7727) kafka-producer-network-thread throwing Error: NOT_LEADER_FOR_PARTITION after adding new broker to kafka cluster

2018-12-12 Thread venkatakumar puvvada (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

venkatakumar puvvada updated KAFKA-7727:

Description: 
The Kafka producer network thread throws Error: NOT_LEADER_FOR_PARTITION after a 
broker is added back to the cluster while the consumer group is under load.

Below are the steps followed.

Env: 3 brokers and 3 ZooKeeper nodes

1. Sent 4K messages/sec.

2. Performed HA on one node (say node1).

3. Continued to run the load.

4. No problem in the consumer group.

5. Started node 1 again after some time (say 10 mins).

After adding the Kafka broker back to the cluster, the error below is thrown continuously.

 

2018-12-11 12:30:43.505 
sp-46-e695c9a3-764f-44ec-a018-885005a21034-StreamThread-4 ERROR - Failed to 
process key x by worker:sp-work with exception

org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key x value [B@4c4a09b7 timestamp 
1544531442264) to topic x-changelog due to 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition..
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.

  was:
Kafka producer network thread throwing Error:NOT_LEADER_FOR_PARTITION  after 
adding a broker to the cluster in the consumer group during the load.

Below are the steps followed 

Env: 3 brokers and 3 zookeeper 

1. Sending  4K messages/sec

2. Performed HA on one node (say node1).

3. continue to run the load

4. No problem in the consumer group.

5.start node 1 after sometime (say 10mins).

after adding the Kafka broker to the cluster below error throwing continuously.

 

2018-12-11 12:30:43.505 
sp-46-e695c9a3-764f-44ec-a018-885005a21034-StreamThread-4 ERROR - Failed to 
process key x by worker:y with exception

org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key x value [B@4c4a09b7 timestamp 
1544531442264) to topic x-changelog due to 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition..
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.


> kafka-producer-network-thread  throwing Error: NOT_LEADER_FOR_PARTITION after 
> 

[jira] [Updated] (KAFKA-7727) kafka-producer-network-thread throwing Error: NOT_LEADER_FOR_PARTITION after adding new broker to kafka cluster

2018-12-12 Thread venkatakumar puvvada (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

venkatakumar puvvada updated KAFKA-7727:

Description: 
The Kafka producer network thread throws Error: NOT_LEADER_FOR_PARTITION after a 
broker is added back to the cluster while the consumer group is under load.

Below are the steps followed.

Env: 3 brokers and 3 ZooKeeper nodes

1. Sent 4K messages/sec.

2. Performed HA on one node (say node1).

3. Continued to run the load.

4. No problem in the consumer group.

5. Started node 1 again after some time (say 10 mins).

After adding the Kafka broker back to the cluster, the error below is thrown continuously.

 

2018-12-11 12:30:43.505 
sp-46-e695c9a3-764f-44ec-a018-885005a21034-StreamThread-4 ERROR - Failed to 
process key x by worker:y with exception

org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key x value [B@4c4a09b7 timestamp 
1544531442264) to topic x-changelog due to 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition..
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.

  was:
Kafka producer network thread throwing Error:NOT_LEADER_FOR_PARTITION  after 
adding a broker to the cluster in the consumer group during the load.

Below are the steps followed 

Env: 3 brokers and 3 zookeeper 

1. Sending  4K messages/sec

2. Performed HA on one node (say node1).

3. continue to run the load

4. No problem in the consumer group.

5.start node 1 after sometime (say 10mins).

after adding the Kafka broker to the cluster below error throwing continuously.

 

2018-12-11 12:30:43.505 
harman-perf-vehicle-status-sp-46-e695c9a3-764f-44ec-a018-885005a21034-StreamThread-4
 ERROR - Failed to process key x by worker:vehicle-status with exception

org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key x value [B@4c4a09b7 timestamp 
1544531442264) to topic x-changelog due to 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition..
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
 at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
 at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
 at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
 at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
 at 
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
 at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
 at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.


> kafka-producer-network-thread  

[jira] [Created] (KAFKA-7727) kafka-producer-network-thread throwing Error: NOT_LEADER_FOR_PARTITION after adding new broker to kafka cluster

2018-12-12 Thread venkatakumar puvvada (JIRA)
venkatakumar puvvada created KAFKA-7727:
---

 Summary: kafka-producer-network-thread  throwing Error: 
NOT_LEADER_FOR_PARTITION after adding new broker to kafka cluster
 Key: KAFKA-7727
 URL: https://issues.apache.org/jira/browse/KAFKA-7727
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 1.0.0
 Environment: docker
Reporter: venkatakumar puvvada


The Kafka producer network thread throws NOT_LEADER_FOR_PARTITION errors after 
a broker is added back to the cluster while the consumer group is processing load.

Below are the steps followed:

Env: 3 brokers and 3 ZooKeeper nodes

1. Send 4K messages/sec.

2. Perform HA failover on one node (say node1).

3. Continue to run the load.

4. No problems in the consumer group.

5. Start node1 again after some time (say 10 minutes).

After the Kafka broker is added back to the cluster, the error below is thrown continuously.

 

2018-12-11 12:30:43.505 
harman-perf-vehicle-status-sp-46-e695c9a3-764f-44ec-a018-885005a21034-StreamThread-4
 ERROR - Failed to process key x by worker:vehicle-status with exception

org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key x value [B@4c4a09b7 timestamp 
1544531442264) to topic x-changelog due to 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition..
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
 at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
 at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
 at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
 at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
 at 
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
 at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
 at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.
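
(Not part of the original report, but for context: one possible client-side mitigation is to 
let the internal Streams producer retry through the transient leadership change instead of 
failing the batch and aborting the task. A minimal sketch, assuming the application can 
tolerate producer-level retries; the application id and bootstrap servers below are 
hypothetical.)
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsRetryConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sp-46");                                // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");  // hypothetical
        // Forward retry settings to the embedded producer so a leadership change
        // during a rolling broker restart is retried instead of failing the task.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
        return props;
    }
}
{code}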



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7726) Connector status on deletion

2018-12-12 Thread Eyal Ringort (JIRA)
Eyal Ringort created KAFKA-7726:
---

 Summary: Connector status on deletion
 Key: KAFKA-7726
 URL: https://issues.apache.org/jira/browse/KAFKA-7726
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Eyal Ringort


When deleting a Kafka Connect connector and then querying its status via the REST 
API (/connectors/connector-1234/status), the response is not consistent. 
Sometimes we get:
{code:java}
HTTP/1.1 404 Not Found
Content-Length: 88
Content-Type: application/json
Date: Wed, 12 Dec 2018 13:00:25 GMT
Server: Jetty(9.2.24.v20180105)

{
    "error_code": 404,
    "message": "No status found for connector connector-1234"
}
{code}
And sometimes we get:
{code:java}
HTTP/1.1 200 OK
Content-Length: 177
Content-Type: application/json
Date: Wed, 12 Dec 2018 13:00:06 GMT
Server: Jetty(9.2.24.v20180105)

{
    "connector": {
        "state": "UNASSIGNED",
        "worker_id": "kafka:8083"
    },
    "name": "connector-1234",
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "kafka:8083"
        }
    ],
    "type": "unknown"
}
{code}
After a while (anywhere from a few seconds up to a minute) we get the 404 
response.

 

Plus, sometimes REST API calls to /connectors/ get stuck when the connector is 
in this "UNASSIGNED" status.

 

I'm using cp-kafka:4.1.2 & cp-kafka-connect:4.1.2
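
(Not part of the original report: a small probe like the sketch below makes the inconsistency 
easy to see by polling the status endpoint after the connector is deleted and printing the 
HTTP code of each attempt. The worker URL and connector name are taken from the responses 
above; everything else is hypothetical.)
{code:java}
import java.net.HttpURLConnection;
import java.net.URL;

public class ConnectorStatusProbe {
    public static void main(String[] args) throws Exception {
        // Hypothetical Connect worker URL; connector name from the report above.
        URL status = new URL("http://kafka:8083/connectors/connector-1234/status");
        for (int i = 0; i < 60; i++) {                         // poll for up to ~1 minute
            HttpURLConnection conn = (HttpURLConnection) status.openConnection();
            conn.setRequestMethod("GET");
            int code = conn.getResponseCode();
            System.out.println("attempt " + i + " -> HTTP " + code);
            conn.disconnect();
            if (code == 404) {                                 // the status record is finally gone
                return;
            }
            Thread.sleep(1000L);
        }
    }
}
{code}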



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7382) We should guarantee at least one replica of a partition is alive when creating or updating a topic

2018-12-12 Thread Suman B N (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718954#comment-16718954
 ] 

Suman B N commented on KAFKA-7382:
--

The check is not whether all replicas are up; it is whether at least one replica 
is up. Creating such topics with all-invalid brokers ends up cluttering the 
Kafka cluster with invalid topics.
Also, why do we even need to allow topics to be created when those brokers are 
offline? Such topics, once created, cannot be deleted.

> We should guarantee at least one replica of a partition is alive when 
> creating or updating a topic
> ---
>
> Key: KAFKA-7382
> URL: https://issues.apache.org/jira/browse/KAFKA-7382
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0
>Reporter: zhaoshijie
>Priority: Major
>
> For example: I have brokers 1,2,3,4,5. I create a new topic with the command: 
> {code:java}
> sh kafka-topics.sh --create --topic replicaserror --zookeeper localhost:2181 
> --replica-assignment 11:12:13,12:13:14,14:15:11,14:12:11,13:14:11
> {code}
> Then the KafkaController will process this. After the partitionStateMachine and 
> replicaStateMachine handle the state change, the topic metadata and state will be 
> strange: partitions are in NewPartition and replicas are in OnlineReplica. 
> Next we cannot delete this topic (because the state change is illegal), which will 
> cause a number of problems. So I think we should check the replica assignment when 
> creating or updating a topic.
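
(Not part of the ticket, but to make the proposed guard concrete: a client-side sketch that 
refuses to create a manually assigned topic unless every partition has at least one live 
replica. The JIRA asks for this check on the broker side; the sketch below only illustrates 
the same rule with AdminClient, and the bootstrap address is hypothetical.)
{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ValidatedTopicCreate {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        try (AdminClient admin = AdminClient.create(props)) {
            // Broker ids currently registered in the cluster.
            Set<Integer> liveIds = admin.describeCluster().nodes().get().stream()
                    .map(node -> node.id())
                    .collect(Collectors.toSet());

            // Manual assignment in the spirit of the --replica-assignment example above.
            Map<Integer, List<Integer>> assignment = new HashMap<>();
            assignment.put(0, Arrays.asList(11, 12, 13));
            assignment.put(1, Arrays.asList(12, 13, 14));

            boolean everyPartitionHasLiveReplica = assignment.values().stream()
                    .allMatch(replicas -> replicas.stream().anyMatch(liveIds::contains));
            if (!everyPartitionHasLiveReplica) {
                throw new IllegalArgumentException(
                        "At least one partition has no live replica; refusing to create topic");
            }
            admin.createTopics(Collections.singleton(new NewTopic("replicaserror", assignment)))
                    .all().get();
        }
    }
}
{code}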



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7694) Support ZooKeeper based master/secret key management for delegation tokens

2018-12-12 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-7694:
--
Labels: needs-kip  (was: )

>  Support ZooKeeper based master/secret key management for delegation tokens
> ---
>
> Key: KAFKA-7694
> URL: https://issues.apache.org/jira/browse/KAFKA-7694
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Manikumar
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
>
> The master/secret key is used to generate and verify delegation tokens. 
> Currently, the master key/secret is stored as plain text in the server.properties 
> config file. The same key must be configured across all the brokers, and a 
> re-deployment is required when the secret needs to be rotated.
> This JIRA is to explore and implement ZooKeeper-based master/secret key 
> management to automate secret key generation and expiration.
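
(For illustration only, not Kafka's exact implementation: a rough sketch of why the same 
plain-text secret, e.g. the delegation.token.master.key broker property, has to be present 
on every broker today: each broker must be able to recompute the token HMAC to verify it. 
The class and method names below are hypothetical.)
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class TokenHmacSketch {
    // Signs a token id with the shared master key; verification recomputes and compares.
    public static String sign(String masterKey, String tokenId) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA512");
        mac.init(new SecretKeySpec(masterKey.getBytes(StandardCharsets.UTF_8), "HmacSHA512"));
        return Base64.getEncoder().encodeToString(mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8)));
    }
}
{code}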



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7694) Support ZooKeeper based master/secret key management for delegation tokens

2018-12-12 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-7694:
--
Component/s: security

>  Support ZooKeeper based master/secret key management for delegation tokens
> ---
>
> Key: KAFKA-7694
> URL: https://issues.apache.org/jira/browse/KAFKA-7694
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Manikumar
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
>
> The master/secret key is used to generate and verify delegation tokens. 
> Currently, the master key/secret is stored as plain text in the server.properties 
> config file. The same key must be configured across all the brokers, and a 
> re-deployment is required when the secret needs to be rotated.
> This JIRA is to explore and implement ZooKeeper-based master/secret key 
> management to automate secret key generation and expiration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4790) Kafka cannot recover after a disk full

2018-12-12 Thread Eno Thereska (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4790:

Affects Version/s: 2.1.0

> Kafka cannot recover after a disk full
> --
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1, 2.1.0
>Reporter: Pengwei
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.<init>(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-4790) Kafka cannot recover after a disk full

2018-12-12 Thread Eno Thereska (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reopened KAFKA-4790:
-

> Kafka cannot recover after a disk full
> --
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1, 2.1.0
>Reporter: Pengwei
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.<init>(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4790) Kafka cannot recover after a disk full

2018-12-12 Thread Eno Thereska (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718890#comment-16718890
 ] 

Eno Thereska commented on KAFKA-4790:
-

I can confirm this still happens with 2.1.0.

> Kafka cannot recover after a disk full
> --
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1, 2.1.0
>Reporter: Pengwei
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.<init>(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7694) Support ZooKeeper based master/secret key management for delegation tokens

2018-12-12 Thread Satish Duggana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718532#comment-16718532
 ] 

Satish Duggana edited comment on KAFKA-7694 at 12/12/18 11:35 AM:
--

This JIRA also includes providing pluggable interfaces for delegation token 
storage and master key management, with ZooKeeper storage as the default 
implementation.

I am working on a KIP for the above; I will send it out when it is ready.


was (Author: satish.duggana):
This is about providing pluggable interfaces for delegation token storage and 
master key management, with ZooKeeper storage as the default implementation.

I am working on a KIP for the above; I will send it out when it is ready.

>  Support ZooKeeper based master/secret key management for delegation tokens
> ---
>
> Key: KAFKA-7694
> URL: https://issues.apache.org/jira/browse/KAFKA-7694
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Manikumar
>Assignee: Satish Duggana
>Priority: Major
>
> The master/secret key is used to generate and verify delegation tokens. 
> Currently, the master key/secret is stored as plain text in the server.properties 
> config file. The same key must be configured across all the brokers, and a 
> re-deployment is required when the secret needs to be rotated.
> This JIRA is to explore and implement ZooKeeper-based master/secret key 
> management to automate secret key generation and expiration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7382) We should guarantee at least one replica of a partition is alive when creating or updating a topic

2018-12-12 Thread Eno Thereska (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718814#comment-16718814
 ] 

Eno Thereska commented on KAFKA-7382:
-

Not sure I understand. Creating a topic is a control-plane operation, not a 
data-plane operation. Why do we need to enforce that all replicas are up? It's 
not clear to me that we can ever fully enforce that anyway. For example, the 
replicas might be up when we check, and then immediately fail after the check.

> We should guarantee at least one replica of a partition is alive when 
> creating or updating a topic
> ---
>
> Key: KAFKA-7382
> URL: https://issues.apache.org/jira/browse/KAFKA-7382
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0
>Reporter: zhaoshijie
>Priority: Major
>
> For example: I have brokers 1,2,3,4,5. I create a new topic with the command: 
> {code:java}
> sh kafka-topics.sh --create --topic replicaserror --zookeeper localhost:2181 
> --replica-assignment 11:12:13,12:13:14,14:15:11,14:12:11,13:14:11
> {code}
> Then the KafkaController will process this. After the partitionStateMachine and 
> replicaStateMachine handle the state change, the topic metadata and state will be 
> strange: partitions are in NewPartition and replicas are in OnlineReplica. 
> Next we cannot delete this topic (because the state change is illegal), which will 
> cause a number of problems. So I think we should check the replica assignment when 
> creating or updating a topic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7725) Add a delay for further CG rebalances, beyond KIP-134 group.initial.rebalance.delay.ms

2018-12-12 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-7725:


 Summary: Add a delay for further CG rebalances, beyond KIP-134 
group.initial.rebalance.delay.ms
 Key: KAFKA-7725
 URL: https://issues.apache.org/jira/browse/KAFKA-7725
 Project: Kafka
  Issue Type: New Feature
  Components: clients, consumer, core
Affects Versions: 2.1.0
Reporter: Antony Stubbs


KIP-134 group.initial.rebalance.delay.ms was a good start, but there are much 
bigger problems: after a system is up and running, consumers can leave and join 
in large numbers, causing rebalance storms. One example is Mesosphere deploying 
new versions of an app - say there are 10 instances, then 10 more instances are 
deployed with the new version, then the old 10 are scaled down. Ideally this 
would be 1 or 2 rebalances, instead of 20.

The trade-off is that if the delay is 5 seconds, every consumer joining within 
that window would extend it by another 5 seconds, potentially causing 
partitions to never be processed. To mitigate this, either a maximum rebalance 
delay could also be added, or multiple consumers joining would not extend the 
rebalance delay, so that it is always a maximum of 5 seconds.

Related: [KIP-345: Introduce static membership protocol to reduce consumer 
rebalances|https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances]
KAFKA-7018: persist memberId for consumer restart
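
(Not part of this ticket, but a sketch of the related KIP-345 direction referenced above: a 
consumer configured with a stable per-instance id so a bounced instance rejoins with the same 
identity instead of forcing an extra rebalance. The group.instance.id setting is the one 
proposed in KIP-345 and is not available in released clients at the time of this report; the 
group id and bootstrap servers are hypothetical.)
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberConsumer {
    public static KafkaConsumer<String, String> build(String instanceId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // hypothetical
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rolling-deploy-group");         // hypothetical
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Proposed by KIP-345: a stable per-instance id, so that a restarted consumer
        // rejoins the group with the same identity rather than as a brand-new member.
        props.put("group.instance.id", instanceId);
        return new KafkaConsumer<>(props);
    }
}
{code}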



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)