[jira] [Commented] (KAFKA-5092) KIP 141 - ProducerRecord Interface Improvements

2017-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290367#comment-16290367
 ] 

ASF GitHub Bot commented on KAFKA-5092:
---

Github user simplesteph closed the pull request at:

https://github.com/apache/kafka/pull/2894


> KIP 141 - ProducerRecord Interface Improvements
> ---
>
> Key: KAFKA-5092
> URL: https://issues.apache.org/jira/browse/KAFKA-5092
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stephane Maarek
>  Labels: kip
> Fix For: 1.1.0
>
>
> See KIP here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+141+-+ProducerRecord+Interface+Improvements



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2017-12-13 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290268#comment-16290268
 ] 

Jason Gustafson commented on KAFKA-6361:


Unclean leader election was disabled. It may not have been a session expiration 
that caused B to become leader (I assumed so, but it's not clear from the logs 
and I haven't seen the controller logs yet). In any case, when broker B took over, 
broker A was still in the ISR. Broker B appended the entry as described above 
and then attempted to shrink the ISR, but it failed to do so because of an 
invalid cached zk version. Broker A had already become leader again at that point.

> Fast leader fail over can lead to log divergence between leader and follower
> 
>
> Key: KAFKA-6361
> URL: https://issues.apache.org/jira/browse/KAFKA-6361
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> We have observed an edge case in the replication failover logic which can 
> cause a replica to permanently fall out of sync with the leader or, in the 
> worst case, actually have localized divergence between logs. This occurs in 
> spite of the improved truncation logic from KIP-101. 
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
> appends two batches: one in the range (0, 10) and the other in the range (11, 
> 20). The first one successfully replicates to B, but the second one does not. 
> In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with epoch 
> 2. It appends a new batch with offsets (11, n) to its local log. So we now 
> have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the 
> follower, but before it is able to do so, broker B has its own zk session 
> expiration and broker A again becomes leader, now with epoch 3. It then 
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in 
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch 
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends at 
> offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin 
> fetching from offset n, which will ultimately cause an out of order offset 
> error because broker A will return the full batch beginning from offset 11 
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in 
> the middle of the batch, it will truncate all the way to offset 10. It can 
> begin fetching from offset 11 and everything is fine.
> The case we have actually seen is the first one. The second one would likely 
> go unnoticed in practice and everything is fine in the third case. To 
> workaround the issue, we deleted the active segment on the replica which 
> allowed it to re-replicate consistently from the leader.
> I'm not sure the best solution for this scenario. Maybe if the leader isn't 
> aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} 
> instead of using the offset of the next highest epoch. That would cause the 
> follower to truncate using its high watermark. Or perhaps instead of doing 
> so, it could send another OffsetForLeaderEpoch request at the next previous 
> cached epoch and then truncate using that. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2017-12-13 Thread Sean Policarpio (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290260#comment-16290260
 ] 

Sean Policarpio commented on KAFKA-2260:


We also want optimistic concurrency control for our Kafka application. 
Understandably, relying on Kafka to manage offsets should be the de facto 
strategy, but our use case requires more transactionality before allowing Kafka 
to accept the data we produce. It would be great if the API could let us 
fail fast when we attempt to push data that we assumed was still sequential.

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Kirwin
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> changed, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)
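
For illustration only, here is a minimal, self-contained sketch of the check the 
proposal describes. The PartitionLog and OffsetMismatchException types below are 
hypothetical stand-ins, not part of the Kafka broker or client API:

{code:java}
// Hypothetical sketch of the proposed expected-offset check; none of these
// types exist in Kafka itself.
import java.util.concurrent.atomic.AtomicLong;

final class CasProduceSketch {

    static class OffsetMismatchException extends Exception {
        OffsetMismatchException(String msg) { super(msg); }
    }

    // Stand-in for a single partition's log on the broker side.
    static class PartitionLog {
        private final AtomicLong nextOffset = new AtomicLong(0);

        // Append succeeds only when the caller's expected offset matches the
        // offset about to be assigned; -1 keeps today's "don't check" behaviour.
        synchronized long append(byte[] record, long expectedOffset)
                throws OffsetMismatchException {
            long assigned = nextOffset.get();
            if (expectedOffset != -1 && expectedOffset != assigned) {
                throw new OffsetMismatchException(
                        "expected " + expectedOffset + " but log is at " + assigned);
            }
            return nextOffset.getAndIncrement();
        }
    }

    public static void main(String[] args) throws Exception {
        PartitionLog log = new PartitionLog();
        System.out.println(log.append("a".getBytes(), 0));   // CAS succeeds -> 0
        System.out.println(log.append("b".getBytes(), -1));  // unchecked append -> 1
        try {
            log.append("c".getBytes(), 5);                    // stale expectation
        } catch (OffsetMismatchException e) {
            System.out.println("CAS failure: " + e.getMessage());
        }
    }
}
{code}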



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2017-12-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290252#comment-16290252
 ] 

Jun Rao commented on KAFKA-6361:


Was unclean leader election enabled in this case? When broker B takes over as 
the new leader because broker A's ZK session has expired, the controller is 
supposed to also shrink the ISR to just {B}. If unclean leader election is 
disabled, then when broker B's ZK session expires, broker A can't take over as 
the new leader since it's not in the ISR.

In KIP-101, we didn't solve the problem with log divergence when an unclean 
leader election occurs 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Appendix(a):PossibilityforDivergentLogswithLeaderEpochs).
 Solving that problem requires more thought, especially for compacted topics, 
where certain leader epochs in the middle could have been fully garbage 
collected.

> Fast leader fail over can lead to log divergence between leader and follower
> 
>
> Key: KAFKA-6361
> URL: https://issues.apache.org/jira/browse/KAFKA-6361
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> We have observed an edge case in the replication failover logic which can 
> cause a replica to permanently fall out of sync with the leader or, in the 
> worst case, actually have localized divergence between logs. This occurs in 
> spite of the improved truncation logic from KIP-101. 
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
> appends two batches: one in the range (0, 10) and the other in the range (11, 
> 20). The first one successfully replicates to B, but the second one does not. 
> In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with epoch 
> 2. It appends a new batch with offsets (11, n) to its local log. So we now 
> have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the 
> follower, but before it is able to do so, broker B has its own zk session 
> expiration and broker A again becomes leader, now with epoch 3. It then 
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in 
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch 
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends at 
> offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin 
> fetching from offset n, which will ultimately cause an out of order offset 
> error because broker A will return the full batch beginning from offset 11 
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in 
> the middle of the batch, it will truncate all the way to offset 10. It can 
> begin fetching from offset 11 and everything is fine.
> The case we have actually seen is the first one. The second one would likely 
> go unnoticed in practice and everything is fine in the third case. To 
> workaround the issue, we deleted the active segment on the replica which 
> allowed it to re-replicate consistently from the leader.
> I'm not sure the best solution for this scenario. Maybe if the leader isn't 
> aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} 
> instead of using the offset of the next highest epoch. That would cause the 
> follower to truncate using its high watermark. Or perhaps instead of doing 
> so, it could send another OffsetForLeaderEpoch request at the next previous 
> cached epoch and then truncate using that. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2017-12-13 Thread Jed Wesley-Smith (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290245#comment-16290245
 ] 

Jed Wesley-Smith commented on KAFKA-2260:
-

This would be a valuable feature, currently we need something else to 
coordinate monotonic writes (for an event source), and Kafka cannot support 
this directly.

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Kirwin
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> changed, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5849) Add process stop faults, round trip workload, partitioned produce-consume test

2017-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290205#comment-16290205
 ] 

ASF GitHub Bot commented on KAFKA-5849:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/4323

KAFKA-5849: Add process stop, round trip workload, partitioned test

* Implement process stop faults via SIGSTOP / SIGCONT

* Implement RoundTripWorkload, which both sends messages, and confirms that 
they are received at least once.

* Allow Trogdor tasks to block until other Trogdor tasks are complete.

* Add CreateTopicsWorker, which can be a building block for a lot of tests.

* Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.

* Implement some fault injection tests in round_trip_workload_test.py

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-5849

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4323


commit bd7c5ae285bcc62e6ee526de55343679e1e1
Author: Colin P. Mccabe 
Date:   2017-12-14T01:31:00Z

KAFKA-5849: Add process stop faults, round trip workload, partitioned 
produce-consume test

* Implement process stop faults via SIGSTOP / SIGCONT

* Implement RoundTripWorkload, which both sends messages, and confirms that 
they are received at least once.

* Allow Trogdor tasks to block until other Trogdor tasks are complete.

* Add CreateTopicsWorker, which can be a building block for a lot of tests.

* Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.

* Implement some fault injection tests in round_trip_workload_test.py




> Add process stop faults, round trip workload, partitioned produce-consume test
> --
>
> Key: KAFKA-5849
> URL: https://issues.apache.org/jira/browse/KAFKA-5849
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Add partitioned produce consume test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5849) Add process stop faults, round trip workload, partitioned produce-consume test

2017-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290197#comment-16290197
 ] 

ASF GitHub Bot commented on KAFKA-5849:
---

Github user cmccabe closed the pull request at:

https://github.com/apache/kafka/pull/4106


> Add process stop faults, round trip workload, partitioned produce-consume test
> --
>
> Key: KAFKA-5849
> URL: https://issues.apache.org/jira/browse/KAFKA-5849
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Add partitioned produce consume test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5849) Add process stop faults, round trip workload, partitioned produce-consume test

2017-12-13 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-5849:
---
Summary: Add process stop faults, round trip workload, partitioned 
produce-consume test  (was: Add partitioned produce consume test)

> Add process stop faults, round trip workload, partitioned produce-consume test
> --
>
> Key: KAFKA-5849
> URL: https://issues.apache.org/jira/browse/KAFKA-5849
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Add partitioned produce consume test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6361) Fast leader fail over can lead to log divergence between replica and follower

2017-12-13 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6361:
--

 Summary: Fast leader fail over can lead to log divergence between 
replica and follower
 Key: KAFKA-6361
 URL: https://issues.apache.org/jira/browse/KAFKA-6361
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We have observed an edge case in the replication failover logic which can cause 
a replica to permanently fall out of sync with the leader or, in the worst 
case, actually have localized divergence between logs. This occurs in spite of 
the improved truncation logic from KIP-101. 

Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
appends two batches: one in the range (0, 10) and the other in the range (11, 
20). The first one successfully replicates to B, but the second one does not. 
In other words, the logs on the brokers look like this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1

Broker B:
0: offsets [0, 10], leader epoch: 1
{code}

Broker A then has a zk session expiration and broker B is elected with epoch 2. 
It appends a new batch with offsets (11, n) to its local log. So we now have 
this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1

Broker B:
0: offsets [0, 10], leader epoch: 1
1: offsets: [11, n], leader epoch: 2
{code}

Normally we expect broker A to truncate to offset 11 on becoming the follower, 
but before it is able to do so, broker B has its own zk session expiration and 
broker A again becomes leader, now with epoch 3. It then appends a new entry in 
the range (21, 30). The updated logs look like this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1
2: offsets: [21, 30], leader epoch: 3

Broker B:
0: offsets [0, 10], leader epoch: 1
1: offsets: [11, n], leader epoch: 2
{code}

Now what happens next depends on the last offset of the batch appended in epoch 
2. On becoming follower, broker B will send an OffsetForLeaderEpoch request to 
broker A with epoch 2. Broker A will respond that epoch 2 ends at offset 21. 
There are three cases:

1) n < 20: In this case, broker B will not do any truncation. It will begin 
fetching from offset n, which will ultimately cause an out of order offset 
error because broker A will return the full batch beginning from offset 11 
which broker B will be unable to append.

2) n == 20: Again broker B does not truncate. It will fetch from offset 21 and 
everything will appear fine though the logs have actually diverged.

3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in the 
middle of the batch, it will truncate all the way to offset 10. It can begin 
fetching from offset 11 and everything is fine.

The case we have actually seen is the first one. The second one would likely go 
unnoticed in practice and everything is fine in the third case. To workaround 
the issue, we deleted the active segment on the replica which allowed it to 
re-replicate consistently from the leader.

I'm not sure the best solution for this scenario. Maybe if the leader isn't 
aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} 
instead of using the offset of the next highest epoch. That would cause the 
follower to truncate using its high watermark. Or perhaps instead of doing so, 
it could send another OffsetForLeaderEpoch request at the next previous cached 
epoch and then truncate using that. 
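
For readers following the three cases, here is a small illustrative sketch that 
reproduces the decision described above. The decideTruncation helper is 
hypothetical; it is not the broker's actual replica fetcher code:

{code:java}
// Illustrative only: the three cases above, with a hypothetical helper.
final class EpochTruncationExample {

    // leaderEpochEndOffset: end offset the leader reports for the requested
    // epoch (21 in the example). followerLogEndOffset: offset following the
    // last message in the follower's log (n + 1 in the example).
    // Returns the offset the follower would truncate to, or -1 for no truncation.
    static long decideTruncation(long leaderEpochEndOffset, long followerLogEndOffset) {
        if (followerLogEndOffset <= leaderEpochEndOffset) {
            // Cases 1 and 2 (n <= 20): no truncation, even though the range
            // [11, n] may already have diverged between the two logs.
            return -1L;
        }
        // Case 3 (n > 20): truncate to the leader's end offset; since 21 falls
        // in the middle of a batch, the log actually truncates back to offset 10.
        return leaderEpochEndOffset;
    }

    public static void main(String[] args) {
        long leaderEnd = 21L; // broker A: epoch 2 ends at offset 21
        for (long n : new long[]{15, 20, 25}) {
            long truncateTo = decideTruncation(leaderEnd, n + 1);
            System.out.printf("n=%d -> %s%n", n,
                    truncateTo < 0 ? "no truncation (possible divergence)"
                                   : "truncate to offset " + truncateTo);
        }
    }
}
{code}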




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2017-12-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290140#comment-16290140
 ] 

huxihx commented on KAFKA-6345:
---

A straightforward solution is to create a thread-safe count method used only for 
the JMX metrics. The safe version would create a snapshot of the map by deep 
copying each map entry.
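
As an illustration of that idea, here is a sketch using plain collections only; 
this is not the actual NetworkClient/InFlightRequests code. The metrics-only 
count works from a snapshot taken under the writer's lock instead of iterating 
the live HashMap:

{code:java}
// Illustrative sketch of a snapshot-style count; not Kafka's internals.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class InFlightCountSketch {
    private final Map<String, Deque<Object>> requests = new HashMap<>();

    // Called from the client I/O thread (analogous to InFlightRequests.add()).
    synchronized void add(String node, Object request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).add(request);
    }

    // Unsafe: a JMX thread iterating the live HashMap while the I/O thread is
    // mutating it can throw ConcurrentModificationException, as in the stack
    // trace below.
    int count() {
        int total = 0;
        for (Deque<Object> queue : requests.values())
            total += queue.size();
        return total;
    }

    // Metrics-only variant: copy the per-node queues under the same lock that
    // guards add(), then count over the copy (a deeper copy of each entry
    // would also work).
    int snapshotCount() {
        List<Deque<Object>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(requests.values());
        }
        int total = 0;
        for (Deque<Object> queue : snapshot)
            total += queue.size();
        return total;
    }
}
{code}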

> NetworkClient.inFlightRequestCount() is not thread safe, causing 
> ConcurrentModificationExceptions when sensors are read
> ---
>
> Key: KAFKA-6345
> URL: https://issues.apache.org/jira/browse/KAFKA-6345
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> Looking at the latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over 
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
> HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method 
> (Sender.java line ~765 in the SenderMetrics ctor), which would be driven by some 
> thread reading JMX values
> # the HashMap in question would also be updated by some client I/O thread calling 
> NetworkClient.doSend(), which calls into InFlightRequests.add()
> I guess the only upside is that this exception will always happen on the 
> thread reading the JMX values and never on the actual client I/O thread ...



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2017-12-13 Thread William Austin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290110#comment-16290110
 ] 

William Austin commented on KAFKA-6252:
---

[~rhauch] are there any workarounds for this issue until it is changed?

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Priority: Critical
>
> When a connector crashes (or is not implemented correctly and does not 
> stop/interrupt {{poll()}}), it cannot be restarted and an exception 
> like this is thrown:
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not called in all 
> cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6339) Integration test with embedded kafka not working

2017-12-13 Thread Narendra Kumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290024#comment-16290024
 ] 

Narendra Kumar edited comment on KAFKA-6339 at 12/13/17 10:13 PM:
--

Hi Dhruv, can you please enable debug logging and provide some more of the 
logs? Maybe the logs since the start of the test.


was (Author: narendra kumar):
Hi Dhruv, can you please enable debug logging and provide some more of the 
logs?

> Integration test with embedded kafka not working
> 
>
> Key: KAFKA-6339
> URL: https://issues.apache.org/jira/browse/KAFKA-6339
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.2
>Reporter: DHRUV BANSAL
>
> I am using Kafka version 0.11.0.2.
> I am trying to write an integration test for one of the components I am writing 
> on top of Kafka.
> The following code works fine with Kafka version 0.10.0.0 but does not work with 
> the mentioned version (0.11.0.2):
> // setup Zookeeper
> {code:java}
> String ZKHOST = "127.0.0.1";
> String BROKERHOST = "127.0.0.1";
> String BROKERPORT = "9093";
> String TOPIC = "test1";
>   EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
>   String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
>   ZkClient zkClient = new ZkClient(zkConnect, 3, 3, 
> ZKStringSerializer$.MODULE$);
>   ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
>   // setup Broker
>   Properties brokerProps = new Properties();
>   brokerProps.setProperty("zookeeper.connect", zkConnect);
>   brokerProps.setProperty("broker.id", "0");
>   brokerProps.setProperty("offsets.topic.replication.factor", 
> "1");
>   String kafka_log_path = 
> Files.createTempDirectory("kafka-").toAbsolutePath().toString();
>   System.out.println("kafka log path " + kafka_log_path);
>   brokerProps.setProperty("log.dirs", kafka_log_path);
>   brokerProps.setProperty("listeners", "PLAINTEXT://" + 
> BROKERHOST + ":" + BROKERPORT);
>   KafkaConfig config = new KafkaConfig(brokerProps);
>   Time mock = new MockTime();
>   KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>   // create topic
>   AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
> RackAwareMode.Disabled$.MODULE$);
>   // setup producer
>   Properties producerProps = new Properties();
>   producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" 
> + BROKERPORT);
>   producerProps.setProperty("key.serializer", 
> "org.apache.kafka.common.serialization.IntegerSerializer");
>   producerProps.setProperty("value.serializer", 
> "org.apache.kafka.common.serialization.ByteArraySerializer");
>   KafkaProducer<Integer, byte[]> producer = new 
> KafkaProducer<>(producerProps);
>   // setup consumer
>   Properties consumerProps = new Properties();
>   consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" 
> + BROKERPORT);
>   consumerProps.setProperty("group.id", "group0");
>   consumerProps.setProperty("client.id", "consumer0");
>   consumerProps.setProperty("key.deserializer", 
> "org.apache.kafka.common.serialization.IntegerDeserializer");
>   consumerProps.setProperty("value.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>   consumerProps.put("auto.offset.reset", "earliest"); // to make 
> sure the consumer starts from the beginning of
>   
> // the topic
>   KafkaConsumer<Integer, byte[]> consumer = new 
> KafkaConsumer<>(consumerProps);
>   consumer.subscribe(Arrays.asList(TOPIC));
>   // send message
>   ProducerRecord<Integer, byte[]> data = new 
> ProducerRecord<>(TOPIC, 42,
>   
> "test-message".getBytes(StandardCharsets.UTF_8));
>   Future<RecordMetadata> record = producer.send(data);
>   RecordMetadata metadata = record.get();
>   // starting consumer
>   ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
>   assertEquals(1, records.count());
>   Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = 
> records.iterator();
>   ConsumerRecord<Integer, byte[]> consumedRecord = 
> recordIterator.next();
>   System.out.printf("offset = %d, key = %s, value = %s", 
> consumedRecord.offset(), 

[jira] [Commented] (KAFKA-6339) Integration test with embedded kafka not working

2017-12-13 Thread Narendra Kumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290024#comment-16290024
 ] 

Narendra Kumar commented on KAFKA-6339:
---

Hi Dhruv, can you please enable debug logging and provide some more of the 
logs?

> Integration test with embedded kafka not working
> 
>
> Key: KAFKA-6339
> URL: https://issues.apache.org/jira/browse/KAFKA-6339
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.2
>Reporter: DHRUV BANSAL
>
> I am using Kafka version 0.11.0.2.
> I am trying to write an integration test for one of the components I am writing 
> on top of Kafka.
> The following code works fine with Kafka version 0.10.0.0 but does not work with 
> the mentioned version (0.11.0.2):
> // setup Zookeeper
> {code:java}
> String ZKHOST = "127.0.0.1";
> String BROKERHOST = "127.0.0.1";
> String BROKERPORT = "9093";
> String TOPIC = "test1";
>   EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
>   String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
>   ZkClient zkClient = new ZkClient(zkConnect, 3, 3, 
> ZKStringSerializer$.MODULE$);
>   ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
>   // setup Broker
>   Properties brokerProps = new Properties();
>   brokerProps.setProperty("zookeeper.connect", zkConnect);
>   brokerProps.setProperty("broker.id", "0");
>   brokerProps.setProperty("offsets.topic.replication.factor", 
> "1");
>   String kafka_log_path = 
> Files.createTempDirectory("kafka-").toAbsolutePath().toString();
>   System.out.println("kafka log path " + kafka_log_path);
>   brokerProps.setProperty("log.dirs", kafka_log_path);
>   brokerProps.setProperty("listeners", "PLAINTEXT://" + 
> BROKERHOST + ":" + BROKERPORT);
>   KafkaConfig config = new KafkaConfig(brokerProps);
>   Time mock = new MockTime();
>   KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>   // create topic
>   AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
> RackAwareMode.Disabled$.MODULE$);
>   // setup producer
>   Properties producerProps = new Properties();
>   producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" 
> + BROKERPORT);
>   producerProps.setProperty("key.serializer", 
> "org.apache.kafka.common.serialization.IntegerSerializer");
>   producerProps.setProperty("value.serializer", 
> "org.apache.kafka.common.serialization.ByteArraySerializer");
>   KafkaProducer<Integer, byte[]> producer = new 
> KafkaProducer<>(producerProps);
>   // setup consumer
>   Properties consumerProps = new Properties();
>   consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" 
> + BROKERPORT);
>   consumerProps.setProperty("group.id", "group0");
>   consumerProps.setProperty("client.id", "consumer0");
>   consumerProps.setProperty("key.deserializer", 
> "org.apache.kafka.common.serialization.IntegerDeserializer");
>   consumerProps.setProperty("value.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>   consumerProps.put("auto.offset.reset", "earliest"); // to make 
> sure the consumer starts from the beginning of
>   
> // the topic
>   KafkaConsumer<Integer, byte[]> consumer = new 
> KafkaConsumer<>(consumerProps);
>   consumer.subscribe(Arrays.asList(TOPIC));
>   // send message
>   ProducerRecord<Integer, byte[]> data = new 
> ProducerRecord<>(TOPIC, 42,
>   
> "test-message".getBytes(StandardCharsets.UTF_8));
>   Future<RecordMetadata> record = producer.send(data);
>   RecordMetadata metadata = record.get();
>   // starting consumer
>   ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
>   assertEquals(1, records.count());
>   Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = 
> records.iterator();
>   ConsumerRecord<Integer, byte[]> consumedRecord = 
> recordIterator.next();
>   System.out.printf("offset = %d, key = %s, value = %s", 
> consumedRecord.offset(), consumedRecord.key(),
>   consumedRecord.value());
>   assertEquals(42, (int) consumedRecord.key());
>   assertEquals("test-message", new 

[jira] [Assigned] (KAFKA-6352) Delay message down-conversion until response is ready to be sent

2017-12-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-6352:
--

Assignee: Jason Gustafson

> Delay message down-conversion until response is ready to be sent
> 
>
> Key: KAFKA-6352
> URL: https://issues.apache.org/jira/browse/KAFKA-6352
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> We have observed some users beginning to use the new message format before 
> their clients have been upgraded. As we know, this can cause a lot of memory 
> pressure due to the fact that we down-convert the full response in memory. 
> Currently we do this down-conversion prior to enqueuing the response to be 
> sent by the Processor, which may cause more heap utilization than needed with 
> a steady queue size. 
> A possible improvement is to delay the down-conversion until the response is 
> actually ready to be sent. Even better would be to do the down-conversion in 
> a streaming fashion, converting only as much as is needed at any time. One 
> potential drawback is that this moves the down-conversion into the network 
> threads.
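
A rough sketch of the "delay until send" idea follows. The LazyDownConvertedRecords 
type is hypothetical; the real change would live in the broker's fetch response path:

{code:java}
// Hypothetical sketch only: defer the down-conversion until the network
// thread actually writes the response, so the converted copy is not held
// while the response waits in the processor's queue.
import java.nio.ByteBuffer;
import java.util.function.Supplier;

final class LazyDownConvertedRecords {
    private final Supplier<ByteBuffer> downConverter; // performs the conversion
    private ByteBuffer converted;                      // materialized on demand

    LazyDownConvertedRecords(Supplier<ByteBuffer> downConverter) {
        this.downConverter = downConverter;
    }

    // Called by the network thread at send time rather than at enqueue time.
    ByteBuffer bytesToSend() {
        if (converted == null)
            converted = downConverter.get();
        return converted;
    }
}
{code}

The trade-off noted in the description still applies: the conversion work then 
happens on the network threads.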



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6126) Reduce rebalance time by not checking if created topics are available

2017-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289929#comment-16289929
 ] 

ASF GitHub Bot commented on KAFKA-6126:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/4322

KAFKA-6126: Remove unnecessary topics created check


### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
kafka-6126-remove-topic-check-on-rebalance-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4322.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4322


commit c0a8b4868cf979e47a25d5e837b26e62e2aac212
Author: Matthias J. Sax 
Date:   2017-12-13T21:06:18Z

KAFKA-6126: Remove unnecessary topics created check




> Reduce rebalance time by not checking if created topics are available
> -
>
> Key: KAFKA-6126
> URL: https://issues.apache.org/jira/browse/KAFKA-6126
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 1.1.0
>
>
> Within {{StreamPartitionAssignor#assign}} we create new topics and afterwards 
> wait in an "infinite loop" until topic metadata propagated throughout the 
> cluster. We do this, to make sure topics are available when we start 
> processing.
> However, with this approach we "extend" the time in the rebalance phase and 
> thus are not responsive (no calls to `poll` for liveness check and 
> {{KafkaStreams#close}} suffers). Thus, we might want to remove this check and 
> handle potential "topic not found" exceptions in the main thread gracefully.
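
To make the trade-off concrete, here is a hypothetical sketch of the blocking 
check versus handling the missing topic lazily. Names such as 
topicExistsInMetadata and MissingTopicException are illustrative, not Kafka 
Streams APIs:

{code:java}
// Hypothetical sketch; not actual Kafka Streams code.
import java.util.List;

final class TopicReadinessSketch {

    // Current behaviour (simplified): block inside assign() until metadata for
    // every newly created topic has propagated, which extends the rebalance.
    static void waitUntilTopicsExist(List<String> topics) throws InterruptedException {
        for (String topic : topics) {
            while (!topicExistsInMetadata(topic)) {
                Thread.sleep(100); // loop bounded only by metadata propagation time
            }
        }
    }

    // Proposed behaviour (simplified): skip the check and let the main thread
    // handle a "topic not found" error gracefully when it actually occurs.
    static void processRecords(String topic) {
        try {
            pollAndProcess(topic);
        } catch (MissingTopicException e) {
            // back off and retry on the next poll instead of blocking the rebalance
        }
    }

    // --- stubs so the sketch is self-contained ---
    static boolean topicExistsInMetadata(String topic) { return true; }
    static void pollAndProcess(String topic) throws MissingTopicException { }
    static class MissingTopicException extends Exception { }
}
{code}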



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible

2017-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289883#comment-16289883
 ] 

ASF GitHub Bot commented on KAFKA-6018:
---

Github user xvrl closed the pull request at:

https://github.com/apache/kafka/pull/4308


> Make KafkaFuture.Function java 8 lambda compatible
> --
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs like:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
> @Override
> public Object apply(Set<String> strings) {
> return foo;
> }
> }
> {code}
> I propose to define them as interfaces.
> So in Java 8 this code can become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}},
> and KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> I think this change might be worth considering.
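
For context, a small usage sketch against the AdminClient: the anonymous-class 
form compiles today, while the lambda form (shown as a comment) is what the 
proposed interface change would enable:

{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;

public class KafkaFutureLambdaSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        AdminClient admin = AdminClient.create(props);
        try {
            KafkaFuture<Set<String>> names = admin.listTopics().names();

            // Today: an anonymous subclass of the abstract Function class.
            KafkaFuture<Integer> topicCount = names.thenApply(
                    new KafkaFuture.Function<Set<String>, Integer>() {
                        @Override
                        public Integer apply(Set<String> topicNames) {
                            return topicNames.size();
                        }
                    });

            // With Function defined as an interface, the same call could be:
            // KafkaFuture<Integer> topicCount = names.thenApply(topicNames -> topicNames.size());

            System.out.println("topics: " + topicCount.get());
        } finally {
            admin.close();
        }
    }
}
{code}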



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2017-12-13 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289773#comment-16289773
 ] 

Manikumar commented on KAFKA-6335:
--

SimpleAclAuthorizer#updateResourceAcls() will retry in case of any update 
failure/version mismatch. The test case configures Int.Max retries to 
avoid any transient errors. The test failures are always missing the first ACL in the 
expected list, which looks like a synchronization issue/corner case. I will look into 
this later this week.
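
As a generic illustration of the retry pattern described above (ZkVersionedStore 
and Versioned are hypothetical stand-ins, not the authorizer's actual ZooKeeper 
code):

{code:java}
// Generic sketch of a read-modify-write retried on version mismatch; the
// types here are hypothetical, not Kafka's SimpleAclAuthorizer internals.
import java.util.function.UnaryOperator;

final class VersionedUpdateSketch {

    static final class Versioned<T> {
        final T value;
        final int version;
        Versioned(T value, int version) { this.value = value; this.version = version; }
    }

    interface ZkVersionedStore<T> {
        Versioned<T> read();
        boolean writeIfVersionMatches(T newValue, int expectedVersion);
    }

    // Retry until the conditional write succeeds or maxRetries is exhausted
    // (the test effectively configures Int.Max retries).
    static <T> boolean update(ZkVersionedStore<T> store, UnaryOperator<T> modify, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            Versioned<T> current = store.read();
            T updated = modify.apply(current.value);
            if (store.writeIfVersionMatches(updated, current.version)) {
                return true;
            }
            // Version changed underneath us: another writer won the race,
            // so re-read the latest ACLs and apply the change again.
        }
        return false;
    }
}
{code}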

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
> Fix For: 1.1.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> 

[jira] [Updated] (KAFKA-6232) SaslSslAdminClientIntegrationTest sometimes fails

2017-12-13 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6232:
--
Labels: security  (was: )

> SaslSslAdminClientIntegrationTest sometimes fails
> -
>
> Key: KAFKA-6232
> URL: https://issues.apache.org/jira/browse/KAFKA-6232
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>  Labels: security
> Attachments: saslSslAdminClientIntegrationTest-203.out
>
>
> Here was one recent occurrence:
> https://builds.apache.org/job/kafka-trunk-jdk9/203/testReport/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.LeaderNotAvailableException: There is no 
> leader for this topic-partition as we are in the middle of a leadership 
> election.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
>   at 
> kafka.api.AdminClientIntegrationTest.$anonfun$testLogStartOffsetCheckpoint$3(AdminClientIntegrationTest.scala:762)
> {code}
> In the test output, I saw:
> {code}
> [2017-11-17 23:15:45,593] ERROR [KafkaApi-1] Error when handling request 
> {filters=[{resource_type=2,resource_name=foobar,principal=User:ANONYMOUS,host=*,operation=3,permission_type=3},{resource_type=5,resource_name=transactional_id,principal=User:ANONYMOUS,host=*,operation=4,permission_type=3}]}
>  (kafka.server.KafkaApis:107)
> org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
> Request(processor=2, connectionId=127.0.0.1:36295-127.0.0.1:58183-0, 
> session=Session(User:client2,localhost/127.0.0.1), 
> listenerName=ListenerName(SASL_SSL), securityProtocol=SASL_SSL, buffer=null) 
> is not authorized.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2017-12-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289701#comment-16289701
 ] 

Guozhang Wang commented on KAFKA-6335:
--

Encountered this test failure again on Jenkins: 
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/9984/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
> Fix For: 1.1.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> permission for operations: Read from hosts: *, User:11 has Allow permission 
> for operations: Read from hosts: *, 

[jira] [Updated] (KAFKA-6355) transient failure in org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies

2017-12-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6355:
---
Component/s: unit tests

> transient failure in 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies
> --
>
> Key: KAFKA-6355
> URL: https://issues.apache.org/jira/browse/KAFKA-6355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: huxihx
>
> Got a transient failure while running 
> 'org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies'
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Did not 
> receive all 20 records from topic singlePartitionOutputTopic
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Did not 
> receive all 20 records from topic singlePartitionOutputTopic
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:195)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:165)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.runSimpleCopyTest(EosIntegrationTest.java:183)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies(EosIntegrationTest.java:135)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
>   at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 

[jira] [Commented] (KAFKA-6336) when using assign() with kafka consumer the KafkaConsumerGroup command doesnt show those consumers

2017-12-13 Thread Neerja Khattar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289584#comment-16289584
 ] 

Neerja Khattar commented on KAFKA-6336:
---

[~huxi_2b] [~hachikuji] any update?

> when using assign() with kafka consumer the KafkaConsumerGroup command doesnt 
> show those consumers
> --
>
> Key: KAFKA-6336
> URL: https://issues.apache.org/jira/browse/KAFKA-6336
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neerja Khattar
>
> The issue is that when using assign() rather than subscribe() for Kafka 
> consumers that commit offsets, you are not able to get the lag using the 
> ConsumerGroup command. It doesn't even list those groups.
> The JMX tool also doesn't show the lag properly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2017-12-13 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289550#comment-16289550
 ] 

Ted Yu commented on KAFKA-6335:
---

SimpleAclAuthorizer#updateResourceAcls() returns a boolean indicating whether 
the update succeeded.
However, the return value is not checked by 
testHighConcurrencyModificationOfResourceAcls().

In a highly contended scenario, the test should expect that a few of the ACL 
requests do not go through.
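
A minimal sketch of that idea, written in Java rather than the Scala test code, with a hypothetical stand-in for the authorizer call; only the retry-on-false pattern is the point here:

{code}
import java.util.function.BooleanSupplier;

/**
 * Illustrative only -- the real test lives in SimpleAclAuthorizerTest (Scala).
 * The point: do not ignore the boolean returned by an optimistic ACL update;
 * retry the update instead of asserting on a write that may have been rejected.
 */
public class RetryOnContention {

    /** Re-runs an update until it reports success or the attempts run out. */
    static boolean retryUntilTrue(BooleanSupplier update, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (update.getAsBoolean()) {   // true == the conditional write went through
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for SimpleAclAuthorizer#updateResourceAcls(...),
        // which can return false when another thread won the race.
        BooleanSupplier contendedUpdate = () -> Math.random() > 0.5;

        if (!retryUntilTrue(contendedUpdate, 10)) {
            throw new AssertionError("ACL update did not succeed after 10 attempts");
        }
    }
}
{code}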

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
> Fix For: 1.1.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> permission for operations: Read from hosts: *, 

[jira] [Updated] (KAFKA-6086) Provide for custom error handling when Kafka Streams fails to produce

2017-12-13 Thread Matt Farmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Farmer updated KAFKA-6086:
---
Summary: Provide for custom error handling when Kafka Streams fails to 
produce  (was: KIP-210 Provide for custom error handling when Kafka Streams 
fails to produce)

> Provide for custom error handling when Kafka Streams fails to produce
> -
>
> Key: KAFKA-6086
> URL: https://issues.apache.org/jira/browse/KAFKA-6086
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Matt Farmer
>  Labels: kip
>
> This is an issue related to the following KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6086) Provide for custom error handling when Kafka Streams fails to produce

2017-12-13 Thread Matt Farmer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289370#comment-16289370
 ] 

Matt Farmer commented on KAFKA-6086:


The associated KIP has been accepted, and the PR is ready for further 
review/merging.
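
For readers following along, a hedged sketch of what a custom handler could look like under the proposal; the names (ProductionExceptionHandler, ProductionExceptionHandlerResponse, the default.production.exception.handler config) are taken from the KIP document and could still change before the PR is merged:

{code}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

/**
 * Sketch of a KIP-210 style handler: skip records that are too large,
 * fail the application on any other production error.
 */
public class IgnoreTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no-op
    }

    /** Wiring it into a Streams application via the config key proposed in the KIP. */
    public static Properties exampleConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  IgnoreTooLargeHandler.class);
        return props;
    }
}
{code}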

> Provide for custom error handling when Kafka Streams fails to produce
> -
>
> Key: KAFKA-6086
> URL: https://issues.apache.org/jira/browse/KAFKA-6086
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Matt Farmer
>  Labels: kip
>
> This is an issue related to the following KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer

2017-12-13 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4125:
-

Assignee: (was: Jeyhun Karimov)

> Provide low-level Processor API meta data in DSL layer
> --
>
> Key: KAFKA-4125
> URL: https://issues.apache.org/jira/browse/KAFKA-4125
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: kip
> Fix For: 1.1.0
>
>
> For the Processor API, users can get metadata like the record offset, timestamp, etc. 
> via the provided {{Context}} object. It might be useful to allow users to 
> access this information in the DSL layer, too.
> The idea would be to do it "the Flink way", i.e., by providing
> RichFunctions; {{mapValues()}} for example.
> It takes a {{ValueMapper}} that only has the method
> {noformat}
> V2 apply(V1 value);
> {noformat}
> Thus, you cannot get any metadata within {{apply()}} (it's completely "blind").
> We would add two more interfaces: {{RichFunction}} with a method
> {{open(Context context)}} and
> {noformat}
> RichValueMapper extends ValueMapper, RichFunction
> {noformat}
> This way, the user can choose to implement a Rich- or Standard-function and
> we do not need to change existing APIs. Both can be handed into
> {{KStream.mapValues()}} for example. Internally, we check if a Rich
> function is provided, and if yes, hand in the {{Context}} object once, to
> make it available to the user who can then access it within {{apply()}} -- of
> course, the user must set a member variable in {{open()}} to hold the
> reference to the Context object.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
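
A hedged Java sketch of what the description above proposes; none of these interfaces exist in Kafka today, and ProcessorContext is used here only as a stand-in for the {{Context}} object mentioned in the text:

{code}
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorContext;

/** Hypothetical, per the proposal: a hook that hands the context to the function once. */
interface RichFunction {
    void open(ProcessorContext context);
}

/** Hypothetical: a ValueMapper that can also see record metadata via open(). */
interface RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction { }

/** Example user code: keep a reference to the context and read the offset in apply(). */
class OffsetTaggingMapper implements RichValueMapper<String, String> {

    private ProcessorContext context;   // member variable set in open(), as described above

    @Override
    public void open(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public String apply(String value) {
        return value + "@" + context.offset();   // metadata is now reachable inside apply()
    }
}
{code}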



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic

2017-12-13 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-6035:
-

Assignee: (was: Jeyhun Karimov)

> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> 
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>
> Today Streams makes all state stores backed by a changelog topic by 
> default unless the user overrides it with {{disableLogging}} when creating the 
> state store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic is not required because we can re-use an existing 
> topic for it. This ticket summarizes a specific case that can be optimized:
> Consider the case where a KTable is materialized and then sent directly into a 
> sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.
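
The {{disableLogging}} route mentioned above can already be taken manually; a hedged sketch against the 1.0 DSL, with illustrative topic names, that mirrors the snippet above. Note the trade-off: with logging disabled the store is not restored from a changelog after a failure, which is exactly what the proposed optimization would avoid by treating {{topic2}} as the changelog.

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class NoChangelogExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Aggregate per key, but tell Streams not to create "state1-changelog".
        KTable<String, Long> table1 = builder.<String, String>stream("topic1")
                .groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("state1")
                        .withLoggingDisabled());

        // The sink topic then effectively plays the role of the changelog,
        // which is what the optimization described above would automate.
        table1.toStream().to("topic2", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
{code}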



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5581) Avoid creating changelog topics for state stores that are materialized from a source topic

2017-12-13 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-5581:
-

Assignee: (was: Jeyhun Karimov)

> Avoid creating changelog topics for state stores that are materialized from a 
> source topic
> --
>
> Key: KAFKA-5581
> URL: https://issues.apache.org/jira/browse/KAFKA-5581
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, performance
>
> Today Streams makes all state stores backed by a changelog topic by 
> default unless the user overrides it with {{disableLogging}} when creating the 
> state store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic is not required because we can re-use an existing 
> topic for it. This issue summarizes two such cases:
> 1) If a KTable is read directly from a source topic, and is materialized i.e. 
> {code}
> table1 = builder.table("topic1", "store1")`.
> {code}
> In this case {{table1}}'s changelog topic can just be {{topic1}}, and we do 
> not need to create a separate {{table1-changelog}} topic.
> 2) If a KStream is materialized for joins where the streams come directly from 
> a topic, e.g.:
> {code}
> stream1 = builder.stream("topic1");
> stream2 = builder.stream("topic2");
> stream3 = stream1.join(stream2, windows);  // stream1 and stream2 are 
> materialized with a changelog topic
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-12-13 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4304:
-

Assignee: (was: Jeyhun Karimov)

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> last updated. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> timestamp)
>  * consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail

2017-12-13 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-6360:
-

 Summary: RocksDB segments not removed when store is closed causes 
re-initialization to fail
 Key: KAFKA-6360
 URL: https://issues.apache.org/jira/browse/KAFKA-6360
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Damian Guy
Assignee: Damian Guy
Priority: Blocker
 Fix For: 1.1.0


When a store is re-initialized it is first closed, before it is opened again. 
When this happens the segments in the {{Segments}} class are closed, but they 
are not removed from the list of segments. So when the store is re-initialized 
the old closed segments are used. This results in:
{code}
[2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task 
[1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-24:  
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)
org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed
at 
org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
at 
org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
at 
org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
at 
org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
at 
org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
at 
org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
at 
org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
at 
org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
at 
org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
{code}
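
An illustration of the fix pattern, not the actual internal {{Segments}} code (whose details differ): whatever collection tracks the open segments must also be emptied on close, so that a re-initialized store creates fresh segments instead of handing back closed ones.

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the segment bookkeeping described above. */
class SegmentRegistry {

    private final Map<Long, Segment> segments = new ConcurrentHashMap<>();

    Segment getOrCreate(long segmentId) {
        // After close() below, this creates a fresh open segment again.
        return segments.computeIfAbsent(segmentId, Segment::new);
    }

    void close() {
        segments.values().forEach(Segment::close);
        // The missing step the ticket describes: drop the closed segments,
        // otherwise a re-initialized store reuses closed instances and
        // fails with "Store ... is currently closed".
        segments.clear();
    }

    /** Minimal segment stand-in. */
    static class Segment {
        private final long id;
        private volatile boolean open = true;

        Segment(long id) {
            this.id = id;
        }

        void put(byte[] key, byte[] value) {
            if (!open) {
                throw new IllegalStateException("Segment " + id + " is currently closed");
            }
            // write elided
        }

        void close() {
            open = false;
        }
    }
}
{code}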




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6359) Work for KIP-236

2017-12-13 Thread Tom Bentley (JIRA)
Tom Bentley created KAFKA-6359:
--

 Summary: Work for KIP-236
 Key: KAFKA-6359
 URL: https://issues.apache.org/jira/browse/KAFKA-6359
 Project: Kafka
  Issue Type: Improvement
Reporter: Tom Bentley
Assignee: Tom Bentley
Priority: Minor


This issue is for the work described in KIP-236.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6342) Move workaround for JSON parsing of non-escaped strings

2017-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289131#comment-16289131
 ] 

ASF GitHub Bot commented on KAFKA-6342:
---

GitHub user umesh9794 opened a pull request:

https://github.com/apache/kafka/pull/4321

KAFKA-6342 : Move workaround for JSON parsing of non-escaped strings

This PR moves the JSON parsing workaround of [this 
PR](https://github.com/apache/kafka/pull/4303) to a new method and uses that 
method in `ZkClient` etc. classes.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/umesh9794/kafka KAFKA-6342

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4321


commit ec8abab0629ecdea4a3a5970653e4c88025b8dfd
Author: umesh chaudhary 
Date:   2017-12-13T10:51:23Z

Initial Commit




> Move workaround for JSON parsing of non-escaped strings
> ---
>
> Key: KAFKA-6342
> URL: https://issues.apache.org/jira/browse/KAFKA-6342
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Umesh Chaudhary
> Fix For: 1.1.0
>
>
> KAFKA-6319 added a workaround to parse invalid JSON persisted using older 
> versions of Kafka because special characters were not escaped. The workaround 
> is required in 1.0.1 to enable parsing invalid JSON from ACL configs in 
> ZooKeeper. We can move the workaround out of kafka.utils.Json#parseFull for 
> 1.1.0 so that it is applied only to ACLs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6358) Per topic producer/fetch_consumer/fetch_follower metrics

2017-12-13 Thread Ricardo Bartolome (JIRA)
Ricardo Bartolome created KAFKA-6358:


 Summary: Per topic producer/fetch_consumer/fetch_follower metrics
 Key: KAFKA-6358
 URL: https://issues.apache.org/jira/browse/KAFKA-6358
 Project: Kafka
  Issue Type: Wish
  Components: metrics
Affects Versions: 1.0.0
Reporter: Ricardo Bartolome
Priority: Minor


We are using the following JMX beans to monitor Kafka 1.0.0:

{code}
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
Mean
50thPercentile
...
99thPercentile

kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer
Count

kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower
Count
{code}

There are more, but this gives an idea of what we are using in order to track 
produce/fetch operations on a per-broker basis. Nevertheless, in order to 
identify abusive consumers/clients in our Kafka cluster, we would appreciate 
having these metrics on a per-topic basis.

As an example of per-topic metrics, we have:
{code}
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=.*
{code}

Here we have a per-topic bean with a "Count" attribute that we can query. That 
way we can know which topics are ingesting more data and which ones less. 
We can't do that with the request metrics listed above.
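
For completeness, a hedged sketch of how such a per-topic attribute can be read over JMX with the standard javax.management client; the JMX endpoint and topic name are placeholders:

{code}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class PerTopicBytesOut {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint: wherever the broker exposes JMX.
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName bean = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=my-topic");
            long count = ((Number) conn.getAttribute(bean, "Count")).longValue();
            System.out.println("BytesOutPerSec count for my-topic: " + count);
        } finally {
            connector.close();
        }
    }
}
{code}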

Would you consider a change in an upcoming Kafka version as a feature request?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6357) Return nonzero code in kafka-consumer-groups.sh tool in case of error

2017-12-13 Thread Rinat Shigapov (JIRA)
Rinat Shigapov created KAFKA-6357:
-

 Summary: Return nonzero code in kafka-consumer-groups.sh tool in 
case of error
 Key: KAFKA-6357
 URL: https://issues.apache.org/jira/browse/KAFKA-6357
 Project: Kafka
  Issue Type: Improvement
  Components: tools
 Environment: kafka_2.12-0.11.0.0
Reporter: Rinat Shigapov


Use case that triggered this issue:

kafka-consumer-groups.sh can reset offsets if there is no active consumer in the 
group. Otherwise it just prints an error message about the situation and returns 
a zero exit code.

Expected behaviour: a nonzero exit code should be returned on error. Then proper 
scripting around kafka-consumer-groups.sh would be possible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6356) UnknownTopicOrPartitionException & NotLeaderForPartitionException and log deletion happening with retention bytes kept at -1.

2017-12-13 Thread kaushik srinivas (JIRA)
kaushik srinivas created KAFKA-6356:
---

 Summary: UnknownTopicOrPartitionException & 
NotLeaderForPartitionException and log deletion happening with retention bytes 
kept at -1.
 Key: KAFKA-6356
 URL: https://issues.apache.org/jira/browse/KAFKA-6356
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
 Environment: Cent OS 7.2,
HDD : 2Tb,
CPUs: 56 cores,
RAM : 256GB
Reporter: kaushik srinivas
 Attachments: configs.txt, stderr_b0, stderr_b1, stderr_b2, stdout_b0, 
stdout_b1, stdout_b2, topic_description, topic_offsets

Facing issues with a Kafka topic that has 20 partitions and a replication factor of 3.

Config used :
No of partitions : 20
replication factor : 3
No of brokers : 3
Memory for broker : 32GB
Heap for broker : 12GB

A producer is run to produce data to the 20 partitions of a single topic.
We observed that for partitions whose leader is one of the 
brokers (broker-1), the offsets are never incremented, and we also see log files 
of 0 MB size on the broker's disk.

Seeing the errors below on the brokers:

error 1:
2017-12-13 07:11:11,191] ERROR [ReplicaFetcherThread-0-2], Error for partition 
[test2,5] to broker 
2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. (kafka.server.ReplicaFetcherThread)

error 2:
[2017-12-11 12:19:41,599] ERROR [ReplicaFetcherThread-0-2], Error for partition 
[test1,13] to broker 
2:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)

Attaching,
1. error and std out files of all the brokers.
2. kafka config used.
3. offsets and topic description.

Retention bytes was set to -1 and the retention period to 96 hours.
But we are still observing some of the log files being deleted on the broker,

From the logs:
[2017-12-11 12:20:20,586] INFO Deleting index 
/var/lib/mesos/slave/slaves/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-S7/frameworks/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-0006/executors/ckafka__5f085d0c-e296-40f0-a686-8953dd14e4c6/runs/506a1ce7-23d1-45ea-bb7c-84e015405285/kafka-broker-data/broker-1/test1-12/.timeindex
 (kafka.log.TimeIndex)
[2017-12-11 12:20:20,587] INFO Deleted log for partition [test1,12] in 
/var/lib/mesos/slave/slaves/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-S7/frameworks/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-0006/executors/ckafka__5f085d0c-e296-40f0-a686-8953dd14e4c6/runs/506a1ce7-23d1-45ea-bb7c-84e015405285/kafka-broker-data/broker-1/test1-12.
 (kafka.log.LogManager)

We expect the logs to never be deleted if retention bytes is set to -1.






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)