[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down
[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361853#comment-16361853 ]

Apurva Mehta commented on KAFKA-6446:

IMO, the right solution here would be for the producer to transition to a `FATAL_ERROR` state so that the only possible operation afterward is to `close()` the producer. In reality, if a `transactionalId` is specified but the producer can't connect to the cluster and initialize transactions, it should not be able to do anything else. Otherwise we risk violating transactional semantics.

> KafkaProducer with transactionId endless waits when bootstrap server is down
>
> Key: KAFKA-6446
> URL: https://issues.apache.org/jira/browse/KAFKA-6446
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 0.11.0.0, 1.0.0
> Reporter: Eduardo Sciullo
> Priority: Critical
> Attachments: Test.java
>
> When the bootstrap server is down, a KafkaProducer with a transactionId waits endlessly on initTransactions.
> The timeouts don't apply to that operation: it doesn't honor {{TRANSACTION_TIMEOUT_CONFIG}}.
> Attached an example of my code to reproduce the scenario.
>
> I opened this issue as suggested by [Gary Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
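A minimal sketch of the behaviour proposed in the comment above, assuming a hypothetical `TxnProducerSketch` class (this is not the `KafkaProducer` API): the initialization handshake is bounded by a hypothetical `maxBlockMs` timeout, and any timeout or failure moves the producer into `FATAL_ERROR`, after which every operation except `close()` is rejected. The handshake is passed in as a `Callable` so the sketch runs without a broker.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the proposed state handling; not the real KafkaProducer.
public class TxnProducerSketch {
    public enum State { UNINITIALIZED, READY, FATAL_ERROR, CLOSED }

    private final long maxBlockMs; // hypothetical upper bound for initTransactions
    private State state = State.UNINITIALIZED;

    public TxnProducerSketch(long maxBlockMs) {
        this.maxBlockMs = maxBlockMs;
    }

    // Runs the (simulated) coordinator handshake, waiting at most maxBlockMs.
    // Any timeout or failure is fatal: afterwards the producer can only be closed.
    public synchronized void initTransactions(Callable<Boolean> handshake) {
        if (state != State.UNINITIALIZED) {
            throw new IllegalStateException("initTransactions not allowed in state " + state);
        }
        // Daemon thread so a handshake that ignores interrupts cannot keep the JVM alive.
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "txn-init");
            t.setDaemon(true);
            return t;
        });
        Future<Boolean> result = exec.submit(handshake);
        try {
            if (Boolean.TRUE.equals(result.get(maxBlockMs, TimeUnit.MILLISECONDS))) {
                state = State.READY;
                return;
            }
            state = State.FATAL_ERROR; // handshake finished but reported failure
        } catch (TimeoutException | ExecutionException e) {
            result.cancel(true); // stop waiting for the unreachable cluster
            state = State.FATAL_ERROR;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            state = State.FATAL_ERROR;
        } finally {
            exec.shutdownNow();
        }
        throw new IllegalStateException("initTransactions failed; only close() is allowed");
    }

    public synchronized void send(String record) {
        if (state != State.READY) {
            throw new IllegalStateException("send not allowed in state " + state);
        }
        // A real producer would append the record to the ongoing transaction here.
    }

    public synchronized void close() {
        state = State.CLOSED;
    }

    public synchronized State state() {
        return state;
    }
}
```

With a handshake that never completes (for example one that sleeps far longer than `maxBlockMs`), `initTransactions` throws after the timeout instead of blocking forever, and a later `send` is rejected rather than silently running outside a transaction.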
[jira] [Commented] (KAFKA-6503) Connect: Plugin scan is very slow
[ https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361812#comment-16361812 ]

ASF GitHub Bot commented on KAFKA-6503:

rayokota closed pull request #4561: KAFKA-6503: Parallelize plugin scanning
URL: https://github.com/apache/kafka/pull/4561

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 345d7ef011d..c57caac9cba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -269,6 +269,7 @@ private PluginScanResult scanPluginPath(
         ConfigurationBuilder builder = new ConfigurationBuilder();
         builder.setClassLoaders(new ClassLoader[]{loader});
         builder.addUrls(urls);
+        builder.useParallelExecutor();
         Reflections reflections = new Reflections(builder);
         return new PluginScanResult(

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Connect: Plugin scan is very slow
>
> Key: KAFKA-6503
> URL: https://issues.apache.org/jira/browse/KAFKA-6503
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.0.0
> Reporter: Per Steffensen
> Priority: Critical
> Fix For: 1.1.0
>
> Just upgraded to 1.0.0. It seems some plugin scanning has been introduced. It is very slow - see the logs from starting my Kafka-Connect instance at the bottom. Scanning takes almost 4 minutes. I am running Kafka-Connect in Docker, based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to /usr/share/java. The only thing I have added is a 13MB jar in /usr/share/java/kafka-connect-file-streamer-client containing two connectors and a converter. That one alone seems to take 20 secs.
> If it were just scanning in the background and everything else was working, it probably would not be a big issue. But that is not the case. Right after starting the Kafka-Connect instance I try to create a connector via the /connectors endpoint, but it does not succeed until the plugin scanning has finished (4 minutes).
> I am not even sure why scanning is necessary. Isn't it always true that connectors, converters, etc. are referenced by name? Then, to see whether one exists, you could just try to load the class; the classloader will tell you if it is available. Hmm, there is probably a reason.
> Anyway, either it should be made much faster, or at least Kafka-Connect should be fully functional (or as functional as possible) while scanning is going on.
> {code}
> [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
> [2018-01-30 13:52:27,218] INFO Loading plugin from: /usr/share/java/kafka-connect-file-streamer-client (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,037] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,038] INFO Added plugin 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,039] INFO Added plugin 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,040] INFO Added plugin 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,049] INFO Loading plugin from: /usr/share/java/kafka-connect-elasticsearch (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,595] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
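The merged fix enables the parallel executor on the Reflections `ConfigurationBuilder` that `DelegatingClassLoader` uses for scanning. The general idea can be sketched with a plain `ExecutorService`: each plugin location becomes its own task, and the results are collected in the original order. `ParallelPluginScan`, `scanAll` and the `scanner` function are hypothetical stand-ins for the per-location Reflections scan; whether this helps a particular installation depends on how much independent work there is to overlap, which the sketch does not measure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: scan several plugin locations concurrently instead of one after another.
public class ParallelPluginScan {

    // Submits one scan task per location and returns location -> discovered class names,
    // preserving the input order of the locations.
    public static Map<String, List<String>> scanAll(List<String> locations,
                                                    Function<String, List<String>> scanner,
                                                    int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (String location : locations) {
                Callable<List<String>> task = () -> scanner.apply(location);
                futures.add(pool.submit(task));
            }
            Map<String, List<String>> results = new LinkedHashMap<>();
            for (int i = 0; i < locations.size(); i++) {
                results.put(locations.get(i), futures.get(i).get()); // blocks until that scan is done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("plugin scan interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("plugin scan failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

A fixed pool keeps the number of concurrently opened jars bounded; the trade-off is that one very large location still occupies a single task for its whole scan.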
[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down
[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361744#comment-16361744 ]

huxihx commented on KAFKA-6446:

[~apurva] Could you please take some time to look at the comments above? Thanks.
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361725#comment-16361725 ]

Matthias J. Sax commented on KAFKA-6555:

I think KAFKA-6145 is not relevant here. Feel free to extend the ticket description of KAFKA-6144 directly and close this as a duplicate (no child ticket required).

> Making state store queryable during restoration
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ashish Surana
> Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time amounts to downtime that is not acceptable for many applications.
> When the active partition goes down, one of the following occurs:
> # One of the standby replica partitions gets promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time taken for this depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, effectively taking the partition down. We can make the state store partition queryable for the data already present in the state store.
> # When there is no replica or standby task, the active task will be started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store unqueryable during this time means downtime for the partition.
> This is a very important improvement, as it could directly improve the availability of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
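The two scenarios differ mainly in how much of the changelog must be replayed before the task can become RUNNING. A back-of-the-envelope sketch, assuming restoration replays records at a constant rate and that changelog offsets map one-to-one to records (compaction gaps and transaction markers are ignored); `estimateRestoreSeconds` and the numbers in `main` are hypothetical:

```java
// Hypothetical estimate of restoration time; ignores compaction gaps and per-batch overheads.
public class RestoreTimeEstimate {

    // Seconds needed to replay the changelog from checkpointOffset up to changelogEndOffset.
    public static double estimateRestoreSeconds(long changelogEndOffset,
                                                long checkpointOffset,
                                                double recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("recordsPerSecond must be positive");
        }
        long remaining = Math.max(0L, changelogEndOffset - checkpointOffset);
        return remaining / recordsPerSecond;
    }

    public static void main(String[] args) {
        // Scenario 1: a promoted standby that is only 10,000 records behind.
        System.out.println(estimateRestoreSeconds(1_000_000L, 990_000L, 50_000.0)); // 0.2
        // Scenario 2: no standby, so the whole changelog is replayed from offset 0.
        System.out.println(estimateRestoreSeconds(1_000_000L, 0L, 50_000.0));       // 20.0
    }
}
```

The window in which the store is unqueryable grows linearly with the replica's lag, which is why the second scenario (a full rebuild) is the one where serving already-restored data matters most.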
[jira] [Commented] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361671#comment-16361671 ]

ASF GitHub Bot commented on KAFKA-6472:

guozhangwang closed pull request #126: KAFKA-6472 - Fix WordCount example code error
URL: https://github.com/apache/kafka-site/pull/126

diff --git a/10/streams/tutorial.html b/10/streams/tutorial.html
index 0bc7314..11162e8 100644
--- a/10/streams/tutorial.html
+++ b/10/streams/tutorial.html
@@ -516,7 +516,7 @@ Writing a th
-counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
+counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

> WordCount example code error
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
> Issue Type: Bug
> Components: documentation, streams
> Affects Versions: 0.11.0.2
> Reporter: JONYhao
> Assignee: Joel Hamill
> Priority: Trivial
> Labels: newbie
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> A closing ")" is missing in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page, line 31:
> {{31 .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{31 .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}
[jira] [Commented] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361668#comment-16361668 ]

ASF GitHub Bot commented on KAFKA-6472:

guozhangwang closed pull request #4538: KAFKA-6472 - Fix WordCount example code error
URL: https://github.com/apache/kafka/pull/4538

diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html
index 1c8c082ce97..f390b6df92b 100644
--- a/docs/streams/tutorial.html
+++ b/docs/streams/tutorial.html
@@ -516,7 +516,7 @@ Writing a th
-counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
+counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
[jira] [Comment Edited] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361613#comment-16361613 ]

Ashish Surana edited comment on KAFKA-6555 at 2/13/18 12:04 AM:

OK, shouldn't this be a child ticket of KAFKA-6144? The related tickets KAFKA-6145 and KAFKA-6031 don't completely eliminate the pause during rebalancing. KAFKA-6145 greatly reduces the time a stream task spends in the rebalancing state, but it doesn't remove it completely. KAFKA-6031 allows reads from standby replicas, but it also doesn't completely eliminate the need to access state during rebalancing. What if there is no replica and the primary goes down? What if none of the replicas gets promoted to active? What if all the replicas of the partition are in the rebalancing state at the same time? KAFKA-6144 captures the idea of this ticket, i.e. allowing access to the state store during rebalancing, but KAFKA-6145 and KAFKA-6031 are not sufficient to achieve that.
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361589#comment-16361589 ]

Matthias J. Sax commented on KAFKA-6555:

You mean you are preparing a PR? Great! I would still like to merge both tickets. Can you update the other ticket (or comment on it) and close this ticket as a duplicate?
[jira] [Commented] (KAFKA-5944) Add unit tests for handling of authentication failures in clients
[ https://issues.apache.org/jira/browse/KAFKA-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361546#comment-16361546 ]

ASF GitHub Bot commented on KAFKA-5944:

hachikuji closed pull request #3965: KAFKA-5944: Unit tests for handling SASL authentication failures in clients
URL: https://github.com/apache/kafka/pull/3965

diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d843414fd7a..65255fe9648 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@ public FutureResponse(Node node,
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
+    private final Map<Node, AuthenticationException> authenticationException = new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from a different thread
     private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
     // Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
@@ -102,7 +104,7 @@ public boolean isReady(Node node, long now) {
     @Override
     public boolean ready(Node node, long now) {
-        if (isBlackedOut(node))
+        if (isBlackedOut(node) || authenticationException(node) != null)
             return false;
         ready.add(node.idString());
         return true;
@@ -117,6 +119,12 @@ public void blackout(Node node, long duration) {
         blackedOut.put(node, time.milliseconds() + duration);
     }
+    public void authenticationFailed(Node node, long duration) {
+        authenticationException.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
+        disconnect(node.idString());
+        blackout(node, duration);
+    }
     private boolean isBlackedOut(Node node) {
         if (blackedOut.containsKey(node)) {
             long expiration = blackedOut.get(node);
@@ -137,7 +145,7 @@ public boolean connectionFailed(Node node) {
     @Override
     public AuthenticationException authenticationException(Node node) {
-        return null;
+        return authenticationException.get(node);
     }
     @Override
@@ -347,6 +355,7 @@ public void reset() {
         responses.clear();
         futureResponses.clear();
         metadataUpdates.clear();
+        authenticationException.clear();
     }
     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 186ccf06cb5..f08a99b6ddc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -248,6 +249,75 @@ public void testInvalidTopicNames() throws Exception {
         }
     }
+    @Test
+    public void testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws Exception {
+        AdminClientUnitTestEnv env = mockClientEnv();
+        Node node = env.cluster().controller();
+        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+        env.kafkaClient().setNode(node);
+        env.kafkaClient().authenticationFailed(node, 300);
+        callAdminClientApisAndExpectAnAuthenticationError(env);
+        // wait less than the blackout period, the connection should fail and the authentication error should remain
+        env.time().sleep(30);
+        assertTrue(env.kafkaClient().connectionFailed(node));
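The bookkeeping this patch adds to `MockClient` can be sketched without the Kafka test classes. In this hypothetical `AuthBlackoutTracker`, nodes are plain strings and the current time is passed in explicitly instead of coming from a `Time` instance. As in the diff lines shown above, `ready()` keeps returning `false` for a node with a recorded authentication failure even after its blackout has expired, until `reset()` clears it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the MockClient bookkeeping shown in the diff above.
public class AuthBlackoutTracker {
    private final Map<String, Long> blackedOutUntil = new HashMap<>();
    private final Map<String, String> authenticationErrors = new HashMap<>();

    public void blackout(String node, long nowMs, long durationMs) {
        blackedOutUntil.put(node, nowMs + durationMs);
    }

    // Records the failure and blacks the node out, mirroring authenticationFailed() in the patch.
    public void authenticationFailed(String node, long nowMs, long durationMs) {
        authenticationErrors.put(node, "SASL_AUTHENTICATION_FAILED");
        blackout(node, nowMs, durationMs);
    }

    private boolean isBlackedOut(String node, long nowMs) {
        Long expiry = blackedOutUntil.get(node);
        if (expiry == null) {
            return false;
        }
        if (nowMs < expiry) {
            return true;
        }
        blackedOutUntil.remove(node); // blackout period is over
        return false;
    }

    // Not ready while blacked out, and not ready at all once authentication has failed.
    public boolean ready(String node, long nowMs) {
        return !isBlackedOut(node, nowMs) && authenticationErrors.get(node) == null;
    }

    public boolean connectionFailed(String node, long nowMs) {
        return isBlackedOut(node, nowMs);
    }

    public String authenticationError(String node) {
        return authenticationErrors.get(node);
    }

    public void reset() {
        blackedOutUntil.clear();
        authenticationErrors.clear();
    }
}
```

The test in the diff checks the same sequence: fail authentication with a 300 ms blackout, advance 30 ms, and expect the connection to still be reported as failed with the authentication error intact.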
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361515#comment-16361515 ]

Ashish Surana commented on KAFKA-6555:

Hi Matthias,
Thanks for pointing me to KAFKA-6144. Having gone through it, it looks like a very similar ticket, with one minor difference:
* I am suggesting allowing stale reads only in PARTITION_ASSIGNED (not in PARTITION_REVOKED), primarily because that is the instance that will end up in the RUNNING state, and this is the minimum we need to do to keep serving requests for this partition. We still have a single instance doing writes and reads, and we still want pure standby replicas. This approach works well if we keep the current design of one active read/write instance.
I have made the changes and can share them in a few days.
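A minimal sketch of the rule proposed in this comment, using a hypothetical `RestorationReadGate` backed by an in-memory map: reads are served normally in `RUNNING`, served but flagged as possibly stale in `PARTITION_ASSIGNED` (while restoring), and refused in `PARTITION_REVOKED`. The state names follow the comment. Kafka Streams itself reports an unqueryable store with an `InvalidStateStoreException`; the sketch throws `IllegalStateException` instead so that it runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of state-dependent reads; not thread-safe and not the Kafka Streams API.
public class RestorationReadGate {
    public enum TaskState { PARTITION_REVOKED, PARTITION_ASSIGNED, RUNNING }

    // Result of a read: the value (may be null) and whether it may be behind the changelog.
    public static final class ReadResult {
        public final String value;
        public final boolean stale;

        ReadResult(String value, boolean stale) {
            this.value = value;
            this.stale = stale;
        }
    }

    private final Map<String, String> store = new HashMap<>();
    private TaskState state = TaskState.PARTITION_ASSIGNED;

    public void setState(TaskState newState) {
        state = newState;
    }

    // Simulates applying one changelog record during restoration.
    public void restore(String key, String value) {
        store.put(key, value);
    }

    public ReadResult get(String key) {
        switch (state) {
            case RUNNING:
                return new ReadResult(store.get(key), false);
            case PARTITION_ASSIGNED:
                // Still restoring: serve what has been restored so far, flagged as possibly stale.
                return new ReadResult(store.get(key), true);
            default:
                // PARTITION_REVOKED: this instance is giving the partition up, so refuse the read.
                throw new IllegalStateException("store not queryable in state " + state);
        }
    }
}
```

Returning an explicit staleness flag leaves it to the caller to decide whether a possibly outdated value is acceptable, which keeps the single active read/write instance described above as the only source of fresh reads.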
[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ashish Surana updated KAFKA-6555:

Description:
State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time amounts to downtime that is not acceptable for many applications.
When the active partition goes down, one of the following occurs:
# One of the standby replica partitions gets promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time taken for this depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, effectively taking the partition down. We can make the state store partition queryable for the data already present in the state store.
# When there is no replica or standby task, the active task will be started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store unqueryable during this time means downtime for the partition.
This is a very important improvement, as it could directly improve the availability of microservices developed using Kafka Streams.
I am working on a patch for this change. Any feedback or comments are welcome.

was:
State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time amounts to downtime that is not acceptable for many applications.
When the active partition goes down, one of the following occurs:
# One of the standby replica partitions gets promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time taken for this depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, effectively taking the partition down. We can make the state store partition queryable for the data already present in the state store.
# When there is no replica or standby task, the active task will be started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store unqueryable during this time means downtime for the partition.
This is a very important improvement, as it could directly improve the availability of microservices developed using Kafka Streams.
Any feedback or comments are welcome.
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361495#comment-16361495 ]

Matthias J. Sax commented on KAFKA-6535:

OK. Sounds good. There are a couple of other scenarios, too, but I guess we consider all of them missing features or bugs and thus plan to resolve them in the future. Thanks for clarifying.

> Set default retention ms for Streams repartition topics to infinity
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
> Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it is better to set their default retention to infinity, allowing any records to be pushed to them with old timestamps (think: bootstrapping, re-processing), and just rely on the purging API to keep their storage small.
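To illustrate the reasoning, here is a simplified sketch. It assumes per-record time-based deletion, whereas brokers actually delete whole log segments, so it is only an approximation. Records produced with old timestamps, as in bootstrapping or re-processing, are immediately eligible for time-based deletion. With infinite retention (`retention.ms = -1`), they stay until an explicit purge below the committed offset. `RepartitionRetentionSketch`, `keptByRetention` and `keptAfterPurge` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one repartition-topic partition.
public class RepartitionRetentionSketch {
    public static final long INFINITE_RETENTION = -1L; // retention.ms = -1 means no time limit

    public static final class Rec {
        final long offset;
        final long timestampMs;

        public Rec(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    // Records that time-based retention would keep at nowMs (per-record approximation).
    public static List<Rec> keptByRetention(List<Rec> log, long retentionMs, long nowMs) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            if (retentionMs == INFINITE_RETENTION || nowMs - r.timestampMs < retentionMs) {
                kept.add(r);
            }
        }
        return kept;
    }

    // Records left after purging everything below the downstream committed offset.
    public static List<Rec> keptAfterPurge(List<Rec> log, long committedOffset) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            if (r.offset >= committedOffset) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long now = 30 * day;
        // Re-processing: every record carries a timestamp from a week ago.
        List<Rec> log = List.of(new Rec(0, now - 7 * day), new Rec(1, now - 7 * day), new Rec(2, now - 7 * day));
        System.out.println(keptByRetention(log, day, now).size());                // 0: eligible for deletion before being consumed
        System.out.println(keptByRetention(log, INFINITE_RETENTION, now).size()); // 3
        System.out.println(keptAfterPurge(log, 2).size());                        // 1: only offset 2 is still unconsumed
    }
}
```

The purge is driven by consumer progress rather than record timestamps, so storage stays small without dropping records that the downstream sub-topology has not read yet.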
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361492#comment-16361492 ]

Matthias J. Sax commented on KAFKA-6555:

[~asurana] Thanks for creating this JIRA. How is this different from KAFKA-6144?

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ashish Surana
> Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
> When the active partition goes down, one of the following occurs:
> # One of the standby replica partitions is promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time this takes depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, which makes the partition unavailable. We can make the state store partition queryable for the data already present in the state store.
> # When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
> This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361490#comment-16361490 ]

Guozhang Wang commented on KAFKA-6535:

I think it may bring more operational risk than benefit in practice: knowing for sure that a topic is not used by any other team is hard, so opening the door for users to do this, possibly by mistake, may lead to more issues. Instead, we should just encourage users to use internal topics when possible unless there are reasons not to. So far we have the following reasons, which I think we can fix in the near future, so encouraging internal topics is still a good approach moving forward:
1. Users want to override {{num.partitions}} and other configs for individual topics; this is already being discussed in some KIPs.
2. Users want to maintain the topic names for extensibility (for example, upgrading the app without restarting a new app); better support for this is also being discussed for the future.

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
> Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it is better to set their default retention to infinity to allow any records to be pushed to them with old timestamps (think: bootstrapping, re-processing) and just rely on the purging API to keep their storage small.
[jira] [Comment Edited] (KAFKA-5846) Use singleton NoOpConsumerRebalanceListener in subscribe() call where listener is not specified
[ https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191405#comment-16191405 ]

Ted Yu edited comment on KAFKA-5846 at 2/12/18 8:49 PM:

Patch looks good to me .

was (Author: yuzhih...@gmail.com):
Patch looks good to me.

> Use singleton NoOpConsumerRebalanceListener in subscribe() call where
> listener is not specified
> ---
>
> Key: KAFKA-5846
> URL: https://issues.apache.org/jira/browse/KAFKA-5846
> Project: Kafka
> Issue Type: Task
> Reporter: Ted Yu
> Assignee: Kamal Chandraprakash
> Priority: Minor
>
> Currently KafkaConsumer creates a new instance of NoOpConsumerRebalanceListener for each subscribe() call where a ConsumerRebalanceListener is not specified:
> {code}
> public void subscribe(Pattern pattern) {
>     subscribe(pattern, new NoOpConsumerRebalanceListener());
> }
> {code}
> We can create a singleton NoOpConsumerRebalanceListener to be used in such scenarios.
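The suggested change is the usual shared-instance pattern for a stateless object. Below is a minimal sketch in plain Java; `RebalanceListener`, `NoOpRebalanceListener` and `SketchConsumer` are hypothetical stand-ins for `ConsumerRebalanceListener`, `NoOpConsumerRebalanceListener` and `KafkaConsumer`, defined locally (with partitions as plain strings) so the example compiles without the Kafka client jar.

```java
import java.util.Collection;
import java.util.regex.Pattern;

// Hypothetical stand-in for ConsumerRebalanceListener (partitions simplified to Strings).
interface RebalanceListener {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// A stateless no-op listener: because it holds no state, one shared instance is safe.
final class NoOpRebalanceListener implements RebalanceListener {
    static final NoOpRebalanceListener INSTANCE = new NoOpRebalanceListener();

    private NoOpRebalanceListener() { }

    @Override public void onPartitionsRevoked(Collection<String> partitions) { }
    @Override public void onPartitionsAssigned(Collection<String> partitions) { }
}

// Hypothetical consumer showing only the two subscribe() overloads.
class SketchConsumer {
    private RebalanceListener listener;

    void subscribe(Pattern pattern) {
        // Reuse the shared instance instead of allocating a new listener on every call.
        subscribe(pattern, NoOpRebalanceListener.INSTANCE);
    }

    void subscribe(Pattern pattern, RebalanceListener listener) {
        this.listener = listener;
    }

    RebalanceListener listener() {
        return listener;
    }
}
```

Sharing one instance is only safe because the no-op listener holds no state; a listener that tracks assigned partitions would still need one instance per consumer.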
[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ashish Surana updated KAFKA-6555:

Description:
State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
When the active partition goes down, one of the following occurs:
# One of the standby replica partitions is promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time this takes depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, which makes the partition unavailable. We can make the state store partition queryable for the data already present in the state store.
# When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.

was:
State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
When the active partition goes down, one of the following occurs:
# One of the standby replica partitions is promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time this takes depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, which makes the partition unavailable. We can make the state store partition queryable for the data already present in the state store.
# When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ashish Surana
> Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
> When the active partition goes down, one of the following occurs:
> # One of the standby replica partitions is promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time this takes depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, which makes the partition unavailable. We can make the state store partition queryable for the data already present in the state store.
> # When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
> This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.
[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ashish Surana updated KAFKA-6555:

Description:
State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
When the active partition goes down, one of the following occurs:
# One of the standby replica partitions is promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time this takes depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, which makes the partition unavailable. We can make the state store partition queryable for the data already present in the state store.
# When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.

was:
State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
When the active partition goes down, one of the following occurs:
# One of the standby replica partitions is promoted to active: the replica restores the remaining state from the changelog topic. The time this takes depends on how far the replica is lagging behind in consuming the changelog topic. During this restoration time the state store for that partition is not queryable, which means downtime for the entire partition. We can make it queryable for the data already present in the state store.
# When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ashish Surana
> Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
> When the active partition goes down, one of the following occurs:
> # One of the standby replica partitions is promoted to active: the replica task has to restore the remaining state from the changelog topic before it can become RUNNING. The time this takes depends on how far the replica is lagging behind. During this restoration time the state store for that partition is currently not queryable, which makes the partition unavailable. We can make the state store partition queryable for the data already present in the state store.
> # When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
>
> This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.
[jira] [Created] (KAFKA-6555) Making state store queryable during restoration
Ashish Surana created KAFKA-6555:

Summary: Making state store queryable during restoration
Key: KAFKA-6555
URL: https://issues.apache.org/jira/browse/KAFKA-6555
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Ashish Surana

State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
When the active partition goes down, one of the following occurs:
# One of the standby replica partitions is promoted to active: the replica restores the remaining state from the changelog topic. The time this takes depends on how far the replica is lagging behind in consuming the changelog topic. During this restoration time the state store for that partition is not queryable, which means downtime for the entire partition. We can make it queryable for the data already present in the state store.
# When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.
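To make the proposal concrete, here is a minimal, self-contained Java model of the current and proposed behaviour. It is a hypothetical sketch, not Kafka Streams code: `RestorableStore`, its `queryableWhileRestoring` flag and the use of `IllegalStateException` as the "not queryable" signal are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a key-value store that is being restored from a changelog.
// None of these names are Kafka Streams API.
final class RestorableStore {
    enum State { RESTORING, RUNNING }

    private final Map<String, String> data = new HashMap<>();
    private final boolean queryableWhileRestoring;
    private State state = State.RESTORING;

    RestorableStore(boolean queryableWhileRestoring) {
        this.queryableWhileRestoring = queryableWhileRestoring;
    }

    // Apply one changelog record during restoration; a null value is a tombstone (delete).
    void restore(String key, String value) {
        if (value == null) {
            data.remove(key);
        } else {
            data.put(key, value);
        }
    }

    void restorationComplete() {
        state = State.RUNNING;
    }

    // Flag off (current behaviour): reads fail until the store is RUNNING.
    // Flag on (proposal): reads return the data restored so far, which may be stale.
    String get(String key) {
        if (state == State.RESTORING && !queryableWhileRestoring) {
            throw new IllegalStateException("store is restoring and not queryable");
        }
        return data.get(key);
    }
}
```

With the flag off, `get` fails until `restorationComplete()` is called, as described above; with it on, readers see whatever has been restored so far, which may lag behind the changelog. Whether that staleness is acceptable is an application-level decision.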
[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ashish Surana updated KAFKA-6555:

Flags: Important

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ashish Surana
> Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask is in the RUNNING state. The idea is to make them queryable even in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and making the data inaccessible during this time could mean downtime that is not acceptable for many applications.
> When the active partition goes down, one of the following occurs:
> # One of the standby replica partitions is promoted to active: the replica restores the remaining state from the changelog topic. The time this takes depends on how far the replica is lagging behind in consuming the changelog topic. During this restoration time the state store for that partition is not queryable, which means downtime for the entire partition. We can make it queryable for the data already present in the state store.
> # When there is no replica or standby task, the active task is started on one of the existing nodes. That node has to build the entire state from the changelog topic, which can take a lot of time depending on how big the changelog topic is, and keeping the state store non-queryable during this time means downtime for the partition.
>
> This is a very important improvement, as it could improve the availability of microservices developed using Kafka Streams. Any feedback or comments are welcome.
[jira] [Created] (KAFKA-6554) Broker doesn't reject Produce request with inconsistent state
Simon Fell created KAFKA-6554:

Summary: Broker doesn't reject Produce request with inconsistent state
Key: KAFKA-6554
URL: https://issues.apache.org/jira/browse/KAFKA-6554
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 1.0.0
Reporter: Simon Fell
Attachments: produce_v3.txt

Produce messages of version v3 carry an offset delta in each record, along with a LastOffsetDelta for each topic/partition record set. While investigating an issue with missing offsets, I found a bug in a producer library where it would send multiple records but leave LastOffsetDelta at 0. This causes various problems, including holes in the offsets fetched by the consumer.
As lastOffsetDelta can be computed by looking at the records, it seems like the broker should at least validate the LastOffsetDelta field against the contained records to stop this bad data from getting in.
I've attached a decoded v3 produce message that was causing the problems and was accepted by the broker.
Here's a link to the issue in the Kafka library we were using, which has more context if you need it: https://github.com/Shopify/sarama/issues/1032
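The validation the reporter asks for can be sketched in plain Java. This is not broker code: `BatchCheck` and `validate` are hypothetical, the batch is reduced to the header's `lastOffsetDelta` plus the per-record offset deltas, and the rule that a freshly produced batch numbers its records 0, 1, ..., n-1 is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical, simplified check for a record set in a Produce request:
// only lastOffsetDelta and the per-record offset deltas are modelled.
final class BatchCheck {
    private BatchCheck() { }

    // Returns null if the batch is consistent, otherwise a human-readable rejection reason.
    static String validate(int lastOffsetDelta, List<Integer> recordOffsetDeltas) {
        if (recordOffsetDeltas.isEmpty()) {
            return "batch contains no records";
        }
        // Assumption: a freshly produced batch numbers its records 0, 1, ..., n-1.
        for (int i = 0; i < recordOffsetDeltas.size(); i++) {
            if (recordOffsetDeltas.get(i) != i) {
                return "record " + i + " has offsetDelta " + recordOffsetDeltas.get(i);
            }
        }
        // The header field must then agree with the delta of the last record.
        int expected = recordOffsetDeltas.size() - 1;
        if (lastOffsetDelta != expected) {
            return "lastOffsetDelta is " + lastOffsetDelta + " but records imply " + expected;
        }
        return null;
    }
}
```

For the library bug described above (several records but `lastOffsetDelta` left at 0), `validate(0, [0, 1, 2])` returns a rejection reason instead of `null`.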
[jira] [Resolved] (KAFKA-6552) “entity_type” not exactly in description of kafka-configs.sh
[ https://issues.apache.org/jira/browse/KAFKA-6552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-6552.

Resolution: Fixed
Assignee: Xin
Fix Version/s: 1.2.0

> “entity_type” not exactly in description of kafka-configs.sh
> ---
>
> Key: KAFKA-6552
> URL: https://issues.apache.org/jira/browse/KAFKA-6552
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 0.11.0.2
> Reporter: Xin
> Assignee: Xin
> Priority: Trivial
> Fix For: 1.2.0
>
> There are some occurrences of “entity_type” in the description of the command option “--add-config”, but the actual command option of kafka-configs.sh is “--entity-type”.
[jira] [Commented] (KAFKA-6552) “entity_type” not exactly in description of kafka-configs.sh
[ https://issues.apache.org/jira/browse/KAFKA-6552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361153#comment-16361153 ]

ASF GitHub Bot commented on KAFKA-6552:

guozhangwang closed pull request #4556: KAFKA-6552:“entity_type” not exactly in description of kafka-configs.sh
URL: https://github.com/apache/kafka/pull/4556

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index b18dcc99937..dfaaa51139c 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -296,10 +296,10 @@ object ConfigCommand extends Config {
 val nl = System.getProperty("line.separator")
 val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
-"For entity_type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
-"For entity_type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
-"For entity_type '" + ConfigType.User + "': " + DynamicConfig.User.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
-"For entity_type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+"For entity-type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
+"For entity-type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+"For entity-type '" + ConfigType.User + "': " + DynamicConfig.User.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+"For entity-type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
 s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
 .withRequiredArg
 .ofType(classOf[String])

> “entity_type” not exactly in description of kafka-configs.sh
> ---
>
> Key: KAFKA-6552
> URL: https://issues.apache.org/jira/browse/KAFKA-6552
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 0.11.0.2
> Reporter: Xin
> Priority: Trivial
>
> There are some occurrences of “entity_type” in the description of the command option “--add-config”, but the actual command option of kafka-configs.sh is “--entity-type”.
[jira] [Updated] (KAFKA-6367) Fix StateRestoreListener To Use Correct Batch Ending Offset
[ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-6367:

Fix Version/s: 1.0.1 (was: 1.0.2)

> Fix StateRestoreListener To Use Correct Batch Ending Offset
> ---
>
> Key: KAFKA-6367
> URL: https://issues.apache.org/jira/browse/KAFKA-6367
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0, 1.0.0
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
> Priority: Major
> Fix For: 1.1.0, 1.0.1
>
> {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long as the batch ending offset, but {{nextPosition}} is not correct: the batch ending offset should be the offset of the latest restored record, whereas {{nextPosition}} is the offset of the first record not yet restored.
> We can't simply use {{nextPosition}} - 1, as that offset could be a commit marker.
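The difference between the two candidate offsets can be shown with a small Java sketch. `RestoreBatchEnd` is hypothetical and not part of Kafka Streams; it assumes the restore loop knows the offsets of the data records it actually applied, and that transaction markers occupy offsets but are never restored into the store.

```java
import java.util.List;

// Hypothetical sketch (not Kafka Streams code) of choosing the batch-ending offset
// that a restore listener is told about.
final class RestoreBatchEnd {
    private RestoreBatchEnd() { }

    // The rejected shortcut: nextPosition is the first offset NOT restored,
    // so nextPosition - 1 may be a transaction commit marker rather than a record.
    static long positionBasedBatchEnd(long nextPosition) {
        return nextPosition - 1;
    }

    // Report the offset of the last data record actually restored in this batch.
    static long batchEnd(List<Long> restoredRecordOffsets) {
        if (restoredRecordOffsets.isEmpty()) {
            throw new IllegalArgumentException("no records were restored in this batch");
        }
        return restoredRecordOffsets.get(restoredRecordOffsets.size() - 1);
    }
}
```

For example, with records restored at offsets 10, 11 and 12 and a commit marker at offset 13, the consumer position after the batch is 14: `positionBasedBatchEnd(14)` returns 13 (the marker), while `batchEnd` returns 12.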
[jira] [Updated] (KAFKA-6504) Connect: Some per-task-metrics not working
[ https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-6504:

Fix Version/s: 1.0.1

> Connect: Some per-task-metrics not working
> ---
>
> Key: KAFKA-6504
> URL: https://issues.apache.org/jira/browse/KAFKA-6504
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.0.0
> Reporter: Per Steffensen
> Assignee: Robert Yokota
> Priority: Minor
> Fix For: 1.1.0, 1.0.1
>
> Some Kafka Connect metrics seem to be wrong on a per-task basis. At least, it seems like the MBean "kafka.connect:type=source-task-metrics,connector=,task=x" attribute "source-record-active-count" reports the same number for all x tasks running in the same Kafka Connect instance/JVM. E.g. if I have a source connector "my-connector" with 2 tasks that both run in the same Kafka Connect instance, but I know that only one of them actually produces anything (and therefore can have "active source-records"), both "kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and "kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go up in lockstep. It should only go up for the one task that actually produces something.
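The expected behaviour, one counter per task rather than a value shared by all tasks in the worker, can be sketched in plain Java. `ActiveRecordCounts` is hypothetical and is not the Connect metrics implementation; it assumes "active" means records polled from a task but not yet acknowledged as written to Kafka.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-task bookkeeping for an "active record count" style metric,
// keyed by connector name and then task id, so tasks never share a counter.
final class ActiveRecordCounts {
    private final Map<String, Map<Integer, Integer>> counts = new HashMap<>();

    // A task has polled n records that are not yet acknowledged.
    void recordsPolled(String connector, int task, int n) {
        counts.computeIfAbsent(connector, c -> new HashMap<>()).merge(task, n, Integer::sum);
    }

    // n records of this task have been acknowledged as written, so they are no longer active.
    void recordsAcked(String connector, int task, int n) {
        counts.computeIfAbsent(connector, c -> new HashMap<>()).merge(task, -n, Integer::sum);
    }

    int activeCount(String connector, int task) {
        return counts.getOrDefault(connector, Collections.emptyMap()).getOrDefault(task, 0);
    }
}
```

In the scenario from the report, only task 0 of `my-connector` polls records, so only its count moves while task 1 stays at 0.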
[jira] [Commented] (KAFKA-5550) Struct.put() should include the field name if validation fails
[ https://issues.apache.org/jira/browse/KAFKA-5550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361101#comment-16361101 ]

ASF GitHub Bot commented on KAFKA-5550:

hachikuji closed pull request #3507: KAFKA-5550
URL: https://github.com/apache/kafka/pull/3507

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
index 6e7b5d23bbd..5e4de21b632 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
@@ -211,7 +211,9 @@ public Struct put(String fieldName, Object value) {
 * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
 */
 public Struct put(Field field, Object value) {
-ConnectSchema.validateValue(field.schema(), value);
+if (null == field)
+throw new DataException("field cannot be null.");
+ConnectSchema.validateValue(field.name(), field.schema(), value);
 values[field.index()] = value;
 return this;
 }
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
index 42345b1986c..e91960576a1 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
@@ -266,4 +266,28 @@ public void testValidateFieldWithInvalidValueType() {
 thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\"");
 ConnectSchema.validateValue(fieldName, Schema.INT8_SCHEMA, new Object());
 }
+
+@Test
+public void testPutNullField() {
+final String fieldName = "fieldName";
+Schema testSchema = SchemaBuilder.struct()
+.field(fieldName, Schema.STRING_SCHEMA);
+Struct struct = new Struct(testSchema);
+
+thrown.expect(DataException.class);
+Field field = null;
+struct.put(field, "valid");
+}
+
+@Test
+public void testInvalidPutIncludesFieldName() {
+final String fieldName = "fieldName";
+Schema testSchema = SchemaBuilder.struct()
+.field(fieldName, Schema.STRING_SCHEMA);
+Struct struct = new Struct(testSchema);
+
+thrown.expect(DataException.class);
+thrown.expectMessage("Invalid value: null used for required field: \"fieldName\", schema type: STRING");
+struct.put(fieldName, null);
+}
 }

> Struct.put() should include the field name if validation fails
> ---
>
> Key: KAFKA-5550
> URL: https://issues.apache.org/jira/browse/KAFKA-5550
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Jeremy Custenborder
> Assignee: Jeremy Custenborder
> Priority: Minor
>
> When calling struct.put() with an invalid value, the error message should include the field name.
> {code:java}
> @Test
> public void testPutIncludesFieldName() {
>     final String fieldName = "fieldName";
>     Schema testSchema = SchemaBuilder.struct()
>         .field(fieldName, Schema.STRING_SCHEMA);
>     Struct struct = new Struct(testSchema);
>     try {
>         struct.put(fieldName, null);
>         fail("Expected a DataException");
>     } catch (DataException ex) {
>         assertEquals(
>             "Invalid value: null used for required field: \"fieldName\", schema type: STRING",
>             ex.getMessage()
>         );
>     }
> }
> {code}
[jira] [Created] (KAFKA-6553) Consumer consumed committed messages
Orel Shai created KAFKA-6553:

Summary: Consumer consumed committed messages
Key: KAFKA-6553
URL: https://issues.apache.org/jira/browse/KAFKA-6553
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.10.2.0
Reporter: Orel Shai

Hi,
We're using the Kafka consumer client 0.10.2.0 (working against Kafka broker 0.10.0) with the following configuration:
{code:java}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "4");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
{code}
So, as you can see, we're using autocommit. The consumer API version that we're using has a dedicated thread for autocommit, so every second we have an autocommit, which means that we have a heartbeat every second.
For some reason we're receiving the same message many times.
While looking at our logs I can see the following:
{code:java}
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-15 to the committed offset 352878
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-3 to the committed offset 352458
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-19 to the committed offset 353775
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 to the committed offset 352171
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-7 to the committed offset 352995
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-27 to the committed offset 352531
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 to the committed offset 351893
2018-02-11 10:56:24,656 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 at offset 352171 since the current position is 352205
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 at offset 351893 since the current position is 351929
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-26 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-17 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-29 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-5 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-8 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-20 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-2 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-14 since it is no longer fetchable
{code}
Consumer connection log:
{code:java}
2018-02-12 08:18:13,506 DEBUG [DefaultThreadPool-9] Starting the Kafka consumer
2018-02-12 08:18:13,507 INFO [DefaultThreadPool-9] ConsumerConfig values:
auto.commit.interval.ms = 1000
auto.offset.reset = latest
bootstrap.servers = [list of servers]
check.crcs = true
client.id = 2cd03a2b-f040-4f7f-b20c-ce3fe5efbe00
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
[jira] [Commented] (KAFKA-6476) Document dynamic config update
[ https://issues.apache.org/jira/browse/KAFKA-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360908#comment-16360908 ] ASF GitHub Bot commented on KAFKA-6476: --- rajinisivaram opened a new pull request #4558: KAFKA-6476: Documentation for dynamic broker configuration URL: https://github.com/apache/kafka/pull/4558 Docs for dynamic broker configuration (KIP-226). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Document dynamic config update > -- > > Key: KAFKA-6476 > URL: https://issues.apache.org/jira/browse/KAFKA-6476 > Project: Kafka > Issue Type: Sub-task > Components: core, documentation >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Add documentation for dynamic broker config update. > Include: > - Command line options for kafka-configs.sh with examples > - Configs that can be updated along with constraints applied > - Secret rotation for password encoder > Also add a new column for broker configs to indicate which configs can be > dynamically updated,
[jira] [Commented] (KAFKA-6549) Deadlock while processing Controller Events
[ https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360879#comment-16360879 ] Manikumar commented on KAFKA-6549: -- Below is the sequence of steps that led to the deadlock. This is a rare scenario; I will try to come up with a solution.
# A controller ZK node deletion and a ZK session expiration event happen in quick succession.
# For the controller ZK node deletion, ControllerChangeHandler.handleDeletion() is called. A Controller.Reelect event is added to the queue and the Reelect process is initiated.
# For the ZK session expiration, ZooKeeperClientWatcher takes the WriteLock and calls the controller's StateChangeHandler.beforeInitializingSession(). The beforeInitializingSession method adds an Expire event to the controller queue and waits for the Expire event to complete.
# Controller.Reelect waits for the ZooKeeperClient ReadLock, which cannot be granted while ZooKeeperClientWatcher holds the WriteLock.
# ZooKeeperClientWatcher waits for the Expire event to complete, but that event is in turn waiting in the controller queue and cannot be processed while Reelect is blocked.
> Deadlock while processing Controller Events > --- > > Key: KAFKA-6549 > URL: https://issues.apache.org/jira/browse/KAFKA-6549 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Manikumar >Assignee: Manikumar >Priority: Blocker > Fix For: 1.1.0 > > Attachments: td.txt > > > Stack traces from a single-node test cluster that was deadlocked while > processing controller Reelect and Expire events. Attached stack trace.
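The lock-then-wait cycle described in steps 3 to 5 can be reproduced outside Kafka with JDK primitives alone. What follows is a minimal, hypothetical Java sketch, not Kafka code: a single-thread executor stands in for the controller event thread, a ReentrantReadWriteLock stands in for the ZooKeeperClient lock, and the names ControllerDeadlockSketch and expireCompletes are invented for illustration. Timed waits replace the unbounded CountDownLatch.await() seen in the stack traces so the demo reports the deadlock instead of hanging. The second case (release the write lock before waiting) is one generic way to break such a cycle and is not necessarily the fix adopted for KAFKA-6549.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the KAFKA-6549 deadlock; not Kafka source code.
public class ControllerDeadlockSketch {

    // Bounded wait so the demo reports the deadlock instead of hanging forever.
    static final long TIMEOUT_MS = 1000;

    /**
     * Returns true if the "Expire" event is processed within TIMEOUT_MS.
     *
     * @param waitWhileHoldingWriteLock true models step 3: the watcher waits for
     *        Expire while still holding the write lock
     */
    static boolean expireCompletes(boolean waitWhileHoldingWriteLock) throws InterruptedException {
        ReentrantReadWriteLock zkClientLock = new ReentrantReadWriteLock();
        // Stand-in for the single controller event thread draining the event queue.
        ExecutorService controllerEventThread = Executors.newSingleThreadExecutor();
        CountDownLatch reelectStarted = new CountDownLatch(1);
        CountDownLatch expireProcessed = new CountDownLatch(1);
        boolean completed = false;
        try {
            // Session expiration: the watcher (here, the calling thread) takes the write lock.
            zkClientLock.writeLock().lock();
            try {
                // Reelect was queued first; it blocks on the read lock.
                controllerEventThread.submit(() -> {
                    reelectStarted.countDown();
                    zkClientLock.readLock().lock();
                    zkClientLock.readLock().unlock();
                });
                reelectStarted.await();
                // Expire is queued behind Reelect on the same single thread.
                controllerEventThread.submit(expireProcessed::countDown);
                if (waitWhileHoldingWriteLock) {
                    // Cycle: Expire cannot run until Reelect gets the read lock,
                    // and the read lock is only freed after this wait returns.
                    completed = expireProcessed.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
                }
            } finally {
                zkClientLock.writeLock().unlock();
            }
            if (!waitWhileHoldingWriteLock) {
                // Write lock released first, so Reelect and then Expire can run.
                completed = expireProcessed.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            }
        } finally {
            controllerEventThread.shutdown();
            controllerEventThread.awaitTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        }
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("wait while holding write lock -> Expire completed: "
                + expireCompletes(true));
        System.out.println("release write lock, then wait -> Expire completed: "
                + expireCompletes(false));
    }
}
```

With the write lock held during the wait, the timed await expires and the first call returns false; once the lock is released before waiting, the queued events drain and the second call returns true.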
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 nid=0x7d03 waiting on condition [0x7278b000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x0007bccadf30> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
> at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
> at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
> at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
> - <0x000780054860> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 nid=0xad03 waiting on condition [0x73fd3000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x0007bcc584a0> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
> at