[jira] [Commented] (KAFKA-5092) KIP 141 - ProducerRecord Interface Improvements
[ https://issues.apache.org/jira/browse/KAFKA-5092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290367#comment-16290367 ]

ASF GitHub Bot commented on KAFKA-5092:
---------------------------------------

Github user simplesteph closed the pull request at:

    https://github.com/apache/kafka/pull/2894

> KIP 141 - ProducerRecord Interface Improvements
> -----------------------------------------------
>
>                 Key: KAFKA-5092
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5092
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stephane Maarek
>              Labels: kip
>             Fix For: 1.1.0
>
> See KIP here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+141+-+ProducerRecord+Interface+Improvements

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower
[ https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290268#comment-16290268 ]

Jason Gustafson commented on KAFKA-6361:
----------------------------------------

Unclean leader election was disabled. It may not have been a session expiration that caused B to become leader (I supposed this, but it's not clear in the logs and I haven't seen controller logs yet). In any case, when broker B took over, broker A was still in the ISR. Broker B appended the entry as described above and then attempted to shrink the ISR, but it failed to do so because of an invalid cached zk version. Broker A had already become leader at that point.

> Fast leader fail over can lead to log divergence between leader and follower
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6361
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6361
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> We have observed an edge case in the replication failover logic which can
> cause a replica to permanently fall out of sync with the leader or, in the
> worst case, actually have localized divergence between logs. This occurs in
> spite of the improved truncation logic from KIP-101.
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It
> appends two batches: one in the range (0, 10) and the other in the range
> (11, 20). The first one successfully replicates to B, but the second one
> does not. In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with
> epoch 2. It appends a new batch with offsets (11, n) to its local log.
> So we now have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the
> follower, but before it is able to do so, broker B has its own zk session
> expiration and broker A again becomes leader, now with epoch 3. It then
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends
> at offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin
> fetching from offset n, which will ultimately cause an out of order offset
> error because broker A will return the full batch beginning from offset 11
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in
> the middle of the batch, it will truncate all the way to offset 10. It can
> begin fetching from offset 11 and everything is fine.
> The case we have actually seen is the first one. The second one would
> likely go unnoticed in practice and everything is fine in the third case.
> To work around the issue, we deleted the active segment on the replica,
> which allowed it to re-replicate consistently from the leader.
> I'm not sure of the best solution for this scenario. Maybe if the leader
> isn't aware of an epoch, it should always respond with
> {{UNDEFINED_EPOCH_OFFSET}} instead of using the offset of the next highest
> epoch. That would cause the follower to truncate using its high watermark.
> Or perhaps instead of doing so, it could send another OffsetForLeaderEpoch
> request at the next previous cached epoch and then truncate using that.
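The three cases above can be sketched with a toy model of the follower-side truncation decision. The class and method names here are hypothetical illustrations, not the actual broker code (the real logic lives in the replica fetcher):

```java
// Toy model of the follower-side truncation decision described above.
// Hypothetical names; the real logic lives in the broker's replica fetcher.
class EpochTruncation {
    // Broker A's epoch-3 batch starts at offset 21, so an
    // OffsetForLeaderEpoch(epoch=2) request is answered with end offset 21.
    static final int LEADER_EPOCH_END_OFFSET = 21;

    /** Broker B's handling of its local epoch-2 batch [11, n] on becoming follower. */
    static String followerAction(int n) {
        int logEnd = n + 1; // next offset broker B would fetch
        if (logEnd <= LEADER_EPOCH_END_OFFSET) {
            // Leader's reported end offset is at or past our log end: no truncation.
            return logEnd < LEADER_EPOCH_END_OFFSET
                ? "no truncation; fetch from " + logEnd + " -> out-of-order offset error"
                : "no truncation; fetch from 21 -> silent divergence";
        }
        // Truncation target 21 falls inside the batch [11, n], so the whole batch goes.
        return "truncate [11, " + n + "] back to offset 10; refetch from 11 -> consistent";
    }

    public static void main(String[] args) {
        for (int n : new int[] {15, 20, 25}) {
            System.out.println("n = " + n + ": " + followerAction(n));
        }
    }
}
```

Running it prints one line per value of n, matching cases 1-3 above.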
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290260#comment-16290260 ]

Sean Policarpio commented on KAFKA-2260:
----------------------------------------

We also desire optimistic concurrency control for our Kafka application. Understandably, relying on Kafka to manage offsets should be the de facto strategy, but our use case requires more transactionality before allowing Kafka to accept the data we produce. Ideally, the API would let us fail fast when we attempt to push data that we assumed was still sequential.

> Allow specifying expected offset on produce
> -------------------------------------------
>
>                 Key: KAFKA-2260
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2260
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Ben Kirwin
>            Priority: Minor
>         Attachments: KAFKA-2260.patch, expected-offsets.patch
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the
> Kafka producer. This update has a small footprint, but enables a bunch of
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the
> expected offset doesn't match the actual offset, the server should fail the
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous
> check-and-set -- but instead of checking the current value of some state, it
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low
> contention. Happily, when Kafka is used as a commit log or as a
> stream-processing transport, it's common to have just one producer (or a
> small number) for a given partition -- and in many of these cases,
> predicting offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer
> can retry a write indefinitely and be sure that at most one of those
> attempts will succeed; and if two producers accidentally write to the end of
> the partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n
> messages consecutively to a partition, even if the list is much larger than
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to
> bootstrap, then writing any updates to that same partition -- it can be sure
> that it's seen all of the messages in that partition at the moment it does
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction /
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to
> a currently-unused field, adds no new APIs, and requires very little new
> code or additional work from the server.
> - Produced messages already carry an offset field, which is currently
> ignored by the server. This field could be used for the 'expected offset',
> with a sigil value for the current behaviour. (-1 is a natural choice, since
> it's already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this
> change, this method would assign offsets in the same way -- but if they
> don't match the offset in the message, we'd return an error instead of
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live
> behind some config flag. (Possibly global, but probably more useful
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer
> questions, and if this seems interesting, I'd be glad to flesh this out into
> a full KIP or patch. (And apologies if this is the wrong venue for this sort
> of thing!)
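A minimal sketch of the proposed semantics, assuming the -1 sigil for 'next available offset' described above. The class and its error are hypothetical illustrations; the real check would live in the broker's offset-assignment path, behind a config flag:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed check-and-set append. Hypothetical names; this is
// not the broker's code path, only an illustration of the semantics.
class CasLog {
    static final long ANY_OFFSET = -1L; // current behaviour: take the next offset

    private final List<byte[]> records = new ArrayList<>();

    /** Appends value, failing if expectedOffset is set and does not match. */
    synchronized long append(byte[] value, long expectedOffset) {
        long nextOffset = records.size();
        if (expectedOffset != ANY_OFFSET && expectedOffset != nextOffset) {
            // The proposal calls for a dedicated 'CAS failure' error code here.
            throw new IllegalStateException(
                "expected offset " + expectedOffset + " but next is " + nextOffset);
        }
        records.add(value);
        return nextOffset;
    }

    public static void main(String[] args) {
        CasLog log = new CasLog();
        log.append("a".getBytes(), ANY_OFFSET); // assigned offset 0
        log.append("b".getBytes(), 1);          // matches, assigned offset 1
        try {
            log.append("b".getBytes(), 1);      // duplicate retry is rejected
        } catch (IllegalStateException e) {
            System.out.println("CAS failure: " + e.getMessage());
        }
    }
}
```

A producer that retries the second write cannot accidentally append it twice, which is the idempotence property claimed in the motivation above.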
[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower
[ https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290252#comment-16290252 ]

Jun Rao commented on KAFKA-6361:
--------------------------------

Was unclean leader election enabled in this case? When broker B takes over as the new leader because broker A's ZK session expired, the controller is supposed to also shrink the ISR to just {B}. If unclean leader election is disabled, then when broker B's ZK session expires, broker A can't take over as the new leader since it's not in the ISR.

In KIP-101, we didn't solve the problem of log divergence when an unclean leader election occurs (https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Appendix(a):PossibilityforDivergentLogswithLeaderEpochs). Solving that problem requires more thought, especially with compacted topics, where certain leader epochs in the middle could have been fully garbage collected.

> Fast leader fail over can lead to log divergence between leader and follower
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6361
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6361
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290245#comment-16290245 ]

Jed Wesley-Smith commented on KAFKA-2260:
-----------------------------------------

This would be a valuable feature. Currently we need something else to coordinate monotonic writes (for an event source), and Kafka cannot support this directly.

> Allow specifying expected offset on produce
> -------------------------------------------
>
>                 Key: KAFKA-2260
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2260
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Ben Kirwin
>            Priority: Minor
>         Attachments: KAFKA-2260.patch, expected-offsets.patch
[jira] [Commented] (KAFKA-5849) Add process stop faults, round trip workload, partitioned produce-consume test
[ https://issues.apache.org/jira/browse/KAFKA-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290205#comment-16290205 ]

ASF GitHub Bot commented on KAFKA-5849:
---------------------------------------

GitHub user cmccabe opened a pull request:

    https://github.com/apache/kafka/pull/4323

    KAFKA-5849: Add process stop, round trip workload, partitioned test

    * Implement process stop faults via SIGSTOP / SIGCONT
    * Implement RoundTripWorkload, which both sends messages, and confirms that they are received at least once.
    * Allow Trogdor tasks to block until other Trogdor tasks are complete.
    * Add CreateTopicsWorker, which can be a building block for a lot of tests.
    * Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.
    * Implement some fault injection tests in round_trip_workload_test.py

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cmccabe/kafka KAFKA-5849

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4323

----
commit bd7c5ae285bcc62e6ee526de55343679e1e1
Author: Colin P. Mccabe
Date:   2017-12-14T01:31:00Z

    KAFKA-5849: Add process stop faults, round trip workload, partitioned produce-consume test

    * Implement process stop faults via SIGSTOP / SIGCONT
    * Implement RoundTripWorkload, which both sends messages, and confirms that they are received at least once.
    * Allow Trogdor tasks to block until other Trogdor tasks are complete.
    * Add CreateTopicsWorker, which can be a building block for a lot of tests.
    * Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.
    * Implement some fault injection tests in round_trip_workload_test.py

> Add process stop faults, round trip workload, partitioned produce-consume test
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5849
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5849
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Colin P. McCabe
>            Assignee: Colin P. McCabe
>
> Add partitioned produce consume test
[jira] [Commented] (KAFKA-5849) Add process stop faults, round trip workload, partitioned produce-consume test
[ https://issues.apache.org/jira/browse/KAFKA-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290197#comment-16290197 ]

ASF GitHub Bot commented on KAFKA-5849:
---------------------------------------

Github user cmccabe closed the pull request at:

    https://github.com/apache/kafka/pull/4106

> Add process stop faults, round trip workload, partitioned produce-consume test
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5849
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5849
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Colin P. McCabe
>            Assignee: Colin P. McCabe
>
> Add partitioned produce consume test
[jira] [Updated] (KAFKA-5849) Add process stop faults, round trip workload, partitioned produce-consume test
[ https://issues.apache.org/jira/browse/KAFKA-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin P. McCabe updated KAFKA-5849:
-----------------------------------

    Summary: Add process stop faults, round trip workload, partitioned produce-consume test  (was: Add partitioned produce consume test)

> Add process stop faults, round trip workload, partitioned produce-consume test
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5849
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5849
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Colin P. McCabe
>            Assignee: Colin P. McCabe
>
> Add partitioned produce consume test
[jira] [Created] (KAFKA-6361) Fast leader fail over can lead to log divergence between replica and follower
Jason Gustafson created KAFKA-6361:
--------------------------------------

             Summary: Fast leader fail over can lead to log divergence between replica and follower
                 Key: KAFKA-6361
                 URL: https://issues.apache.org/jira/browse/KAFKA-6361
             Project: Kafka
          Issue Type: Bug
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson
[jira] [Commented] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read
[ https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290140#comment-16290140 ]

huxihx commented on KAFKA-6345:
-------------------------------

An easy solution is to create a thread-safe count method used only for the JMX metrics. The safe version would take a snapshot of the map by copying its entries.

> NetworkClient.inFlightRequestCount() is not thread safe, causing
> ConcurrentModificationExceptions when sensors are read
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6345
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6345
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.0
>            Reporter: radai rosenblatt
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
>         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>         at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>         at org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>         at org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>         at org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>         at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>         at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>         at org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>         at org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>         at com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> looking at latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method
> (Sender.java line ~765 in SenderMetrics ctr), which would be driven by some
> thread reading JMX values
> # the HashMap in question would also be updated by some client io thread
> calling NetworkClient.doSend() - which calls into InFlightRequests.add()
> i guess the only upside is that this exception will always happen on the
> thread reading the JMX values and never on the actual client io thread ...
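Either a snapshot copy or a counter maintained alongside the map would let the metrics thread avoid iterating the live HashMap. A minimal sketch of the counter variant, with hypothetical names (not the actual client code):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: keep an AtomicInteger in step with the (non-thread-safe) map so a
// JMX reader thread never has to iterate it. Hypothetical names.
class InFlight {
    private final Map<String, Deque<Integer>> requests = new HashMap<>();
    private final AtomicInteger inFlight = new AtomicInteger();

    // Called only from the client io thread.
    void add(String node, int correlationId) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).add(correlationId);
        inFlight.incrementAndGet();
    }

    // Called only from the client io thread.
    void complete(String node) {
        Deque<Integer> queue = requests.get(node);
        if (queue != null && queue.poll() != null) {
            inFlight.decrementAndGet();
        }
    }

    /** Safe from any thread, e.g. the one reading JMX attributes. */
    int count() {
        return inFlight.get();
    }
}
```

The map is still touched only by the io thread; the sensor's measure() would call count() and never see the HashMap's iterator.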
[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290110#comment-16290110 ]

William Austin commented on KAFKA-6252:
---------------------------------------

[~rhauch] are there any workarounds for this issue until it is changed?

> A metric named 'XX' already exists, can't register another one.
> ----------------------------------------------------------------
>
>                 Key: KAFKA-6252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6252
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>         Environment: Linux
>            Reporter: Alexis Sellier
>            Priority: Critical
>
> When a connector crashes (or is not implemented correctly by not
> stopping/interrupting {{poll()}}), it cannot be restarted and an exception
> like this is thrown
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName
> [name=offset-commit-max-time-ms, group=connector-task-metrics,
> description=The maximum time in milliseconds taken by this task to commit
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already
> exists, can't register another one.
>       at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>       at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>       at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>       at org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
>       at org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
>       at org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
>       at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>       at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not called in
> all the cases
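The collision can be illustrated with a toy registry: re-registering the same metric name throws exactly as above unless the first task's metrics were removed on its close path. This is a hypothetical registry sketch, not Kafka's actual Metrics class:

```java
import java.util.HashMap;
import java.util.Map;

// Toy registry illustrating the failure mode: re-registering a crashed task's
// metric collides unless the metric was unregistered when the task went away.
class ToyMetricRegistry {
    private final Map<String, Object> metrics = new HashMap<>();

    void register(String name, Object metric) {
        if (metrics.putIfAbsent(name, metric) != null) {
            throw new IllegalArgumentException(
                "A metric named '" + name + "' already exists, can't register another one.");
        }
    }

    /** Must run on every task shutdown path, including crashes. */
    void unregister(String name) {
        metrics.remove(name);
    }
}
```

The fix direction implied by the reporter's last sentence is to guarantee the unregister step runs even when the task crashes, so a restart starts from a clean slate.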
[jira] [Comment Edited] (KAFKA-6339) Integration test with embedded kafka not working
[ https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290024#comment-16290024 ]

Narendra Kumar edited comment on KAFKA-6339 at 12/13/17 10:13 PM:
------------------------------------------------------------------

Hi Dhruv,
Can you please enable debug logging and provide some more of the logs? Maybe the logs since the start of the test.

was (Author: narendra kumar):
Hi Dhruv,
Can you please enable debug logging and provide some more of the logs?

> Integration test with embedded kafka not working
> ------------------------------------------------
>
>                 Key: KAFKA-6339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6339
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.2
>            Reporter: DHRUV BANSAL
>
> I am using Kafka version 0.11.0.2.
> I am trying to write an integration test for one of the components I am
> writing on top of Kafka. The following code works fine with Kafka version
> 0.10.0.0 but does not work with the mentioned version (0.11.0.2).
> {code:java}
> // setup Zookeeper
> String ZKHOST = "127.0.0.1";
> String BROKERHOST = "127.0.0.1";
> String BROKERPORT = "9093";
> String TOPIC = "test1";
> EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
> String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
> ZkClient zkClient = new ZkClient(zkConnect, 3, 3, ZKStringSerializer$.MODULE$);
> ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
>
> // setup Broker
> Properties brokerProps = new Properties();
> brokerProps.setProperty("zookeeper.connect", zkConnect);
> brokerProps.setProperty("broker.id", "0");
> brokerProps.setProperty("offsets.topic.replication.factor", "1");
> String kafka_log_path = Files.createTempDirectory("kafka-").toAbsolutePath().toString();
> System.out.println("kafka log path " + kafka_log_path);
> brokerProps.setProperty("log.dirs", kafka_log_path);
> brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
> KafkaConfig config = new KafkaConfig(brokerProps);
> Time mock = new MockTime();
> KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>
> // create topic
> AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
>
> // setup producer
> Properties producerProps = new Properties();
> producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
> producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
> producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps);
>
> // setup consumer
> Properties consumerProps = new Properties();
> consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
> consumerProps.setProperty("group.id", "group0");
> consumerProps.setProperty("client.id", "consumer0");
> consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
> consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> consumerProps.put("auto.offset.reset", "earliest"); // make sure the consumer starts from the beginning of the topic
> KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
> consumer.subscribe(Arrays.asList(TOPIC));
>
> // send message
> ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8));
> Future<RecordMetadata> record = producer.send(data);
> RecordMetadata metadata = record.get();
>
> // starting consumer
> ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
> assertEquals(1, records.count());
> Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
> ConsumerRecord<Integer, byte[]> consumedRecord = recordIterator.next();
> System.out.printf("offset = %d, key = %s, value = %s", consumedRecord.offset(),
[jira] [Assigned] (KAFKA-6352) Delay message down-conversion until response is ready to be sent
[ https://issues.apache.org/jira/browse/KAFKA-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-6352: -- Assignee: Jason Gustafson > Delay message down-conversion until response is ready to be sent > > > Key: KAFKA-6352 > URL: https://issues.apache.org/jira/browse/KAFKA-6352 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson > > We have observed some users beginning to use the new message format before > their clients have been upgraded. As we know, this can cause a lot of memory > pressure due to the fact that we down-convert the full response in memory. > Currently we do this down-conversion prior to enqueuing the response to be > sent by the Processor, which may cause more heap utilization than needed with > a steady queue size. > A possible improvement is to delay the down-conversion until the response is > actually ready to be sent. Even better would be to do the down-conversion in > a streaming fashion, converting only as much as is needed at any time. One > potential drawback is that this moves the down-conversion into the network > threads. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
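The delayed down-conversion idea above can be sketched in a few lines of plain Java. This is a hypothetical illustration, not Kafka's actual internals: the response queue holds a cheap lazy wrapper, and the expensive conversion runs only when the sender asks for the payload.

```java
import java.util.function.Supplier;

// Hypothetical holder that defers an expensive transformation (standing in
// for message-format down-conversion) until the payload is actually needed.
class LazyResponse<T> {
    private final Supplier<T> conversion;
    private T converted; // materialized at most once, on first use

    LazyResponse(Supplier<T> conversion) {
        this.conversion = conversion;
    }

    // Called by the network thread just before writing the response; only
    // here do we pay the memory and CPU cost of the conversion.
    synchronized T payload() {
        if (converted == null) {
            converted = conversion.get();
        }
        return converted;
    }
}

public class LazyDownConversionDemo {
    static int conversions = 0; // counts how many conversions actually ran

    // Enqueueing a response is cheap: no conversion happens yet.
    static LazyResponse<String> enqueue(String raw) {
        return new LazyResponse<>(() -> {
            conversions++;
            return raw.toUpperCase(); // stand-in for down-converting records
        });
    }

    public static void main(String[] args) {
        LazyResponse<String> response = enqueue("records-v2");
        System.out.println("queued, conversions=" + conversions); // still 0
        System.out.println("sent " + response.payload() + ", conversions=" + conversions); // now 1
    }
}
```

A streaming variant would go further and convert only a chunk at a time inside payload(), which corresponds to the "even better" option mentioned in the ticket; the drawback noted above remains, since payload() runs on the network thread.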
[jira] [Commented] (KAFKA-6126) Reduce rebalance time by not checking if created topics are available
[ https://issues.apache.org/jira/browse/KAFKA-6126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289929#comment-16289929 ] ASF GitHub Bot commented on KAFKA-6126: --- GitHub user mjsax opened a pull request: https://github.com/apache/kafka/pull/4322 KAFKA-6126: Remove unnecessary topics created check ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/kafka kafka-6126-remove-topic-check-on-rebalance-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4322.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4322 commit c0a8b4868cf979e47a25d5e837b26e62e2aac212 Author: Matthias J. SaxDate: 2017-12-13T21:06:18Z KAFKA-6126: Remove unnecessary topics created check > Reduce rebalance time by not checking if created topics are available > - > > Key: KAFKA-6126 > URL: https://issues.apache.org/jira/browse/KAFKA-6126 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > Fix For: 1.1.0 > > > Within {{StreamPartitionAssignor#assign}} we create new topics and afterwards > wait in an "infinite loop" until topic metadata propagated throughout the > cluster. We do this, to make sure topics are available when we start > processing. > However, with this approach we "extend" the time in the rebalance phase and > thus are not responsive (no calls to `poll` for liveness check and > {{KafkaStreams#close}} suffers). Thus, we might want to remove this check and > handle potential "topic not found" exceptions in the main thread gracefully. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
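The two strategies discussed above (blocking inside the rebalance versus handling the missing topic gracefully in the main thread) can be sketched with the cluster metadata faked as an in-memory set. All names here are illustrative, not the actual Streams internals.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class TopicCheckDemo {
    // Stand-in for the broker metadata that topic creation propagates into.
    static final Set<String> clusterMetadata = ConcurrentHashMap.newKeySet();

    // Current approach: block inside the rebalance until the topic shows up,
    // which means no poll() calls and poor responsiveness in the meantime.
    static void blockUntilAvailable(String topic) throws InterruptedException {
        while (!clusterMetadata.contains(topic)) {
            Thread.sleep(10); // the "infinite loop" with no upper bound
        }
    }

    // Proposed approach: return immediately and let the main processing loop
    // treat a missing topic as a retriable condition on its next iteration.
    static boolean tryProcess(String topic) {
        return clusterMetadata.contains(topic);
    }

    public static void main(String[] args) {
        System.out.println(tryProcess("repartition-topic")); // false: not propagated yet
        clusterMetadata.add("repartition-topic");            // metadata arrives later
        System.out.println(tryProcess("repartition-topic")); // true: process normally
    }
}
```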
[jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible
[ https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289883#comment-16289883 ] ASF GitHub Bot commented on KAFKA-6018: --- Github user xvrl closed the pull request at: https://github.com/apache/kafka/pull/4308
> Make KafkaFuture.Function java 8 lambda compatible
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> In Java 8 this code can then become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements), but {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's CompletableFuture.{quote}
> I think this change might be worth considering. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
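The difference the reporter is pointing at is Java's rule that only a single-abstract-method interface counts as a functional interface. The sketch below uses stand-in types (not the real {{KafkaFuture.Function}}) to show why an abstract class forces an anonymous subclass while an interface accepts a lambda.

```java
import java.util.Set;

public class LambdaCompatDemo {
    // Stand-in for the current shape: an abstract class can never be
    // written as a lambda, even with a single abstract method.
    static abstract class AbstractFunction<A, B> {
        public abstract B apply(A a);
    }

    // Stand-in for the proposed shape: a single-abstract-method interface
    // is a functional interface, so a lambda is a valid implementation.
    @FunctionalInterface
    interface Function<A, B> {
        B apply(A a);
    }

    static <A, B> B applyIface(Function<A, B> f, A a) {
        return f.apply(a);
    }

    public static void main(String[] args) {
        // Before: anonymous-class boilerplate.
        AbstractFunction<Set<String>, Integer> before =
                new AbstractFunction<Set<String>, Integer>() {
                    @Override
                    public Integer apply(Set<String> strings) {
                        return strings.size();
                    }
                };
        // After: the same logic as a one-line lambda.
        Integer after = applyIface((Set<String> strings) -> strings.size(),
                Set.of("a", "b"));
        System.out.println(before.apply(Set.of("x")) + " " + after); // 1 2
    }
}
```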
[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently
[ https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289773#comment-16289773 ] Manikumar commented on KAFKA-6335: -- SimpleAclAuthorizer#updateResourceAcls() will retry in case of any update failure/version mismatch. The test case configures Int.Max retries to avoid any transient errors. The test failures are always missing the first ACL in the expected list; it looks like a synchronization issue/corner case. Will look into this later this week. > SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails > intermittently > -- > > Key: KAFKA-6335 > URL: https://issues.apache.org/jira/browse/KAFKA-6335 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: Manikumar > Fix For: 1.1.0 > > > From > https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/ > : > {code} > java.lang.AssertionError: expected acls Set(User:36 has Allow permission for > operations: Read from hosts: *, User:7 has Allow permission for operations: > Read from hosts: *, User:21 has Allow permission for operations: Read from > hosts: *, User:39 has Allow permission for operations: Read from hosts: *, > User:43 has Allow permission for operations: Read from hosts: *, User:3 has > Allow permission for operations: Read from hosts: *, User:35 has Allow > permission for operations: Read from hosts: *, User:15 has Allow permission > for operations: Read from hosts: *, User:16 has Allow permission for > operations: Read from hosts: *, User:22 has Allow permission for operations: > Read from hosts: *, User:26 has Allow permission for operations: Read from > hosts: *, User:11 has Allow permission for operations: Read from hosts: *, > User:38 has Allow permission for operations: Read from hosts: *, User:8 has > Allow permission for operations: Read from hosts: *, User:28 has Allow > permission for operations:
Read from hosts: *, User:32 has Allow permission > for operations: Read from hosts: *, User:25 has Allow permission for > operations: Read from hosts: *, User:41 has Allow permission for operations: > Read from hosts: *, User:44 has Allow permission for operations: Read from > hosts: *, User:48 has Allow permission for operations: Read from hosts: *, > User:2 has Allow permission for operations: Read from hosts: *, User:9 has > Allow permission for operations: Read from hosts: *, User:14 has Allow > permission for operations: Read from hosts: *, User:46 has Allow permission > for operations: Read from hosts: *, User:13 has Allow permission for > operations: Read from hosts: *, User:5 has Allow permission for operations: > Read from hosts: *, User:29 has Allow permission for operations: Read from > hosts: *, User:45 has Allow permission for operations: Read from hosts: *, > User:6 has Allow permission for operations: Read from hosts: *, User:37 has > Allow permission for operations: Read from hosts: *, User:23 has Allow > permission for operations: Read from hosts: *, User:19 has Allow permission > for operations: Read from hosts: *, User:24 has Allow permission for > operations: Read from hosts: *, User:17 has Allow permission for operations: > Read from hosts: *, User:34 has Allow permission for operations: Read from > hosts: *, User:12 has Allow permission for operations: Read from hosts: *, > User:42 has Allow permission for operations: Read from hosts: *, User:4 has > Allow permission for operations: Read from hosts: *, User:47 has Allow > permission for operations: Read from hosts: *, User:18 has Allow permission > for operations: Read from hosts: *, User:31 has Allow permission for > operations: Read from hosts: *, User:49 has Allow permission for operations: > Read from hosts: *, User:33 has Allow permission for operations: Read from > hosts: *, User:1 has Allow permission for operations: Read from hosts: *, > User:27 has Allow permission for operations: 
Read from hosts: *) but got > Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 > has Allow permission for operations: Read from hosts: *, User:21 has Allow > permission for operations: Read from hosts: *, User:39 has Allow permission > for operations: Read from hosts: *, User:43 has Allow permission for > operations: Read from hosts: *, User:3 has Allow permission for operations: > Read from hosts: *, User:35 has Allow permission for operations: Read from > hosts: *, User:15 has Allow permission for operations: Read from hosts: *, > User:16 has Allow permission for operations: Read from hosts: *, User:22 has > Allow permission for operations: Read from hosts: *, User:26 has Allow >
[jira] [Updated] (KAFKA-6232) SaslSslAdminClientIntegrationTest sometimes fails
[ https://issues.apache.org/jira/browse/KAFKA-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6232: -- Labels: security (was: ) > SaslSslAdminClientIntegrationTest sometimes fails > - > > Key: KAFKA-6232 > URL: https://issues.apache.org/jira/browse/KAFKA-6232 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu > Labels: security > Attachments: saslSslAdminClientIntegrationTest-203.out > > > Here was one recent occurrence: > https://builds.apache.org/job/kafka-trunk-jdk9/203/testReport/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/ > {code} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.LeaderNotAvailableException: There is no > leader for this topic-partition as we are in the middle of a leadership > election. > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225) > at > kafka.api.AdminClientIntegrationTest.$anonfun$testLogStartOffsetCheckpoint$3(AdminClientIntegrationTest.scala:762) > {code} > In the test output, I saw: > {code} > [2017-11-17 23:15:45,593] ERROR [KafkaApi-1] Error when handling request > {filters=[{resource_type=2,resource_name=foobar,principal=User:ANONYMOUS,host=*,operation=3,permission_type=3},{resource_type=5,resource_name=transactional_id,principal=User:ANONYMOUS,host=*,operation=4,permission_type=3}]} > (kafka.server.KafkaApis:107) > org.apache.kafka.common.errors.ClusterAuthorizationException: Request > Request(processor=2, connectionId=127.0.0.1:36295-127.0.0.1:58183-0, > session=Session(User:client2,localhost/127.0.0.1), > listenerName=ListenerName(SASL_SSL), securityProtocol=SASL_SSL, buffer=null) > is not authorized. 
> {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently
[ https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289701#comment-16289701 ] Guozhang Wang commented on KAFKA-6335: -- Encountered this test failure again on Jenkins: https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/9984/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/ > SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails > intermittently > -- > > Key: KAFKA-6335 > URL: https://issues.apache.org/jira/browse/KAFKA-6335 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: Manikumar > Fix For: 1.1.0
[jira] [Updated] (KAFKA-6355) transient failure in org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies
[ https://issues.apache.org/jira/browse/KAFKA-6355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6355: --- Component/s: unit tests > transient failure in > org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies > -- > > Key: KAFKA-6355 > URL: https://issues.apache.org/jira/browse/KAFKA-6355 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: huxihx > > Got transient failure during running > 'org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies' > Error Message > java.lang.AssertionError: Condition not met within timeout 3. Did not > receive all 20 records from topic singlePartitionOutputTopic > Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. Did not > receive all 20 records from topic singlePartitionOutputTopic > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:195) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:165) > at > org.apache.kafka.streams.integration.EosIntegrationTest.runSimpleCopyTest(EosIntegrationTest.java:183) > at > org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies(EosIntegrationTest.java:135) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy1.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108) > at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at
[jira] [Commented] (KAFKA-6336) when using assign() with kafka consumer the KafkaConsumerGroup command doesnt show those consumers
[ https://issues.apache.org/jira/browse/KAFKA-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289584#comment-16289584 ] Neerja Khattar commented on KAFKA-6336: --- [~huxi_2b] [~hachikuji] any update? > when using assign() with kafka consumer the KafkaConsumerGroup command doesnt > show those consumers > -- > > Key: KAFKA-6336 > URL: https://issues.apache.org/jira/browse/KAFKA-6336 > Project: Kafka > Issue Type: Bug >Reporter: Neerja Khattar > > The issue is that when consumers use assign rather than subscribe and commit > offsets, the lag cannot be retrieved using the ConsumerGroup command. It > doesn't even list those groups. > The JMX tool also doesn't show the lag properly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently
[ https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289550#comment-16289550 ] Ted Yu commented on KAFKA-6335: --- SimpleAclAuthorizer#updateResourceAcls() returns a boolean indicating whether the update succeeded. However, the return value is not checked by testHighConcurrencyModificationOfResourceAcls(). In a highly contended scenario, the test should expect a few of the ACL requests not to go through. > SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails > intermittently > -- > > Key: KAFKA-6335 > URL: https://issues.apache.org/jira/browse/KAFKA-6335 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: Manikumar > Fix For: 1.1.0
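The unchecked return value can be modeled with a self-contained sketch: an optimistic compare-and-set stands in for the real ZooKeeper-backed update, and the boolean result tells the caller whether its write actually won. All names here are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public class OptimisticUpdateDemo {
    static final AtomicReference<Set<String>> acls =
            new AtomicReference<>(new HashSet<>());

    // Read-modify-write that fails when another writer changed the state
    // since 'expected' was read; the caller must check the return value,
    // which is exactly what the test discussed above does not do.
    static boolean updateAcls(Set<String> expected, String newAcl) {
        Set<String> updated = new HashSet<>(expected);
        updated.add(newAcl);
        return acls.compareAndSet(expected, updated);
    }

    public static void main(String[] args) {
        Set<String> snapshot = acls.get();
        System.out.println(updateAcls(snapshot, "User:1")); // true: snapshot was current
        System.out.println(updateAcls(snapshot, "User:2")); // false: snapshot is now stale
    }
}
```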
[jira] [Updated] (KAFKA-6086) Provide for custom error handling when Kafka Streams fails to produce
[ https://issues.apache.org/jira/browse/KAFKA-6086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Farmer updated KAFKA-6086: --- Summary: Provide for custom error handling when Kafka Streams fails to produce (was: KIP-210 Provide for custom error handling when Kafka Streams fails to produce) > Provide for custom error handling when Kafka Streams fails to produce > - > > Key: KAFKA-6086 > URL: https://issues.apache.org/jira/browse/KAFKA-6086 > Project: Kafka > Issue Type: Improvement >Reporter: Matt Farmer > Labels: kip > > This is an issue related to the following KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6086) Provide for custom error handling when Kafka Streams fails to produce
[ https://issues.apache.org/jira/browse/KAFKA-6086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289370#comment-16289370 ] Matt Farmer commented on KAFKA-6086: The associated KIP has been accepted, and the PR is ready for further review/merging. > Provide for custom error handling when Kafka Streams fails to produce > - > > Key: KAFKA-6086 > URL: https://issues.apache.org/jira/browse/KAFKA-6086 > Project: Kafka > Issue Type: Improvement >Reporter: Matt Farmer > Labels: kip > > This is an issue related to the following KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce -- This message was sent by Atlassian JIRA (v6.4.14#64029)
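The heart of the KIP is a pluggable handler invoked when a produce fails. A minimal self-contained sketch of that contract follows; the real Kafka Streams interface is `ProductionExceptionHandler` and receives the failed `ProducerRecord`, while the simplified types here are illustrative stand-ins so the decision logic runs on its own:

```java
// Sketch of the KIP-210 handler contract. HandlerResponse and the
// String topic parameter are simplified stand-ins, not the real API.
enum HandlerResponse { CONTINUE, FAIL }

interface ProductionExceptionHandler {
    // Decide whether Streams should keep running or die after a send failure.
    HandlerResponse handle(String topic, Exception exception);
}

// Example policy: tolerate oversized records, fail on anything else.
class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public HandlerResponse handle(String topic, Exception exception) {
        if (exception.getMessage() != null
                && exception.getMessage().contains("too large")) {
            return HandlerResponse.CONTINUE;
        }
        return HandlerResponse.FAIL;
    }
}
```

A user would register such a handler via Streams configuration rather than call it directly; the point of the KIP is that Streams invokes it on every failed send.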
[jira] [Assigned] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer
[ https://issues.apache.org/jira/browse/KAFKA-4125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeyhun Karimov reassigned KAFKA-4125: - Assignee: (was: Jeyhun Karimov) > Provide low-level Processor API meta data in DSL layer > -- > > Key: KAFKA-4125 > URL: https://issues.apache.org/jira/browse/KAFKA-4125 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: kip > Fix For: 1.1.0 > > > For the Processor API, users can get metadata like record offset, timestamp, etc. > via the provided {{Context}} object. It might be useful to allow users to > access this information in the DSL layer, too. > The idea would be to do it "the Flink way", i.e., by providing > RichFunctions; {{mapValues()}} for example. > It takes a {{ValueMapper}} that only has the method > {noformat} > V2 apply(V1 value); > {noformat} > Thus, you cannot get any metadata within apply (it's completely "blind"). > We would add two more interfaces: {{RichFunction}} with a method > {{open(Context context)}} and > {noformat} > RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction > {noformat} > This way, the user can choose to implement a Rich- or Standard-function and > we do not need to change existing APIs. Both can be handed into > {{KStream.mapValues()}} for example. Internally, we check if a Rich > function is provided, and if yes, hand in the {{Context}} object once, to > make it available to the user who can now access it within {{apply()}} -- of > course, the user must set a member variable in {{open()}} to hold the > reference to the Context object. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams -- This message was sent by Atlassian JIRA (v6.4.14#64029)
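The Rich-function pattern described above can be sketched in a few self-contained lines. The names below mirror the proposal but are illustrative stand-ins, not the actual Kafka Streams types (in particular, the minimal `Context` here carries only offset and timestamp):

```java
// Sketch of the proposed Rich-function pattern from KIP-159.
interface ValueMapper<V1, V2> {
    V2 apply(V1 value);
}

// Minimal stand-in for the processor Context carrying record metadata.
class Context {
    final long offset;
    final long timestamp;
    Context(long offset, long timestamp) { this.offset = offset; this.timestamp = timestamp; }
}

interface RichFunction {
    void open(Context context);  // called once; the user keeps the reference
}

// A user implementation can now read record metadata inside apply().
class TimestampedUpper implements ValueMapper<String, String>, RichFunction {
    private Context ctx;
    @Override public void open(Context context) { this.ctx = context; }
    @Override public String apply(String value) {
        return value.toUpperCase() + "@" + ctx.timestamp;
    }
}
```

Because `TimestampedUpper` is still a `ValueMapper`, it could be passed to `KStream.mapValues()` unchanged, which is exactly the backwards-compatibility argument made in the description.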
[jira] [Assigned] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic
[ https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeyhun Karimov reassigned KAFKA-6035: - Assignee: (was: Jeyhun Karimov) > Avoid creating changelog topics for state stores that are directly piped to a > sink topic > > > Key: KAFKA-6035 > URL: https://issues.apache.org/jira/browse/KAFKA-6035 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang > > Today Streams makes all state stores backed by a changelog topic by > default unless users override it with {{disableLogging}} when creating the > state store / materializing the KTable. However, there are a few cases where a > separate changelog topic would not be required, as we can re-use an existing > topic for that. This ticket summarizes a specific case that can be optimized: > Consider the case when a KTable is materialized and then sent directly into a > sink topic with the same key, e.g. > {code} > table1 = stream.groupBy(...).aggregate("state1").to("topic2"); > {code} > Then we do not need to create a {{state1-changelog}} but can just use > {{topic2}} as its changelog. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-5581) Avoid creating changelog topics for state stores that are materialized from a source topic
[ https://issues.apache.org/jira/browse/KAFKA-5581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeyhun Karimov reassigned KAFKA-5581: - Assignee: (was: Jeyhun Karimov) > Avoid creating changelog topics for state stores that are materialized from a > source topic > -- > > Key: KAFKA-5581 > URL: https://issues.apache.org/jira/browse/KAFKA-5581 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang > Labels: architecture, performance > > Today Streams makes all state stores backed by a changelog topic by > default unless users override it with {{disableLogging}} when creating the > state store / materializing the KTable. However, there are a few cases where a > separate changelog topic would not be required, as we can re-use an existing > topic for that. > There are a few places where the materialized store does not need a separate > changelog topic. This issue summarizes two specific cases: > 1) If a KTable is read directly from a source topic, and is materialized, i.e. > {code} > table1 = builder.table("topic1", "store1"); > {code} > In this case {{table1}}'s changelog topic can just be {{topic1}}, and we do > not need to create a separate {{table1-changelog}} topic. > 2) if a KStream is materialized for joins where the streams are directly from > a topic, e.g.: > {code} > stream1 = builder.stream("topic1"); > stream2 = builder.stream("topic2"); > stream3 = stream1.join(stream2, windows); // stream1 and stream2 are > materialized with a changelog topic > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key
[ https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeyhun Karimov reassigned KAFKA-4304: - Assignee: (was: Jeyhun Karimov) > Extend Interactive Queries for return latest update timestamp per key > - > > Key: KAFKA-4304 > URL: https://issues.apache.org/jira/browse/KAFKA-4304 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: needs-kip > > Currently, when querying a state store, it is not clear when the key was > last updated. The idea of this JIRA is to make the latest update timestamp > for each key-value pair of the state store accessible. > For example, this might be useful to > * check if a value was updated but did not change (just compare the update > TS) > * if you want to consider only recently updated keys -- This message was sent by Atlassian JIRA (v6.4.14#64029)
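One way to surface this is to store each value together with the timestamp of its last write, so a query can return both. A plain-collections sketch follows; `ValueAndTimestamp` and the toy store are hypothetical names for illustration, not existing Streams APIs:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative wrapper pairing a value with its last-update timestamp.
class ValueAndTimestamp<V> {
    final V value;
    final long timestamp;
    ValueAndTimestamp(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Toy key-value store recording the write time of every key, so a reader
// can detect "updated but unchanged" values by comparing timestamps.
class TimestampedStore<K, V> {
    private final Map<K, ValueAndTimestamp<V>> map = new HashMap<>();

    void put(K key, V value, long timestamp) {
        map.put(key, new ValueAndTimestamp<>(value, timestamp));
    }

    ValueAndTimestamp<V> get(K key) { return map.get(key); }
}
```

This directly supports the first use case above: writing the same value twice advances the timestamp even though the value is unchanged.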
[jira] [Created] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail
Damian Guy created KAFKA-6360: - Summary: RocksDB segments not removed when store is closed causes re-initialization to fail Key: KAFKA-6360 URL: https://issues.apache.org/jira/browse/KAFKA-6360 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Damian Guy Assignee: Damian Guy Priority: Blocker Fix For: 1.1.0 When a store is re-initialized it is first closed, before it is opened again. When this happens the segments in the {{Segments}} class are closed, but they are not removed from the list of segments. So when the store is re-initialized the old closed segments are used. This results in: {code} [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task [1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-24: (org.apache.kafka.streams.processor.internals.ProcessorStateManager) org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241) at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102) at org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122) at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78) at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33) at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179) at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
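The failure mode described above can be modeled in a few lines: a cache of segments that closes its entries on close() but leaves them in the map will hand out already-closed segments after re-initialization. The classes below only model the bug and the obvious fix (clearing the cache on close); they are not the real kafka.streams internals:

```java
import java.util.HashMap;
import java.util.Map;

// Toy segment: usable only while open.
class Segment {
    private boolean open = true;
    void close() { open = false; }
    boolean isOpen() { return open; }
}

// Toy model of the segment cache. The bug: close() closed the segments
// but left them cached, so a re-initialized store reused closed segments
// and hit InvalidStateStoreException on the next put/flush.
class Segments {
    private final Map<Long, Segment> segments = new HashMap<>();

    Segment getOrCreate(long id) {
        return segments.computeIfAbsent(id, k -> new Segment());
    }

    void close() {
        segments.values().forEach(Segment::close);
        segments.clear();  // fix sketch: drop closed segments from the cache
    }
}
```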
[jira] [Created] (KAFKA-6359) Work for KIP-236
Tom Bentley created KAFKA-6359: -- Summary: Work for KIP-236 Key: KAFKA-6359 URL: https://issues.apache.org/jira/browse/KAFKA-6359 Project: Kafka Issue Type: Improvement Reporter: Tom Bentley Assignee: Tom Bentley Priority: Minor This issue is for the work described in KIP-236. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6342) Move workaround for JSON parsing of non-escaped strings
[ https://issues.apache.org/jira/browse/KAFKA-6342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289131#comment-16289131 ] ASF GitHub Bot commented on KAFKA-6342: --- GitHub user umesh9794 opened a pull request: https://github.com/apache/kafka/pull/4321 KAFKA-6342 : Move workaround for JSON parsing of non-escaped strings This PR moves the JSON parsing workaround of [this PR](https://github.com/apache/kafka/pull/4303) to a new method and uses this method in `ZkClient` and related classes. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/umesh9794/kafka KAFKA-6342 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4321.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4321 commit ec8abab0629ecdea4a3a5970653e4c88025b8dfd Author: umesh chaudhary Date: 2017-12-13T10:51:23Z Initial Commit > Move workaround for JSON parsing of non-escaped strings > --- > > Key: KAFKA-6342 > URL: https://issues.apache.org/jira/browse/KAFKA-6342 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Rajini Sivaram >Assignee: Umesh Chaudhary > Fix For: 1.1.0 > > > KAFKA-6319 added a workaround to parse invalid JSON persisted using older > versions of Kafka because special characters were not escaped. The workaround > is required in 1.0.1 to enable parsing invalid JSON from ACL configs in > ZooKeeper. We can move the workaround out of kafka.utils.Json#parseFull for > 1.1.0 so that it is applied only to ACLs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
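The workaround being relocated is essentially a pre-parse pass that escapes raw control characters which older brokers persisted unescaped, so a strict JSON parser will accept the string. A sketch of that idea follows; the real code is Scala in kafka.utils.Json, and this Java stand-in shows only the escaping step:

```java
// Illustrative only; not the actual kafka.utils.Json implementation.
class JsonWorkaround {
    // Escape raw control characters (U+0000..U+001F) as \\uXXXX so a
    // strict JSON parser accepts strings older brokers wrote unescaped.
    static String escapeControlChars(String json) {
        StringBuilder sb = new StringBuilder(json.length());
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

The JIRA's point is about scoping: applying a pass like this only to the ACL path, instead of inside the general-purpose parseFull, avoids silently rewriting every JSON payload the broker reads.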
[jira] [Created] (KAFKA-6358) Per topic producer/fetch_consumer/fetch_follower metrics
Ricardo Bartolome created KAFKA-6358: Summary: Per topic producer/fetch_consumer/fetch_follower metrics Key: KAFKA-6358 URL: https://issues.apache.org/jira/browse/KAFKA-6358 Project: Kafka Issue Type: Wish Components: metrics Affects Versions: 1.0.0 Reporter: Ricardo Bartolome Priority: Minor We are using the following JMX beans to monitor Kafka 1.0.0: {code} kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce Mean 50thPercentile ... 99thPercentile kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer Count kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower Count {code} There are more, but these provide an idea of what we are using in order to track produce/fetch operations on a per-broker basis. Nevertheless, in order to identify abusing consumers/clients in our Kafka cluster, we would appreciate having these metrics on a per-topic basis. As an example of per-topic metrics we have: {code} kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=.* {code} Here we have a per-topic bean with a "Count" attribute that we can query. That way we can know which topics are ingesting more data and which ones less. We can't do that with the metrics described above. Would you consider a change in an upcoming Kafka version as a feature request? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
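Per-topic beans like BytesOutPerSec can already be enumerated with a JMX pattern query, and the same mechanism would serve the requested per-topic RequestMetrics breakdown. A small sketch of such a query; note the kafka.server pattern only matches inside a broker JVM (or over a JMX connector), while the helper itself is generic:

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Enumerate MBeans matching a pattern, e.g. the existing per-topic beans
// kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
class JmxPatterns {
    static Set<ObjectName> query(String pattern) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // null QueryExp: match on the ObjectName pattern alone.
            return server.queryNames(new ObjectName(pattern), null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("bad JMX pattern: " + pattern, e);
        }
    }
}
```

Each returned ObjectName carries the topic as a key property, so a monitoring agent can read its "Count" attribute per topic, which is exactly the breakdown this wish asks to extend to produce/fetch request metrics.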
[jira] [Created] (KAFKA-6357) Return nonzero code in kafka-consumer-groups.sh tool in case of error
Rinat Shigapov created KAFKA-6357: - Summary: Return nonzero code in kafka-consumer-groups.sh tool in case of error Key: KAFKA-6357 URL: https://issues.apache.org/jira/browse/KAFKA-6357 Project: Kafka Issue Type: Improvement Components: tools Environment: kafka_2.12-0.11.0.0 Reporter: Rinat Shigapov Use case that triggered this issue: kafka-consumer-groups.sh can reset offsets if there is no active consumer in the group. Otherwise it just prints an error message about this situation and returns a zero exit code. Expected behaviour: a nonzero code should be returned on error. Then proper scripting around kafka-consumer-groups.sh would be possible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
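The scripting this issue wants to enable can be sketched as a wrapper. The group, server, and the TOOL override are illustrative; until the tool itself returns a nonzero code, callers are stuck scraping stdout for error lines, as the workaround branch shows:

```shell
# TOOL is overridable so the wrapper can be exercised without a broker.
TOOL=${TOOL:-kafka-consumer-groups.sh}

reset_offsets() {
  out=$($TOOL --bootstrap-server "$1" --group "$2" \
        --reset-offsets --to-earliest --all-topics --execute 2>&1)
  rc=$?
  # Workaround: the tool exits 0 even on errors, so inspect its output.
  case $out in *Error*) rc=1;; esac
  printf '%s\n' "$out"
  return $rc
}
```

With a nonzero exit code from the tool itself, the `case` line would disappear and `rc=$?` alone would suffice.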
[jira] [Created] (KAFKA-6356) UnknownTopicOrPartitionException & NotLeaderForPartitionException and log deletion happening with retention bytes kept at -1.
kaushik srinivas created KAFKA-6356: --- Summary: UnknownTopicOrPartitionException & NotLeaderForPartitionException and log deletion happening with retention bytes kept at -1. Key: KAFKA-6356 URL: https://issues.apache.org/jira/browse/KAFKA-6356 Project: Kafka Issue Type: Bug Affects Versions: 0.10.1.0 Environment: Cent OS 7.2, HDD : 2Tb, CPUs: 56 cores, RAM : 256GB Reporter: kaushik srinivas Attachments: configs.txt, stderr_b0, stderr_b1, stderr_b2, stdout_b0, stdout_b1, stdout_b2, topic_description, topic_offsets Facing issues in a Kafka topic with partitions and a replication factor of 3. Config used : No of partitions : 20 replication factor : 3 No of brokers : 3 Memory for broker : 32GB Heap for broker : 12GB A producer is run to produce data for 20 partitions of a single topic. But we observed that for partitions whose leader is one of the brokers (broker-1), the offsets are never incremented, and we also see log files of 0 MB size on the broker disk. Seeing the below errors in the brokers: error 1: 2017-12-13 07:11:11,191] ERROR [ReplicaFetcherThread-0-2], Error for partition [test2,5] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) error 2: [2017-12-11 12:19:41,599] ERROR [ReplicaFetcherThread-0-2], Error for partition [test1,13] to broker 2:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread) Attaching: 1. error and stdout files of all the brokers. 2. kafka config used. 3. offsets and topic description. Retention bytes was set to -1 and the retention period to 96 hours. 
But we still observe some of the log files being deleted at the broker; from the logs: [2017-12-11 12:20:20,586] INFO Deleting index /var/lib/mesos/slave/slaves/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-S7/frameworks/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-0006/executors/ckafka__5f085d0c-e296-40f0-a686-8953dd14e4c6/runs/506a1ce7-23d1-45ea-bb7c-84e015405285/kafka-broker-data/broker-1/test1-12/.timeindex (kafka.log.TimeIndex) [2017-12-11 12:20:20,587] INFO Deleted log for partition [test1,12] in /var/lib/mesos/slave/slaves/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-S7/frameworks/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-0006/executors/ckafka__5f085d0c-e296-40f0-a686-8953dd14e4c6/runs/506a1ce7-23d1-45ea-bb7c-84e015405285/kafka-broker-data/broker-1/test1-12. (kafka.log.LogManager) We expect the logs to never be deleted if retention.bytes is set to -1. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
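Worth noting when reproducing: retention.bytes=-1 disables only size-based deletion; the 96-hour time-based retention still applies and may account for the deletions seen. The effective per-topic overrides can be checked and adjusted with the 0.10.x tools (topic name and ZooKeeper address below are placeholders):

```shell
# Inspect the effective per-topic overrides.
kafka-configs.sh --zookeeper zk:2181 --describe \
  --entity-type topics --entity-name test1

# Disable both size- and time-based deletion if neither is wanted.
kafka-configs.sh --zookeeper zk:2181 --alter \
  --entity-type topics --entity-name test1 \
  --add-config retention.bytes=-1,retention.ms=-1
```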