[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration
[ https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Kashavkin updated KAFKA-12228: - Description: I found that Kafka 2.7.0 supports PEM certificates and I decided to try setting up the broker with DigiCert SSL certificate. I used new options and I did everything like in example in [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. But I get the error: {code:bash} [2021-01-20 17:54:55,787] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings. at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) at kafka.network.Processor.(SocketServer.scala:790) at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at scala.collection.AbstractIterable.foreach(Iterable.scala:920) at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.startup(SocketServer.scala:125) at kafka.server.KafkaServer.startup(KafkaServer.scala:303) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at kafka.Kafka$.main(Kafka.scala:82) at kafka.Kafka.main(Kafka.scala) {code} Java is used: {code:bash} openjdk version "1.8.0_272" OpenJDK Runtime Environment (build 1.8.0_272-b10) OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode) {code} OS is Centos 7.8.2003 _openssl x509 -in certificate.pem -text :_ {code:java} Certificate: ... Signature Algorithm: ecdsa-with-SHA384 ... Subject Public Key Info: Public Key Algorithm: id-ecPublicKey Public-Key: (256 bit) {code} Log is attached. was: I found that Kafka 2.7.0 supports PEM certificates and I decided to try setting up the broker with DigiCert SSL certificate. I used new options and I did everything like in example in [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. But I get the error: {code:bash} [2021-01-20 17:54:55,787] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings. at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) at kafka.network.Processor.(SocketServer.scala:790) at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at scala.collection.AbstractIterable.foreach(Iterable.scala:920) at
[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration
[ https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Kashavkin updated KAFKA-12228: - Attachment: (was: kafka.log) > Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common > for configuration > --- > > Key: KAFKA-12228 > URL: https://issues.apache.org/jira/browse/KAFKA-12228 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.7.0 >Reporter: Alexey Kashavkin >Priority: Major > Attachments: kafka.log > > > I found that Kafka 2.7.0 supports PEM certificates and I decided to try > setting up the broker with DigiCert SSL certificate. I used new options and I > did everything like in example in > [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. > But I get the error: > {code:bash} > [2021-01-20 17:54:55,787] ERROR [KafkaServer id=0] Fatal error during > KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) > org.apache.kafka.common.config.ConfigException: Invalid value > javax.net.ssl.SSLHandshakeException: no cipher suites in common for > configuration A client SSLEngine created with the provided settings can't > connect to a server SSLEngine created with those settings. > at > org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) > at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) > at > org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) > at > org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) > at kafka.network.Processor.(SocketServer.scala:790) > at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) > at > kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) > at > kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) > at > kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) > at > kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) > at scala.collection.AbstractIterable.foreach(Iterable.scala:920) > at > kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251) > at kafka.network.SocketServer.startup(SocketServer.scala:125) > at kafka.server.KafkaServer.startup(KafkaServer.scala:303) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) > at kafka.Kafka$.main(Kafka.scala:82) > at kafka.Kafka.main(Kafka.scala) > {code} > Java is used: > {code:bash} > openjdk version "1.8.0_272" > OpenJDK Runtime Environment (build 1.8.0_272-b10) > OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode) > {code} > OS is Centos 7.8.2003 > _openssl x509 -in certificate.pem -text :_ > {code:java} > Certificate: > ... > Signature Algorithm: ecdsa-with-SHA384 > ... > Subject Public Key Info: > Public Key Algorithm: id-ecPublicKey > Public-Key: (256 bit) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration
[ https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Kashavkin updated KAFKA-12228: - Attachment: kafka.log > Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common > for configuration > --- > > Key: KAFKA-12228 > URL: https://issues.apache.org/jira/browse/KAFKA-12228 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.7.0 >Reporter: Alexey Kashavkin >Priority: Major > Attachments: kafka.log > > > I found that Kafka 2.7.0 supports PEM certificates and I decided to try > setting up the broker with DigiCert SSL certificate. I used new options and I > did everything like in example in > [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. > But I get the error: > {code:bash} > [2021-01-20 17:54:55,787] ERROR [KafkaServer id=0] Fatal error during > KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) > org.apache.kafka.common.config.ConfigException: Invalid value > javax.net.ssl.SSLHandshakeException: no cipher suites in common for > configuration A client SSLEngine created with the provided settings can't > connect to a server SSLEngine created with those settings. > at > org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) > at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) > at > org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) > at > org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) > at kafka.network.Processor.(SocketServer.scala:790) > at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) > at > kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) > at > kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) > at > kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) > at > kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) > at scala.collection.AbstractIterable.foreach(Iterable.scala:920) > at > kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251) > at kafka.network.SocketServer.startup(SocketServer.scala:125) > at kafka.server.KafkaServer.startup(KafkaServer.scala:303) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) > at kafka.Kafka$.main(Kafka.scala:82) > at kafka.Kafka.main(Kafka.scala) > {code} > Java is used: > {code:bash} > openjdk version "1.8.0_272" > OpenJDK Runtime Environment (build 1.8.0_272-b10) > OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode) > {code} > OS is Centos 7.8.2003 > _openssl x509 -in certificate.pem -text :_ > {code:java} > Certificate: > ... > Signature Algorithm: ecdsa-with-SHA384 > ... > Subject Public Key Info: > Public Key Algorithm: id-ecPublicKey > Public-Key: (256 bit) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.
[ https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269087#comment-17269087 ] kaushik srinivas commented on KAFKA-12164: -- Adding few major concerns with regard to the feedback of re creating the corrupt directories upon restart, function syncWithHive() (DataWriter.java) is called at every restart/boot up of the connector. And this is the function which does an initial audit of all the partition directories and tries to sync the hdfs folders with the hive partitions before proceeding further to consume records from kafka. Below is the snippet for the same. {code:java} List partitions = hiveMetaStore.listPartitions(hiveDatabase, topicTableMap.get(topic), (short) -1); FileStatus[] statuses = FileUtils.getDirectories(storage, new Path(topicDir)); for (FileStatus status : statuses) { String location = status.getPath().toString(); if (!partitions.contains(location)) { String partitionValue = getPartitionValue(location); hiveMetaStore.addPartition(hiveDatabase, topicTableMap.get(topic), partitionValue); } {code} Now going one step inside into function getDirectories > getDirectoriesImpl (from FileUtils). here, those paths are returned as partition path if a. the path is a directory b. path does not contain nested directories (by way of checking no of non directory files is equal to no of (directory + non directory) files in the path. If above conditions are met, then the path is added as partition path. So in the erroneous case where the actual path is supposed to look like /test1=0/test2=0/xxx.parquet But instead due to a crash looks like below, /test1=0/ In this case /test1=0 , satisfies the above a conditions and hence is returned as a new partition path to be updated to hive. Doing this update to hive fails because the actual partition for hive is expected to be /test1=0/test2=0 and not /test1=0/ So this would mean, once there is a corrupt partition directory in hdfs, at every restart of the connector syncWithHive() call will keep throwing hive exceptions till the directory is corrected in the hdfs. This means that the stage of consuming the old (failed to commit) records again (even assuming its present in kafka after restart) would never be reached and connector remains in crashed state forever and requires a manual intervention of clean up activity. -kaushik > ssue when kafka connect worker pod restart, during creation of nested > partition directories in hdfs file system. > > > Key: KAFKA-12164 > URL: https://issues.apache.org/jira/browse/KAFKA-12164 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: kaushik srinivas >Priority: Critical > > In our production labs, an issue is observed. Below is the sequence of the > same. > # hdfs connector is added to the connect worker. > # hdfs connector is creating folders in hdfs /test1=1/test2=2/ > Based on the custom partitioner. Here test1 and test2 are two separate nested > directories derived from multiple fields in the record using a custom > partitioner. > # Now kafka connect hdfs connector uses below function calls to create the > directories in the hdfs file system. > fs.mkdirs(new Path(filename)); > ref: > [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java] > Now the important thing to note is that if mkdirs() is a non atomic operation > (i.e can result in partial execution if interrupted) > then suppose the first directory ie test1 is created and just before creation > of test2 in hdfs happens if there is a restart to the connect worker pod. > Then the hdfs file system will remain with partial folders created for > partitions during the restart time frames. > So we might have conditions in hdfs as below > /test1=0/test2=0/ > /test1=1/ > /test1=2/test2=2 > /test1=3/test2=3 > So the second partition has a missing directory in it. And if hive > integration is enabled, hive metastore exceptions will occur since there is a > partition expected from hive table is missing for few partitions in hdfs. > *This can occur to any connector with some ongoing non atomic operation and a > restart is triggered to kafka connect worker pod. This will result in some > partially completed states in the system and may cause issues for the > connector to continue its operation*. > *This is a very critical issue and needs some attention on ways for handling > the same.* -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.
[ https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269064#comment-17269064 ] kaushik srinivas edited comment on KAFKA-12164 at 1/21/21, 6:31 AM: For the below comments "Now, by reading quickly the javadoc for FileSystem#mkdirs in HDFS I understand that a nested directory can be constructed from a given path, even if some of the parents exist. Which makes me think that a restart would allow the creation of the full path as long as the partitioner is deterministic with respect to the record (i.e. for a given record it always creates the same path). Upon a restart the export of a record will be retried, so the partitioner could possibly reattempt the full creation of the path. But that's an early assumption from my side. " Lets suppose partitioner derives the directory /test1/test2 for a given record. And the connector crashes during the directory creation and hdfs ends up with only /test1/ folder. But there is no guarantee that the kafka record which was not committed fully by the connector would exist in kafka even after restart/recovery of the connector. So there could be scenarios where these partial directories remain partial forever and requires an additional manual intervention to clean it up or complete its creation in hdfs, without which we see issues in hive table partitions if enabled. was (Author: kaushik srinivas): For the below comments "Now, by reading quickly the javadoc for FileSystem#mkdirs in HDFS I understand that a nested directory can be constructed from a given path, even if some of the parents exist. Which makes me think that a restart would allow the creation of the full path as long as the partitioner is deterministic with respect to the record (i.e. for a given record it always creates the same path). Upon a restart the export of a record will be retried, so the partitioner could possibly reattempt the full creation of the path. But that's an early assumption from my side. " Lets suppose partitioner derives the directory /test1/test2 for a given record. And the connector crashed during the directory creation and hdfs ends up with only /test1/ folder. But there is no guarantee that the kafka record which was not committed fully by the connector would exist in kafka even after restart/recovery of the connector. So there could be scenarios where these partial directories remain partial forever and requires an additional manual intervention to clean it up or complete its creation in hdfs, without which we see issues in hive table partitions if enabled. > ssue when kafka connect worker pod restart, during creation of nested > partition directories in hdfs file system. > > > Key: KAFKA-12164 > URL: https://issues.apache.org/jira/browse/KAFKA-12164 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: kaushik srinivas >Priority: Critical > > In our production labs, an issue is observed. Below is the sequence of the > same. > # hdfs connector is added to the connect worker. > # hdfs connector is creating folders in hdfs /test1=1/test2=2/ > Based on the custom partitioner. Here test1 and test2 are two separate nested > directories derived from multiple fields in the record using a custom > partitioner. > # Now kafka connect hdfs connector uses below function calls to create the > directories in the hdfs file system. > fs.mkdirs(new Path(filename)); > ref: > [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java] > Now the important thing to note is that if mkdirs() is a non atomic operation > (i.e can result in partial execution if interrupted) > then suppose the first directory ie test1 is created and just before creation > of test2 in hdfs happens if there is a restart to the connect worker pod. > Then the hdfs file system will remain with partial folders created for > partitions during the restart time frames. > So we might have conditions in hdfs as below > /test1=0/test2=0/ > /test1=1/ > /test1=2/test2=2 > /test1=3/test2=3 > So the second partition has a missing directory in it. And if hive > integration is enabled, hive metastore exceptions will occur since there is a > partition expected from hive table is missing for few partitions in hdfs. > *This can occur to any connector with some ongoing non atomic operation and a > restart is triggered to kafka connect worker pod. This will result in some > partially completed states in the system and may cause issues for the > connector to continue its operation*. > *This is a very critical issue and needs some attention on ways for handling >
[jira] [Commented] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.
[ https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269064#comment-17269064 ] kaushik srinivas commented on KAFKA-12164: -- For the below comments "Now, by reading quickly the javadoc for FileSystem#mkdirs in HDFS I understand that a nested directory can be constructed from a given path, even if some of the parents exist. Which makes me think that a restart would allow the creation of the full path as long as the partitioner is deterministic with respect to the record (i.e. for a given record it always creates the same path). Upon a restart the export of a record will be retried, so the partitioner could possibly reattempt the full creation of the path. But that's an early assumption from my side. " Lets suppose partitioner derives the directory /test1/test2 for a given record. And the connector crashed during the directory creation and hdfs ends up with only /test1/ folder. But there is no guarantee that the kafka record which was not committed fully by the connector would exist in kafka even after restart/recovery of the connector. So there could be scenarios where these partial directories remain partial forever and requires an additional manual intervention to clean it up or complete its creation in hdfs, without which we see issues in hive table partitions if enabled. > ssue when kafka connect worker pod restart, during creation of nested > partition directories in hdfs file system. > > > Key: KAFKA-12164 > URL: https://issues.apache.org/jira/browse/KAFKA-12164 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: kaushik srinivas >Priority: Critical > > In our production labs, an issue is observed. Below is the sequence of the > same. > # hdfs connector is added to the connect worker. > # hdfs connector is creating folders in hdfs /test1=1/test2=2/ > Based on the custom partitioner. Here test1 and test2 are two separate nested > directories derived from multiple fields in the record using a custom > partitioner. > # Now kafka connect hdfs connector uses below function calls to create the > directories in the hdfs file system. > fs.mkdirs(new Path(filename)); > ref: > [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java] > Now the important thing to note is that if mkdirs() is a non atomic operation > (i.e can result in partial execution if interrupted) > then suppose the first directory ie test1 is created and just before creation > of test2 in hdfs happens if there is a restart to the connect worker pod. > Then the hdfs file system will remain with partial folders created for > partitions during the restart time frames. > So we might have conditions in hdfs as below > /test1=0/test2=0/ > /test1=1/ > /test1=2/test2=2 > /test1=3/test2=3 > So the second partition has a missing directory in it. And if hive > integration is enabled, hive metastore exceptions will occur since there is a > partition expected from hive table is missing for few partitions in hdfs. > *This can occur to any connector with some ongoing non atomic operation and a > restart is triggered to kafka connect worker pod. This will result in some > partially completed states in the system and may cause issues for the > connector to continue its operation*. > *This is a very critical issue and needs some attention on ways for handling > the same.* -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.
[ https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269061#comment-17269061 ] kaushik srinivas commented on KAFKA-12164: -- Hi [~kkonstantine] We have created the ticket even on the hdfs sink connector side as well. Below is the ticket [https://github.com/confluentinc/kafka-connect-hdfs/issues/538] We have also captured more detailed analysis over there. But there are no responses in that forum as well. -Kaushik > ssue when kafka connect worker pod restart, during creation of nested > partition directories in hdfs file system. > > > Key: KAFKA-12164 > URL: https://issues.apache.org/jira/browse/KAFKA-12164 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: kaushik srinivas >Priority: Critical > > In our production labs, an issue is observed. Below is the sequence of the > same. > # hdfs connector is added to the connect worker. > # hdfs connector is creating folders in hdfs /test1=1/test2=2/ > Based on the custom partitioner. Here test1 and test2 are two separate nested > directories derived from multiple fields in the record using a custom > partitioner. > # Now kafka connect hdfs connector uses below function calls to create the > directories in the hdfs file system. > fs.mkdirs(new Path(filename)); > ref: > [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java] > Now the important thing to note is that if mkdirs() is a non atomic operation > (i.e can result in partial execution if interrupted) > then suppose the first directory ie test1 is created and just before creation > of test2 in hdfs happens if there is a restart to the connect worker pod. > Then the hdfs file system will remain with partial folders created for > partitions during the restart time frames. > So we might have conditions in hdfs as below > /test1=0/test2=0/ > /test1=1/ > /test1=2/test2=2 > /test1=3/test2=3 > So the second partition has a missing directory in it. And if hive > integration is enabled, hive metastore exceptions will occur since there is a > partition expected from hive table is missing for few partitions in hdfs. > *This can occur to any connector with some ongoing non atomic operation and a > restart is triggered to kafka connect worker pod. This will result in some > partially completed states in the system and may cause issues for the > connector to continue its operation*. > *This is a very critical issue and needs some attention on ways for handling > the same.* -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.
[ https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269056#comment-17269056 ] Konstantine Karantasis commented on KAFKA-12164: Thanks for reporting [~kaushik srinivas] First, I need to note that this issue seems to belong in its entirety to: [https://github.com/confluentinc/kafka-connect-hdfs/issues] given that the behavior is not affected by the guarantees that the Kafka Connect framework provides. Briefly, let me state the obvious, that from a worker's perspective restarts (graceful or not) can happen at any time. Additionally the connector does not provide any guarantees with respect to the atomicity of the nested directory structure. Things become more challenging given that the partitioner as you mentioned is custom. Now, by reading quickly the javadoc for FileSystem#mkdirs in HDFS I understand that a nested directory can be constructed from a given path, even if some of the parents exist. Which makes me think that a restart would allow the creation of the full path as long as the partitioner is deterministic with respect to the record (i.e. for a given record it always creates the same path). Upon a restart the export of a record will be retried, so the partitioner could possibly reattempt the full creation of the path. But that's an early assumption from my side. Shall we take the discussion to the appropriate forum? https://github.com/confluentinc/kafka-connect-hdfs/issues > ssue when kafka connect worker pod restart, during creation of nested > partition directories in hdfs file system. > > > Key: KAFKA-12164 > URL: https://issues.apache.org/jira/browse/KAFKA-12164 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: kaushik srinivas >Priority: Critical > > In our production labs, an issue is observed. Below is the sequence of the > same. > # hdfs connector is added to the connect worker. > # hdfs connector is creating folders in hdfs /test1=1/test2=2/ > Based on the custom partitioner. Here test1 and test2 are two separate nested > directories derived from multiple fields in the record using a custom > partitioner. > # Now kafka connect hdfs connector uses below function calls to create the > directories in the hdfs file system. > fs.mkdirs(new Path(filename)); > ref: > [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java] > Now the important thing to note is that if mkdirs() is a non atomic operation > (i.e can result in partial execution if interrupted) > then suppose the first directory ie test1 is created and just before creation > of test2 in hdfs happens if there is a restart to the connect worker pod. > Then the hdfs file system will remain with partial folders created for > partitions during the restart time frames. > So we might have conditions in hdfs as below > /test1=0/test2=0/ > /test1=1/ > /test1=2/test2=2 > /test1=3/test2=3 > So the second partition has a missing directory in it. And if hive > integration is enabled, hive metastore exceptions will occur since there is a > partition expected from hive table is missing for few partitions in hdfs. > *This can occur to any connector with some ongoing non atomic operation and a > restart is triggered to kafka connect worker pod. This will result in some > partially completed states in the system and may cause issues for the > connector to continue its operation*. > *This is a very critical issue and needs some attention on ways for handling > the same.* -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
guozhangwang commented on pull request #9941: URL: https://github.com/apache/kafka/pull/9941#issuecomment-764408620 LGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot
hachikuji commented on a change in pull request #9819: URL: https://github.com/apache/kafka/pull/9819#discussion_r561625246 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException { } @Override -public void append(ByteBuffer buffer) throws IOException { +public void append(BaseRecords records) throws IOException { if (frozen) { throw new IllegalStateException( -String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) ); } - +ByteBuffer buffer; +if (records instanceof MemoryRecords) { +buffer = ((MemoryRecords) records).buffer(); +} else { Review comment: I feel more inclined to raise an exception if we get a `BaseRecords` type that is not `MemoryRecords`. If we really get an unexpected file in here, then we need to reconsider the IO model instead of hiding a big copy. We could even make the expectation explicit in the parameter type even if it is not 100% symmetric with `RawSnapshotReader`. ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -1093,9 +1093,11 @@ public static final void readFully(InputStream inputStream, ByteBuffer destinati destinationBuffer.position(destinationBuffer.position() + totalBytesRead); } -public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException { +public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException { +int size = 0; Review comment: Hmm.. Not sure we need to compute this. Wouldn't it be the same as `sourceBuffer.remaining()`? ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1220,34 +1216,35 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( if (!snapshotOpt.isPresent()) { return FetchSnapshotResponse.singleton( log.topicPartition(), -responsePartitionSnapshot -> { -return addQuorumLeader(responsePartitionSnapshot) -.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code()); -} +responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot) +.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code()) ); } try (RawSnapshotReader snapshot = snapshotOpt.get()) { if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshot.sizeInBytes()) { return FetchSnapshotResponse.singleton( log.topicPartition(), -responsePartitionSnapshot -> { -return addQuorumLeader(responsePartitionSnapshot) -.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()); -} +responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot) +.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()) ); } int maxSnapshotSize; +int maxSnapshotPosition; try { maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes()); } catch (ArithmeticException e) { maxSnapshotSize = Integer.MAX_VALUE; } -ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), maxSnapshotSize)); -snapshot.read(buffer, partitionSnapshot.position()); -buffer.flip(); +try { +maxSnapshotPosition = Math.toIntExact(partitionSnapshot.position()); +} catch (ArithmeticException e) { +maxSnapshotPosition = Integer.MAX_VALUE; Review comment: I agree we should probably throw this. Snapshot size limits are an interesting point which I hadn't thought about. Currently `FileRecords` does not support files which are larger than Int.MaxValue. That gives us a 2GB limit. My feeling is that is probably good enough initially, but perhaps that adds some fuel for the effort to generalize the zero-copy support. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
dengziming commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-764402380 @rajinisivaram I rebased the code and added a test in `PlaintextAdminIntegrationTest`, PTAL. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12227) Add method "Producer#produce" to return CompletionStage instead of Future
[ https://issues.apache.org/jira/browse/KAFKA-12227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-12227: -- Assignee: Chia-Ping Tsai > Add method "Producer#produce" to return CompletionStage instead of Future > -- > > Key: KAFKA-12227 > URL: https://issues.apache.org/jira/browse/KAFKA-12227 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > Producer and KafkaProducer return a java.util.concurrent.Future from their > send methods. This makes it challenging to write asynchronous non-blocking > code given Future's limited interface. Since Kafka now requires Java 8, we > now have the option of using CompletionStage and/or CompletableFuture that > were introduced to solve this issue. It's worth noting that the Kafka > AdminClient solved this issue by using org.apache.kafka.common.KafkaFuture as > Java 7 support was still required then. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXX%3A+Return+CompletableFuture+from+KafkaProducer.send -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, +node: Node, +requestThrottleMs: Int, +errorMessage: Option[String] = None): FindCoordinatorResponse = { +new FindCoordinatorResponse( + new FindCoordinatorResponseData() +.setErrorCode(error.code) +.setErrorMessage(errorMessage.getOrElse(error.message)) +.setNodeId(node.id) +.setHost(node.host) +.setPort(node.port) +.setThrottleTimeMs(requestThrottleMs)) } - def createResponse(requestThrottleMs: Int): AbstractResponse = { -def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { - new FindCoordinatorResponse( - new FindCoordinatorResponseData() -.setErrorCode(error.code) -.setErrorMessage(error.message) -.setNodeId(node.id) -.setHost(node.host) -.setPort(node.port) -.setThrottleTimeMs(requestThrottleMs)) + val topicCreationNeeded = topicMetadata.headOption.isEmpty + if (topicCreationNeeded) { +if (hasEnoughAliveBrokers(internalTopicName)) { + if (shouldForwardRequest(request)) { +forwardingManager.sendInterBrokerRequest( + getCreateTopicsRequest(Seq(internalTopicName)), + _ => ()) + } else { +val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6) + +val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName)) +adminManager.createTopics( + config.requestTimeoutMs, + validateOnly = false, + topicConfigs, + Map.empty, + controllerMutationQuota, + _ => ()) + } } -val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) { - createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) -} else { - val coordinatorEndpoint = topicMetadata.partitions.asScala -.find(_.partitionIndex == partition) -.filter(_.leaderId != MetadataResponse.NO_LEADER_ID) -.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId)) -.flatMap(_.getNode(request.context.listenerName)) -.filterNot(_.isEmpty) - - coordinatorEndpoint match { -case Some(endpoint) => - createFindCoordinatorResponse(Errors.NONE, endpoint) -case _ => - createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) + +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse( + Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)) + } else { +def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) { +
[GitHub] [kafka] chia7712 commented on pull request #9939: MINOR: fix @link tag in javadoc
chia7712 commented on pull request #9939: URL: https://github.com/apache/kafka/pull/9939#issuecomment-764247507 @tang7526 thanks for your patch. Could you fix other docs error also? ``` /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:42: warning: Could not find any member to link for "Serde". /** ^ /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:24: warning: Could not find any member to link for "org.apache.kafka.streams.kstream.Joined". /** ^ /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:24: warning: Could not find any member to link for "Serde". /** ^ /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala:37: warning: Could not find any member to link for "Serde". /** ^ /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala:24: warning: Could not find any member to link for "Serde". /** ^ /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:68: warning: Could not find any member to link for "Topology.AutoOffsetReset". /** ^ /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:55: warning: Could not find any member to link for "TimestampExtractor". /** ^ /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:45: warning: Could not find any member to link for "Serde". /** ^ /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala:157: warning: Could not find any member to link for "org.apache.kafka.streams.errors.TopologyException". /** ^ 44 warnings ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log
feyman2016 commented on a change in pull request #9739: URL: https://github.com/apache/kafka/pull/9739#discussion_r561589867 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File, leaderEpoch: Int, origin: AppendOrigin = AppendOrigin.Client, interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = { -append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false) +val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader +append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false) Review comment: Make sense to me, it is also a little bit odd to me, but I put it here because I think `assignOffsets`==true for `appendAsLeader` and ==false for `appendAsFollower`, which means normally `assignOffsets` is determined by the caller, the `RaftLeader` is just a special case for `appendAsLeader`, if we move the logic in `analyzeAndValidateRecords`, that means it need to determine whether to `assignOffsets` without caller info, does that doable? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9891: MINOR: Remind user index file is empty when dumping LogSegment index file
chia7712 commented on pull request #9891: URL: https://github.com/apache/kafka/pull/9891#issuecomment-764233284 @17hao Thanks for your patch! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #9891: MINOR: Remind user index file is empty when dumping LogSegment index file
chia7712 merged pull request #9891: URL: https://github.com/apache/kafka/pull/9891 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log
feyman2016 commented on a change in pull request #9739: URL: https://github.com/apache/kafka/pull/9739#discussion_r561587550 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -180,7 +179,7 @@ private void startNewBatch() { nextOffset, time.milliseconds(), false, -RecordBatch.NO_PARTITION_LEADER_EPOCH, +epoch, Review comment: Let me check This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan opened a new pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan opened a new pull request #9944: URL: https://github.com/apache/kafka/pull/9944 Most components are ready here. Topic IDs replace topic names in fetch requests. I need to look into FetchSession more. Currently testCreateIncrementalFetchWithPartitionsWithIdError() fails because FetchSession does not cache partitions that resulted in UNKNOWN_TOPIC_ID error. I need to figure out if this is something that should be done. I also plan to add a test to simulate a rolling upgrade where some brokers do not have topic IDs in metadata. I want to ensure that Consumers can still poll and get data. I will also run the various fetch benchmarks and compare the results to trunk ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
dengziming commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-764219186 > https://github.com/apache/kafka/pull/9769#issuecomment-764014519 @rajinisivaram @jolshan I also realized this, we add topicId in `MetadataCache` from IBP=2.8.1, so instead of using `config.usesTopicId` it would make sense to check `config.interBrokerProtocolVersion >= KAFKA_2_8_IV1` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #9926: KAFKA-12175 Migrate generator module to junit5
chia7712 merged pull request #9926: URL: https://github.com/apache/kafka/pull/9926 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5
chia7712 commented on pull request #9926: URL: https://github.com/apache/kafka/pull/9926#issuecomment-764212202 > What have you seen that implies that? The following error implies that recent fixes are not included. ``` java.lang.AssertionError: Expected all streams instances in [org.apache.kafka.streams.KafkaStreams@2d6a0fff] to be REBALANCING within 3 ms, but the following were not: {org.apache.kafka.streams.KafkaStreams@2d6a0fff=RUNNING} at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitForApplicationState$12(IntegrationTestUtils.java:936) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:318) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState(IntegrationTestUtils.java:919) at org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect(AdjustStreamThreadCountTest.java:229) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ``` https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9926/8/testReport/junit/org.apache.kafka.streams.integration/AdjustStreamThreadCountTest/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hongshaoyang edited a comment on pull request #9943: MINOR: Fix typo in Utils.java
hongshaoyang edited a comment on pull request #9943: URL: https://github.com/apache/kafka/pull/9943#issuecomment-764209143 Ping @guozhangwang Original typo was added in https://github.com/apache/kafka/commit/3a9f4b833bb7e86dc759361c33f4321ab043db05 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hongshaoyang commented on pull request #9943: MINOR: Fix typo in Utils.java
hongshaoyang commented on pull request #9943: URL: https://github.com/apache/kafka/pull/9943#issuecomment-764209143 Ping @guozhangwang Original typo was added in https://github.com/apache/kafka/commit/3a9f4b833bb7e86dc759361c33f4321ab043db05 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hongshaoyang opened a new pull request #9943: MINOR: Fix typo in Update Utils.java
hongshaoyang opened a new pull request #9943: URL: https://github.com/apache/kafka/pull/9943 ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9942: KAFKA-12229: reset to original class loader after connector stop
showuon commented on pull request #9942: URL: https://github.com/apache/kafka/pull/9942#issuecomment-764189012 @chia7712 @ijuma @guozhangwang , please take a look at the PR. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #9942: KAFKA-12229: reset to original class loader after connector stop
showuon opened a new pull request #9942: URL: https://github.com/apache/kafka/pull/9942 ``` java.lang.NullPointerException at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopics(MirrorSourceConnector.java:348) at org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:192) at org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst(MirrorSourceConnectorTest.java:222) ``` After days of investigation, I finally found the root cause of the test failure reason: **class loader**. The issue is quite weird, we mocked the method, but still call the real method, and cause the NPE. Digging into the Mockito, found it's not about JUnit 5, it's because of the class loader. In Mockito, we relies on the class loader to generate the proxy instance ([source](https://github.com/mockito/mockito/blob/release/3.x/src/main/java/org/mockito/internal/creation/bytebuddy/SubclassBytecodeGenerator.java#L91)), and if the class loader is not expected, we'll generate the wrong proxy instance (with wrong class path). We set the class loader during connector start to resolve conflicting dependencies (KIP-146), so we should set it back to the original class loader after connector stop in tests (`EmbeddedConnectCluster` is only used in tests) for Mockito works as expected. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9840: KAFKA-10867: Improved task idling
guozhangwang commented on a change in pull request #9840: URL: https://github.com/apache/kafka/pull/9840#discussion_r561459260 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -78,15 +89,149 @@ RecordQueue queue() { } } -PartitionGroup(final Map partitionQueues, final Sensor recordLatenessSensor) { +PartitionGroup(final TaskId id, + final Map partitionQueues, + final Sensor recordLatenessSensor, + final Sensor enforcedProcessingSensor, + final long maxTaskIdleMs) { +this.id = id; nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); this.partitionQueues = partitionQueues; +this.enforcedProcessingSensor = enforcedProcessingSensor; +this.maxTaskIdleMs = maxTaskIdleMs; this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; allBuffered = false; streamTime = RecordQueue.UNKNOWN; } +public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) { +final Long lag = metadata.lag(); +if (lag != null) { +LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag); +fetchedLags.put(partition, lag); +} +} + +public boolean readyToProcess(final long wallClockTime) { +if (LOG.isTraceEnabled()) { +for (final Map.Entry entry : partitionQueues.entrySet()) { +LOG.trace( +"[{}] buffered/lag {}: {}/{}", +id, +entry.getKey(), +entry.getValue().size(), +fetchedLags.get(entry.getKey()) +); +} +} +// Log-level strategy: +// TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency +// DEBUG when we waited for a fetch and decided to wait some more, as configured +// DEBUG when we are ready for processing and didn't have to enforce processing +// INFO when we enforce processing, since this has to wait for fetches AND may result in disorder + +if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) { +if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) { +final Set bufferedPartitions = new HashSet<>(); +final Set emptyPartitions = new HashSet<>(); +for (final Map.Entry entry : partitionQueues.entrySet()) { +if (entry.getValue().isEmpty()) { +emptyPartitions.add(entry.getKey()); +} else { +bufferedPartitions.add(entry.getKey()); +} +} +LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." + + "\n\tThere may be out-of-order processing for this task as a result." + + "\n\tBuffered partitions: {}" + + "\n\tNon-buffered partitions: {}", + id, + bufferedPartitions, + emptyPartitions); +} +return true; Review comment: Should we log INFO if we are indeed enforcing processing? I.e. there are some empty partitions. ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -134,6 +134,8 @@ @SuppressWarnings("deprecation") public class StreamsConfig extends AbstractConfig { +public static final long MAX_TASK_IDLE_MS_DISABLED = -1; Review comment: nit: move this down below to 147? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ## @@ -73,28 +82,22 @@ private final byte[] recordKey = intSerializer.serialize(null, 1); private final Metrics metrics = new Metrics(); +private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString()); private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap()); -private PartitionGroup group; private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) { final Sensor lastRecordedValue = metrics.sensor(metricName.name()); lastRecordedValue.add(metricName, new Value()); return lastRecordedValue; } -@Before Review comment: Good refactoring! ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -52,15 +58,20 @@ * (i.e., it increases or stays the same over time). */ public class
[GitHub] [kafka] showuon closed pull request #9936: [WIP] reset to default class loader
showuon closed pull request #9936: URL: https://github.com/apache/kafka/pull/9936 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12229) Fix flaky MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst test
Luke Chen created KAFKA-12229: - Summary: Fix flaky MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst test Key: KAFKA-12229 URL: https://issues.apache.org/jira/browse/KAFKA-12229 Project: Kafka Issue Type: Test Reporter: Luke Chen Assignee: Luke Chen h3. Stacktrace java.lang.NullPointerException at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopics(MirrorSourceConnector.java:348) at org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:192) at org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst(MirrorSourceConnectorTest.java:222) Also happened in MirrorCheckpointConnectorTest.testFindConsumerGroups h3. Stacktrace java.lang.NullPointerException at org.apache.kafka.connect.mirror.MirrorCheckpointConnector.listConsumerGroups(MirrorCheckpointConnector.java:158) at org.apache.kafka.connect.mirror.MirrorCheckpointConnectorTest.testFindConsumerGroups(MirrorCheckpointConnectorTest.java:89) https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk11/415/testReport/junit/org.apache.kafka.connect.mirror/MirrorSourceConnectorTest/testRefreshTopicPartitionsTopicOnTargetFirst__/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
hachikuji commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r561451602 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, +node: Node, +requestThrottleMs: Int, +errorMessage: Option[String] = None): FindCoordinatorResponse = { +new FindCoordinatorResponse( + new FindCoordinatorResponseData() +.setErrorCode(error.code) +.setErrorMessage(errorMessage.getOrElse(error.message)) +.setNodeId(node.id) +.setHost(node.host) +.setPort(node.port) +.setThrottleTimeMs(requestThrottleMs)) } - def createResponse(requestThrottleMs: Int): AbstractResponse = { -def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { - new FindCoordinatorResponse( - new FindCoordinatorResponseData() -.setErrorCode(error.code) -.setErrorMessage(error.message) -.setNodeId(node.id) -.setHost(node.host) -.setPort(node.port) -.setThrottleTimeMs(requestThrottleMs)) + val topicCreationNeeded = topicMetadata.headOption.isEmpty + if (topicCreationNeeded) { +if (hasEnoughAliveBrokers(internalTopicName)) { Review comment: In the case of forwarding, maybe we can let the controller decide if there are enough alive brokers. ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors,
[GitHub] [kafka] guozhangwang commented on a change in pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
guozhangwang commented on a change in pull request #9836: URL: https://github.com/apache/kafka/pull/9836#discussion_r561449314 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -2041,6 +2052,118 @@ public void testInvalidGroupMetadata() throws InterruptedException { assertThrows(IllegalStateException.class, consumer::groupMetadata); } +@Test +public void testPollMetadata() { +final Time time = new MockTime(); +final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); +final ConsumerMetadata metadata = createMetadata(subscription); +final MockClient client = new MockClient(time, metadata); + +initMetadata(client, singletonMap(topic, 1)); +final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + +final KafkaConsumer consumer = +newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + +consumer.assign(singleton(tp0)); +consumer.seek(tp0, 50L); + +final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); +client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo))); + +final ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); +assertEquals(5, records.count()); +assertEquals(55L, consumer.position(tp0)); + +// verify that the consumer computes the correct metadata based on the fetch response +final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0); +assertEquals(100L, (long) actualMetadata.endOffset()); +assertEquals(55L, (long) actualMetadata.position()); +assertEquals(45L, (long) actualMetadata.lag()); +consumer.close(Duration.ZERO); +} + + +@Test +public void testPollMetadataWithExtraPartitions() { Review comment: Does the test cover 1) stale epoch, 2) no prev value, cases? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp } else { List> records = fetchRecords(nextInLineFetch, recordsRemaining); -if (!records.isEmpty()) { -TopicPartition partition = nextInLineFetch.partition; -List> currentRecords = fetched.get(partition); -if (currentRecords == null) { -fetched.put(partition, records); -} else { -// this case shouldn't usually happen because we only send one fetch at a time per partition, -// but it might conceivably happen in some rare cases (such as partition leader changes). -// we have to copy to a new list because the old one may be immutable -List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); -newRecords.addAll(currentRecords); -newRecords.addAll(records); -fetched.put(partition, newRecords); +TopicPartition partition = nextInLineFetch.partition; + +if (subscriptions.isAssigned(partition)) { +// initializeCompletedFetch, above, has already persisted the metadata from the fetch in the +// SubscriptionState, so we can just read it out, which in particular lets us re-use the logic +// for determining the end offset +final long receivedTimestamp = nextInLineFetch.receivedTimestamp; +final Long beginningOffset = subscriptions.logStartOffset(partition); +final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel); +final FetchPosition fetchPosition = subscriptions.position(partition); + +final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition); +if (fetchMetadata == null +|| !fetchMetadata.position().offsetEpoch.isPresent() +|| fetchPosition.offsetEpoch.isPresent() +&& fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) { Review comment: Interesting, why we do not want to update the metadata if epoch is stale? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp } else { List> records = fetchRecords(nextInLineFetch, recordsRemaining); -
[GitHub] [kafka] mjsax commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
mjsax commented on a change in pull request #9940: URL: https://github.com/apache/kafka/pull/9940#discussion_r561434718 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -2647,8 +2647,7 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() { task01.setCommittableOffsetsAndMetadata(offsetsT01); final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true); -consumer.groupMetadata(); -expectLastCall().andReturn(null).anyTimes(); +expect(consumer.groupMetadata()).andStubReturn(null); Review comment: Another simplification; make it a one-liner. Same below. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
mjsax commented on a change in pull request #9940: URL: https://github.com/apache/kafka/pull/9940#discussion_r561434387 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -190,7 +190,7 @@ public void shouldIdempotentlyUpdateSubscriptionFromActiveAssignment() { final TopicPartition newTopicPartition = new TopicPartition("topic2", 1); final Map> assignment = mkMap(mkEntry(taskId01, mkSet(t1p1, newTopicPartition))); -expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(emptyList()).anyTimes(); +expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(emptyList()); Review comment: @ableegoldman I just update the whole test class... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
mjsax commented on a change in pull request #9940: URL: https://github.com/apache/kafka/pull/9940#discussion_r561429380 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -2779,6 +2779,62 @@ public void suspend() { assertThat(task01.state(), is(Task.State.SUSPENDED)); } +@Test +public void shouldConvertActiveTaskToStandbyTask() { +final StreamTask activeTask = mock(StreamTask.class); +expect(activeTask.id()).andReturn(taskId00).anyTimes(); + expect(activeTask.inputPartitions()).andReturn(taskId00Partitions).anyTimes(); Review comment: Will fix right away -- Jenkins failed anyway and we need to rerun it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
mjsax commented on pull request #9940: URL: https://github.com/apache/kafka/pull/9940#issuecomment-764070908 All three runs failed with different errors: JDK8: ``` kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 37.320395596193315 (600 connections / 16.077 sec) ==> expected: <30.0> but was: <37.320395596193315> ``` JDK11: ``` kafka.admin.FeatureCommandTest.testDescribeFeaturesSuccess() org.opentest4j.AssertionFailedError: expected: but was: ``` JDK15: ``` kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback() org.opentest4j.AssertionFailedError: Partition [group1_largeTopic,5] metadata not propagated after 15000 ms // and kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback() org.opentest4j.AssertionFailedError: Partition [group1_largeTopic,5] metadata not propagated after 15000 ms ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot
jsancio commented on a change in pull request #9819: URL: https://github.com/apache/kafka/pull/9819#discussion_r561427699 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException { } @Override -public void append(ByteBuffer buffer) throws IOException { +public void append(BaseRecords records) throws IOException { if (frozen) { throw new IllegalStateException( -String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) ); } - +ByteBuffer buffer; +if (records instanceof MemoryRecords) { +buffer = ((MemoryRecords) records).buffer(); +} else { +buffer = ByteBuffer.allocate(records.sizeInBytes()); +((FileRecords) records).channel().read(buffer); +buffer.flip(); +} Review comment: > I change the signature to keep consistent with FileRawSnapshotReader Okay. I think this is something that I struggled with when creating the original APIs. I am okay with "inconsistent" APIs since `RawSnapshot{Reader,Writer}` are internal interfaces to the raft client and are not exposed to the state machine (controller). I think this "inconsistency" will go away when we implement the long term solution. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
guozhangwang commented on pull request #9671: URL: https://github.com/apache/kafka/pull/9671#issuecomment-764063387 > Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it Hmm... I'm not familiar with SaslXConsumerTest either... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
wcarlson5 commented on a change in pull request #9941: URL: https://github.com/apache/kafka/pull/9941#discussion_r561415932 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, referenceContainer.nextScheduledRebalanceMs, shutdownErrorHook, streamsUncaughtExceptionHandler, -cacheSize -> cache.resize(cacheSize) Review comment: I think it caused a test to fail but not everytime. It also could have been fixed since then as changes have been made. If all the tests pass it's probably fine This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper
hachikuji commented on a change in pull request #9912: URL: https://github.com/apache/kafka/pull/9912#discussion_r561415958 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int, requestQueue.put(request) } - /** Send a response back to the socket server to be sent over the network */ - def sendResponse(response: RequestChannel.Response): Unit = { + def closeConnection( +request: RequestChannel.Request, +errorCounts: java.util.Map[Errors, Integer] + ): Unit = { +// This case is used when the request handler has encountered an error, but the client +// does not expect a response (e.g. when produce request has acks set to 0) +updateErrorMetrics(request.header.apiKey, errorCounts.asScala) +sendResponse(new RequestChannel.CloseConnectionResponse(request)) + } + + def sendResponse( +request: RequestChannel.Request, +response: AbstractResponse, +onComplete: Option[Send => Unit] Review comment: I decided not to do this here. I didn't like replacing `None` with `_ => {}` in uses, and neither did I like making the argument optional. The alternative is to introduce a constant "no-op" function, but I found this also a little awkward. If you think of a nice way to do it, I can review. I do think it is better having a simpler type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
ableegoldman commented on a change in pull request #9941: URL: https://github.com/apache/kafka/pull/9941#discussion_r561412170 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, referenceContainer.nextScheduledRebalanceMs, shutdownErrorHook, streamsUncaughtExceptionHandler, -cacheSize -> cache.resize(cacheSize) Review comment: Interesting. It *should* be exactly the same, but of course who knows with Java. Did it cause a test to fail or was it something more subtle? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
wcarlson5 commented on pull request #9941: URL: https://github.com/apache/kafka/pull/9941#issuecomment-764054886 Overall LGTM. I am not sure about the cache change but the changes to the log makes a lot of sense This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
ableegoldman commented on pull request #9671: URL: https://github.com/apache/kafka/pull/9671#issuecomment-764053539 Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
wcarlson5 commented on a change in pull request #9941: URL: https://github.com/apache/kafka/pull/9941#discussion_r561409907 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, referenceContainer.nextScheduledRebalanceMs, shutdownErrorHook, streamsUncaughtExceptionHandler, -cacheSize -> cache.resize(cacheSize) Review comment: My ide tried to optimize this as well. At the time not passing in cacheSize caused some expections. I would be careful about making this change without need This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
ableegoldman commented on a change in pull request #9941: URL: https://github.com/apache/kafka/pull/9941#discussion_r561394151 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -1053,7 +1054,7 @@ int maybeCommitActiveTasksPerUserRequested() { } private void commitOffsetsOrTransaction(final Map> offsetsPerTask) { -log.debug("Committing task offsets {}", offsetsPerTask); +log.debug("Committing task offsets {}", offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects Review comment: An unrelated but equally annoying thing I noticed in the logs: we should never log a full `Task` object because it prints literally everything about the task, including for example the topology description which is not that useful but sometimes VERY long This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
ableegoldman commented on pull request #9941: URL: https://github.com/apache/kafka/pull/9941#issuecomment-764036403 call for review @lct45 @wcarlson5 @cadonna This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
ableegoldman opened a new pull request #9941: URL: https://github.com/apache/kafka/pull/9941 One more try to get the logging levels right..the only way to log something within the main StreamThread loop without absolutely flooding the logs at a level that isn't appropriate for INFO is to just set some kind of interval to stick to. I chose to log the summary every 2 min, since this is long enough to prevent log spam but short enough to fit at least 2 summaries within the (default) poll interval of 5 min This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10044) Deprecate ConsumerConfig#addDeserializerToConfig and ProducerConfig#addSerializerToConfig
[ https://issues.apache.org/jira/browse/KAFKA-10044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10044: Description: from [~ijuma] suggestion ([https://github.com/apache/kafka/pull/8605#discussion_r430431086]) {quote}I think you could submit a KIP for the deprecation of the two methods in this class, but we can merge the other changes in the meantime. {quote} KIP-620: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118] was: from [~ijuma] suggestion (https://github.com/apache/kafka/pull/8605#discussion_r430431086) {quote} I think you could submit a KIP for the deprecation of the two methods in this class, but we can merge the other changes in the meantime. {quote} > Deprecate ConsumerConfig#addDeserializerToConfig and > ProducerConfig#addSerializerToConfig > - > > Key: KAFKA-10044 > URL: https://issues.apache.org/jira/browse/KAFKA-10044 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: kip > > from [~ijuma] suggestion > ([https://github.com/apache/kafka/pull/8605#discussion_r430431086]) > {quote}I think you could submit a KIP for the deprecation of the two methods > in this class, but we can merge the other changes in the meantime. > {quote} > KIP-620: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10044) Deprecate ConsumerConfig#addDeserializerToConfig and ProducerConfig#addSerializerToConfig
[ https://issues.apache.org/jira/browse/KAFKA-10044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10044: Labels: kip (was: need-kip) > Deprecate ConsumerConfig#addDeserializerToConfig and > ProducerConfig#addSerializerToConfig > - > > Key: KAFKA-10044 > URL: https://issues.apache.org/jira/browse/KAFKA-10044 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: kip > > from [~ijuma] suggestion > (https://github.com/apache/kafka/pull/8605#discussion_r430431086) > {quote} > I think you could submit a KIP for the deprecation of the two methods in this > class, but we can merge the other changes in the meantime. > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10044) Deprecate ConsumerConfig#addDeserializerToConfig and ProducerConfig#addSerializerToConfig
[ https://issues.apache.org/jira/browse/KAFKA-10044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10044: Fix Version/s: 2.7.0 > Deprecate ConsumerConfig#addDeserializerToConfig and > ProducerConfig#addSerializerToConfig > - > > Key: KAFKA-10044 > URL: https://issues.apache.org/jira/browse/KAFKA-10044 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: kip > Fix For: 2.7.0 > > > from [~ijuma] suggestion > ([https://github.com/apache/kafka/pull/8605#discussion_r430431086]) > {quote}I think you could submit a KIP for the deprecation of the two methods > in this class, but we can merge the other changes in the meantime. > {quote} > KIP-620: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
aloknnikhil commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561376605 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -750,9 +748,9 @@ void start(int nodeId) { PersistentState persistentState = nodes.get(nodeId); MockNetworkChannel channel = new MockNetworkChannel(correlationIdCounter); MockMessageQueue messageQueue = new MockMessageQueue(); -List voterNodes = voterNodesFromIds(voters, Cluster::nodeAddress); -RaftConfig raftConfig = buildRaftConfig(REQUEST_TIMEOUT_MS, RETRY_BACKOFF_MS, ELECTION_TIMEOUT_MS, -ELECTION_JITTER_MS, FETCH_TIMEOUT_MS, LINGER_MS, voterNodes); +Map voterNodes = voterNodesFromIds(voters, Cluster::nodeAddress); Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
aloknnikhil commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561375789 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ## @@ -16,36 +16,15 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.Node; - import java.net.InetSocketAddress; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; public class RaftTestUtil { -public static RaftConfig buildRaftConfig( -int requestTimeoutMs, -int retryBackoffMs, -int electionTimeoutMs, -int electionBackoffMs, -int fetchTimeoutMs, -int appendLingerMs, -List voterNodes -) { -Map voterConnections = voterNodes.stream() -.collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port(; -return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, -fetchTimeoutMs, appendLingerMs); -} - -public static List voterNodesFromIds(Set voterIds, +public static Map voterNodesFromIds(Set voterIds, Review comment: Removed in favor of inlined code https://github.com/apache/kafka/pull/9916#discussion_r561375612 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
aloknnikhil commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561375612 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ## @@ -16,36 +16,15 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.Node; - import java.net.InetSocketAddress; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; public class RaftTestUtil { -public static RaftConfig buildRaftConfig( -int requestTimeoutMs, -int retryBackoffMs, -int electionTimeoutMs, -int electionBackoffMs, -int fetchTimeoutMs, -int appendLingerMs, -List voterNodes -) { -Map voterConnections = voterNodes.stream() -.collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port(; -return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, -fetchTimeoutMs, appendLingerMs); -} - -public static List voterNodesFromIds(Set voterIds, +public static Map voterNodesFromIds(Set voterIds, Function voterAddressGenerator) { -return voterIds.stream().map(voterId -> { -InetSocketAddress voterAddress = voterAddressGenerator.apply(voterId); -return new Node(voterId, voterAddress.getHostName(), voterAddress.getPort()); -}).collect(Collectors.toList()); +return voterIds.stream().collect(Collectors.toMap(id -> id, voterAddressGenerator)); Review comment: I'm up for removing any extra files when we don't need them. Aaand, it's gone. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
ableegoldman commented on a change in pull request #9940: URL: https://github.com/apache/kafka/pull/9940#discussion_r561368977 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -2779,6 +2779,62 @@ public void suspend() { assertThat(task01.state(), is(Task.State.SUSPENDED)); } +@Test +public void shouldConvertActiveTaskToStandbyTask() { +final StreamTask activeTask = mock(StreamTask.class); +expect(activeTask.id()).andReturn(taskId00).anyTimes(); + expect(activeTask.inputPartitions()).andReturn(taskId00Partitions).anyTimes(); Review comment: nit: use `andStubReturn` instead of `andReturn().anyTimes`. No need to change this now, don't want to block the fix, just fyi for future PRs This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10848) Allow fine grained control over cross-partition processing order
[ https://issues.apache.org/jira/browse/KAFKA-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268915#comment-17268915 ] Matthias J. Sax commented on KAFKA-10848: - Thanks [~guozhang] – the idea of this ticket was along the same lines as your proposal. How we design the interface would be something to discuss on the KIP. Samza for example uses a quite different interface: [https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html] – Don't know which API would be favorable. > Allow fine grained control over cross-partition processing order > > > Key: KAFKA-10848 > URL: https://issues.apache.org/jira/browse/KAFKA-10848 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > Currently, KafkaStreams implements a hard-coded timestamp based strategy to > pick the next record to process for a task, given that a task has multiple > partitions. > In general, this strategy works well for the DSL, but for PAPI users, there > might be cases when the strategy should be customized. And even for the DSL, > there is one corner case (for a stream-table join) for which the table-side > record should be processed first if two records have the same timestamp (at > least, this gap exists as long as we don't have multi-version KTables), while > we cannot enforce this behavior because at runtime we don't know anything > about KStream vs KTable or an existing downstream join. > Thus, we might want to allow users to plugin a custom strategy to pick the > next record for processing. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
hachikuji commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561344761 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -750,9 +748,9 @@ void start(int nodeId) { PersistentState persistentState = nodes.get(nodeId); MockNetworkChannel channel = new MockNetworkChannel(correlationIdCounter); MockMessageQueue messageQueue = new MockMessageQueue(); -List voterNodes = voterNodesFromIds(voters, Cluster::nodeAddress); -RaftConfig raftConfig = buildRaftConfig(REQUEST_TIMEOUT_MS, RETRY_BACKOFF_MS, ELECTION_TIMEOUT_MS, -ELECTION_JITTER_MS, FETCH_TIMEOUT_MS, LINGER_MS, voterNodes); +Map voterNodes = voterNodesFromIds(voters, Cluster::nodeAddress); Review comment: nit: `voterNodes` -> `voterAddresses` or `voterAddressMap`? ## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ## @@ -16,36 +16,15 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.Node; - import java.net.InetSocketAddress; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; public class RaftTestUtil { -public static RaftConfig buildRaftConfig( -int requestTimeoutMs, -int retryBackoffMs, -int electionTimeoutMs, -int electionBackoffMs, -int fetchTimeoutMs, -int appendLingerMs, -List voterNodes -) { -Map voterConnections = voterNodes.stream() -.collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port(; -return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, -fetchTimeoutMs, appendLingerMs); -} - -public static List voterNodesFromIds(Set voterIds, +public static Map voterNodesFromIds(Set voterIds, Review comment: nit: maybe `buildAddressMap` or something like that? ## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ## @@ -16,36 +16,15 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.Node; - import java.net.InetSocketAddress; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; public class RaftTestUtil { -public static RaftConfig buildRaftConfig( -int requestTimeoutMs, -int retryBackoffMs, -int electionTimeoutMs, -int electionBackoffMs, -int fetchTimeoutMs, -int appendLingerMs, -List voterNodes -) { -Map voterConnections = voterNodes.stream() -.collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port(; -return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, -fetchTimeoutMs, appendLingerMs); -} - -public static List voterNodesFromIds(Set voterIds, +public static Map voterNodesFromIds(Set voterIds, Function voterAddressGenerator) { -return voterIds.stream().map(voterId -> { -InetSocketAddress voterAddress = voterAddressGenerator.apply(voterId); -return new Node(voterId, voterAddress.getHostName(), voterAddress.getPort()); -}).collect(Collectors.toList()); +return voterIds.stream().collect(Collectors.toMap(id -> id, voterAddressGenerator)); Review comment: Guess we can keep it, but this helper doesn't seem to be doing much for us anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-764014519 @dengziming I realized that for this, it really depends on the IBP of the controller. (That is, we need UpdateMetadata to send topic IDs to all the brokers). So maybe instead of checking IBP it would make sense to check if the MetadataCache does not have any topic IDs. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12190) Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions
[ https://issues.apache.org/jira/browse/KAFKA-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268914#comment-17268914 ] Matthias J. Sax commented on KAFKA-12190: - \cc [~ijuma] [~guozhang] – what do you think? Should we do a `2.6.2` release quickly because of this bug? > Failure on Windows due to an UnsupportedOperationException when > StateDirectory sets file permissions > > > Key: KAFKA-12190 > URL: https://issues.apache.org/jira/browse/KAFKA-12190 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1, 2.7.1 >Reporter: Andy Wilkinson >Priority: Critical > Labels: bug > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > There appears to be a regression in Kafka 2.6.1 due to [the > changes|https://github.com/apache/kafka/pull/9583] made for KAFKA-10705 that > causes a failure on Windows. After upgrading to 2.6.1 from 2.6.0, we're > seeing failures in Spring Boot's CI on Windows such as the following: > {noformat} > Caused by: java.lang.UnsupportedOperationException: (No message provided) > at java.nio.file.Files.setPosixFilePermissions(Files.java:2044) > at > org.apache.kafka.streams.processor.internals.StateDirectory.(StateDirectory.java:115) > > at > org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:745) > at > org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:585) > at > org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316) > > at > org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) > > at > org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) > > at > org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) > > at java.lang.Iterable.forEach(Iterable.java:75) > at > org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) > > at > org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) > > at > org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940) > > at > org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.configureContext(AbstractApplicationContextRunner.java:447) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAndLoadContext(AbstractApplicationContextRunner.java:423) > > at > org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.getContextOrStartupFailure(AssertProviderApplicationContextInvocationHandler.java:61) > > at > org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.(AssertProviderApplicationContextInvocationHandler.java:48) > > at > org.springframework.boot.test.context.assertj.ApplicationContextAssertProvider.get(ApplicationContextAssertProvider.java:112) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAssertableContext(AbstractApplicationContextRunner.java:412) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$null$0(AbstractApplicationContextRunner.java:382) > > at > org.springframework.boot.test.util.TestPropertyValues.applyToSystemProperties(TestPropertyValues.java:175) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$run$1(AbstractApplicationContextRunner.java:381) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.withContextClassLoader(AbstractApplicationContextRunner.java:392) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.run(AbstractApplicationContextRunner.java:381) > > at > org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfigurationTests.whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff(KafkaMetricsAutoConfigurationTests.java:92) > {noformat} > The same code worked without changes using Kafka 2.6.0. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key
[ https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268911#comment-17268911 ] Matthias J. Sax commented on KAFKA-12213: - I am also happy to break into smaller junks... whatever we think works best or whatever [~MonCalamari] is interested in to work on. > Kafka Streams aggregation Initializer to accept record key > -- > > Key: KAFKA-12213 > URL: https://issues.apache.org/jira/browse/KAFKA-12213 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Piotr Fras >Assignee: Piotr Fras >Priority: Minor > Labels: needs-kip > > Sometimes Kafka record key contains useful information for creating a zero > object in aggregation Initializer. This feature is to add kafka record key to > Initializer. > There were two approaches I considered to implement this feature, one > respecting backwards compatibility for internal and external APIs and the > other one which is not. I chose the latter one as it was more strait-forward. > We may want to validate this approach tho. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
[ https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268908#comment-17268908 ] Charlie Johnston commented on KAFKA-9076: - Was this feature intended to only be one-directional or is this just a subset of the intended functionality? Using this with MM2 configuration and two active sites, I find that the site prefixed topics are not being included in any offset mirroring only the non-site prefixed (original) topics are being mirrored. Is it not common with MM2 use to have a scenario with consumers consuming from multiple topics (topic per site) to consume all sites? Apologies if there is any confusion or missing setting on my end, we dug through the code a bit and couldn't find any signs to suggest site prefixed topics are also mirrored. Thanks! > MirrorMaker 2.0 automated consumer offset sync > -- > > Key: KAFKA-9076 > URL: https://issues.apache.org/jira/browse/KAFKA-9076 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: mirrormaker, pull-request-available > Fix For: 2.7.0 > > > To calculate the translated consumer offset in the target cluster, currently > `Mirror-client` provides a function called "remoteConsumerOffsets()" that is > used by "RemoteClusterUtils" for one-time purpose. > In order to make the consumer and stream applications migrate from source to > target cluster transparently and conveniently, e.g. in event of source > cluster failure, a background job is proposed to periodically sync the > consumer offsets from the source to target cluster, so that when the consumer > and stream applications switch to the target cluster, it will resume to > consume from where it left off at source cluster. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0 > [https://github.com/apache/kafka/pull/7577] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10869) Gate topic IDs behind IBP 2.8
[ https://issues.apache.org/jira/browse/KAFKA-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-10869. Resolution: Fixed > Gate topic IDs behind IBP 2.8 > - > > Key: KAFKA-10869 > URL: https://issues.apache.org/jira/browse/KAFKA-10869 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 2.8.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > > We want to do this so we don't lose topic IDs upon downgrades. If we > downgrade and write to topic node in ZK, the topic ID will be lost. If the > broker is upgraded again later, the topic IDs may not match. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
rajinisivaram commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-763996884 @dengziming https://github.com/apache/kafka/pull/9814 has been merged, so this needs rebasing and the check for IBP. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram merged pull request #9814: KAFKA-10869: Gate topic IDs behind IBP 2.8
rajinisivaram merged pull request #9814: URL: https://github.com/apache/kafka/pull/9814 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #9814: KAFKA-10869: Gate topic IDs behind IBP 2.8
rajinisivaram commented on pull request #9814: URL: https://github.com/apache/kafka/pull/9814#issuecomment-763993768 @jolshan Thanks for running the system tests, merging to trunk. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9835: MINOR: Add 'task container' class to KafkaStreams TaskManager
mjsax commented on a change in pull request #9835: URL: https://github.com/apache/kafka/pull/9835#discussion_r561342858 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +class Tasks { +private final Logger log; +private final InternalTopologyBuilder builder; +private final StreamsMetricsImpl streamsMetrics; + +private final Map allTasksPerId = new TreeMap<>(); +private final Map readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId); +private final Collection readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values()); + +// TODO: change type to `StreamTask` +private final Map activeTasksPerId = new TreeMap<>(); +// TODO: change type to `StreamTask` +private final Map activeTasksPerPartition = new HashMap<>(); +// TODO: change type to `StreamTask` +private final Map readOnlyActiveTasksPerId = Collections.unmodifiableMap(activeTasksPerId); +private final Set readOnlyActiveTaskIds = Collections.unmodifiableSet(activeTasksPerId.keySet()); +// TODO: change type to `StreamTask` +private final Collection readOnlyActiveTasks = Collections.unmodifiableCollection(activeTasksPerId.values()); + +// TODO: change type to `StandbyTask` +private final Map standbyTasksPerId = new TreeMap<>(); +// TODO: change type to `StandbyTask` +private final Map readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId); +private final Set readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet()); + +private final ActiveTaskCreator activeTaskCreator; +private final StandbyTaskCreator standbyTaskCreator; + +private Consumer mainConsumer; + +Tasks(final String logPrefix, + final InternalTopologyBuilder builder, + final StreamsMetricsImpl streamsMetrics, + final ActiveTaskCreator activeTaskCreator, + final StandbyTaskCreator standbyTaskCreator) { + +final LogContext logContext = new LogContext(logPrefix); +log = logContext.logger(getClass()); + +this.builder = builder; +this.streamsMetrics = streamsMetrics; +this.activeTaskCreator = activeTaskCreator; +this.standbyTaskCreator = standbyTaskCreator; +} + +void setMainConsumer(final Consumer mainConsumer) { +this.mainConsumer = mainConsumer; +} + +void createTasks(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate) { +for (final Map.Entry> taskToBeCreated : activeTasksToCreate.entrySet()) { +final TaskId taskId = taskToBeCreated.getKey(); + +if (activeTasksPerId.containsKey(taskId)) { +throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId); +} +} + +for (final Map.Entry> taskToBeCreated : standbyTasksToCreate.entrySet()) { +final TaskId taskId = taskToBeCreated.getKey(); + +if (standbyTasksPerId.containsKey(taskId)) { +throw new IllegalStateException("Attempted to create a standby task that we already own: " + taskId); +} +} + +// keep this check to simplify testing (ie, no need to mock `activeTaskCreator`) +if (!activeTasksToCreate.isEmpty()) { +// TODO: change type to `StreamTask` +for (final Task activeTask :
[GitHub] [kafka] ableegoldman commented on a change in pull request #9835: MINOR: Add 'task container' class to KafkaStreams TaskManager
ableegoldman commented on a change in pull request #9835: URL: https://github.com/apache/kafka/pull/9835#discussion_r561340839 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +class Tasks { +private final Logger log; +private final InternalTopologyBuilder builder; +private final StreamsMetricsImpl streamsMetrics; + +private final Map allTasksPerId = new TreeMap<>(); +private final Map readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId); +private final Collection readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values()); + +// TODO: change type to `StreamTask` +private final Map activeTasksPerId = new TreeMap<>(); +// TODO: change type to `StreamTask` +private final Map activeTasksPerPartition = new HashMap<>(); +// TODO: change type to `StreamTask` +private final Map readOnlyActiveTasksPerId = Collections.unmodifiableMap(activeTasksPerId); +private final Set readOnlyActiveTaskIds = Collections.unmodifiableSet(activeTasksPerId.keySet()); +// TODO: change type to `StreamTask` +private final Collection readOnlyActiveTasks = Collections.unmodifiableCollection(activeTasksPerId.values()); + +// TODO: change type to `StandbyTask` +private final Map standbyTasksPerId = new TreeMap<>(); +// TODO: change type to `StandbyTask` +private final Map readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId); +private final Set readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet()); + +private final ActiveTaskCreator activeTaskCreator; +private final StandbyTaskCreator standbyTaskCreator; + +private Consumer mainConsumer; + +Tasks(final String logPrefix, + final InternalTopologyBuilder builder, + final StreamsMetricsImpl streamsMetrics, + final ActiveTaskCreator activeTaskCreator, + final StandbyTaskCreator standbyTaskCreator) { + +final LogContext logContext = new LogContext(logPrefix); +log = logContext.logger(getClass()); + +this.builder = builder; +this.streamsMetrics = streamsMetrics; +this.activeTaskCreator = activeTaskCreator; +this.standbyTaskCreator = standbyTaskCreator; +} + +void setMainConsumer(final Consumer mainConsumer) { +this.mainConsumer = mainConsumer; +} + +void createTasks(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate) { +for (final Map.Entry> taskToBeCreated : activeTasksToCreate.entrySet()) { +final TaskId taskId = taskToBeCreated.getKey(); + +if (activeTasksPerId.containsKey(taskId)) { +throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId); +} +} + +for (final Map.Entry> taskToBeCreated : standbyTasksToCreate.entrySet()) { +final TaskId taskId = taskToBeCreated.getKey(); + +if (standbyTasksPerId.containsKey(taskId)) { +throw new IllegalStateException("Attempted to create a standby task that we already own: " + taskId); +} +} + +// keep this check to simplify testing (ie, no need to mock `activeTaskCreator`) +if (!activeTasksToCreate.isEmpty()) { +// TODO: change type to `StreamTask` +for (final Task activeTask :
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
aloknnikhil commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561334437 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java ## @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.raft; - -import org.apache.kafka.common.config.ConfigException; -import org.junit.jupiter.api.Test; - -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashMap; -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class RaftConfigTest { - -@Test -public void testSingleQuorumVoterConnections() { -Properties properties = new Properties(); -properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@127.0.0.1:9092"); -RaftConfig config = new RaftConfig(properties); -assertEquals(Collections.singletonMap(1, new InetSocketAddress("127.0.0.1", 9092)), -config.quorumVoterConnections()); -} - -@Test -public void testMultiQuorumVoterConnections() { -Properties properties = new Properties(); -properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@kafka1:9092,2@kafka2:9092,3@kafka3:9092"); -RaftConfig config = new RaftConfig(properties); - -HashMap expected = new HashMap<>(); -expected.put(1, new InetSocketAddress("kafka1", 9092)); -expected.put(2, new InetSocketAddress("kafka2", 9092)); -expected.put(3, new InetSocketAddress("kafka3", 9092)); - -assertEquals(expected, config.quorumVoterConnections()); Review comment: Ah, good catch. Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
aloknnikhil commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561327703 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ## @@ -35,24 +35,10 @@ public static RaftConfig buildRaftConfig( int appendLingerMs, List voterNodes ) { -Properties properties = new Properties(); -properties.put(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); -properties.put(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, retryBackoffMs); -properties.put(RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs); -properties.put(RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, electionBackoffMs); -properties.put(RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, fetchTimeoutMs); -properties.put(RaftConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs); - -StringBuilder votersString = new StringBuilder(); -String prefix = ""; -for (Node voter : voterNodes) { -votersString.append(prefix); - votersString.append(voter.id()).append('@').append(voter.host()).append(':').append(voter.port()); -prefix = ","; -} -properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, votersString.toString()); - -return new RaftConfig(properties); +Map voterConnections = voterNodes.stream() +.collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port(; +return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, Review comment: You're right. This is an artifact from the previous constructor usage. Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
aloknnikhil commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561322418 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1258,6 +1267,15 @@ object KafkaConfig { .define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc) .define(PasswordEncoderKeyLengthProp, INT, Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc) .define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc) + + /** * Raft Quorum Configuration */ + .defineInternal(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QuorumVoters, new RaftConfig.ControllerQuorumVotersValidator(), HIGH) Review comment: Ack. Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
aloknnikhil commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561315012 ## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ## @@ -941,4 +950,38 @@ class KafkaConfigTest { }) } + @Test + def testInvalidQuorumVotersConfig(): Unit = { +assertInvalidQuorumVoters("1") +assertInvalidQuorumVoters("1@") +assertInvalidQuorumVoters("1:") +assertInvalidQuorumVoters("blah@") +assertInvalidQuorumVoters("1@kafka1") +assertInvalidQuorumVoters("1@kafka1:9092,") +assertInvalidQuorumVoters("1@kafka1:9092,") +assertInvalidQuorumVoters("1@kafka1:9092,2") +assertInvalidQuorumVoters("1@kafka1:9092,2@") +assertInvalidQuorumVoters("1@kafka1:9092,2@blah") +assertInvalidQuorumVoters("1@kafka1:9092,2@blah,") + } + + private def assertInvalidQuorumVoters(value: String): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value) +assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + } + + @Test + def testValidQuorumVotersConfig(): Unit = { +assertValidQuorumVoters("", 0) +assertValidQuorumVoters("1@127.0.0.1:9092", 1) +assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3) + } + + private def assertValidQuorumVoters(value: String, expectedVoterCount: Int): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value) +assertDoesNotThrow(() => KafkaConfig.fromProps(props)) Review comment: Makes sense. Removed. ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) { return voterMap; } +public static class ControllerQuorumVotersValidator implements ConfigDef.Validator { +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +throw new ConfigException(name, null); +} + +@SuppressWarnings("unchecked") +List voterStrings = (List) value; + +if (voterStrings.size() == 0) { +// TODO: Add a flag to skip validation for an empty voter string, conditionally. +// For now, skip anyway. See https://github.com/apache/kafka/pull/9916#discussion_r560611932 Review comment: Fair enough. Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on pull request #6592: URL: https://github.com/apache/kafka/pull/6592#issuecomment-763952940 @mjsax Hey, just rebased my branch with the trunk, updated my tests to use JUnit 5. Let me know when you guys will have time to review it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #9934: MINOR: Drop enable.metadata.quorum config
hachikuji commented on pull request #9934: URL: https://github.com/apache/kafka/pull/9934#issuecomment-763923618 I decided to leave the controller-only flag as it is. I think there are further improvements here to make the scope of the API clearer, but the implications for compatibility are subtle enough that we should consider it separately. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
hachikuji commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561283768 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java ## @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.raft; - -import org.apache.kafka.common.config.ConfigException; -import org.junit.jupiter.api.Test; - -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashMap; -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class RaftConfigTest { - -@Test -public void testSingleQuorumVoterConnections() { -Properties properties = new Properties(); -properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@127.0.0.1:9092"); -RaftConfig config = new RaftConfig(properties); -assertEquals(Collections.singletonMap(1, new InetSocketAddress("127.0.0.1", 9092)), -config.quorumVoterConnections()); -} - -@Test -public void testMultiQuorumVoterConnections() { -Properties properties = new Properties(); -properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@kafka1:9092,2@kafka2:9092,3@kafka3:9092"); -RaftConfig config = new RaftConfig(properties); - -HashMap expected = new HashMap<>(); -expected.put(1, new InetSocketAddress("kafka1", 9092)); -expected.put(2, new InetSocketAddress("kafka2", 9092)); -expected.put(3, new InetSocketAddress("kafka3", 9092)); - -assertEquals(expected, config.quorumVoterConnections()); Review comment: This test case seems stronger than the one that was ported to `KafkaConfigTest`. It is validating the endpoints in addition to the number of voters. Is there any way we can recover this? ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) { return voterMap; } +public static class ControllerQuorumVotersValidator implements ConfigDef.Validator { +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +throw new ConfigException(name, null); +} + +@SuppressWarnings("unchecked") +List voterStrings = (List) value; + +if (voterStrings.size() == 0) { +// TODO: Add a flag to skip validation for an empty voter string, conditionally. +// For now, skip anyway. See https://github.com/apache/kafka/pull/9916#discussion_r560611932 Review comment: We typically do not leave TODOs in the code. We can file a jira if we think it's important to remember. I'd suggest we just leave this check out and skip the empty check below. ## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ## @@ -941,4 +950,38 @@ class KafkaConfigTest { }) } + @Test + def testInvalidQuorumVotersConfig(): Unit = { +assertInvalidQuorumVoters("1") +assertInvalidQuorumVoters("1@") +assertInvalidQuorumVoters("1:") +assertInvalidQuorumVoters("blah@") +assertInvalidQuorumVoters("1@kafka1") +assertInvalidQuorumVoters("1@kafka1:9092,") +assertInvalidQuorumVoters("1@kafka1:9092,") +assertInvalidQuorumVoters("1@kafka1:9092,2") +assertInvalidQuorumVoters("1@kafka1:9092,2@") +assertInvalidQuorumVoters("1@kafka1:9092,2@blah") +assertInvalidQuorumVoters("1@kafka1:9092,2@blah,") + } + + private def assertInvalidQuorumVoters(value: String): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value) +assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + } + + @Test + def testValidQuorumVotersConfig(): Unit = { +assertValidQuorumVoters("", 0) +assertValidQuorumVoters("1@127.0.0.1:9092", 1) +assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3) + } + + private def assertValidQuorumVoters(value: String,
[GitHub] [kafka] ijuma commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5
ijuma commented on pull request #9926: URL: https://github.com/apache/kafka/pull/9926#issuecomment-763893680 @chia7712 What have you seen that implies that? This was certainly the case before, not sure if something changed at some point (I know because I've seen compiler errors that would not be possible without the merge). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax opened a new pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
mjsax opened a new pull request #9940: URL: https://github.com/apache/kafka/pull/9940 Call for review @vvcephei This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9904: KAFKA-12211: don't change perm for base/state dir when no persistent store
ableegoldman commented on pull request #9904: URL: https://github.com/apache/kafka/pull/9904#issuecomment-763886188 Also cherrypicked to 2.7 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9904: KAFKA-12211: don't change perm for base/state dir when no persistent store
ableegoldman commented on pull request #9904: URL: https://github.com/apache/kafka/pull/9904#issuecomment-763884833 Thanks @showuon , merged to trunk This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #9904: KAFKA-12211: don't change perm for base/state dir when no persistent store
ableegoldman merged pull request #9904: URL: https://github.com/apache/kafka/pull/9904 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12185) Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-12185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-12185: --- Assignee: Matthias J. Sax > Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores > --- > > Key: KAFKA-12185 > URL: https://issues.apache.org/jira/browse/KAFKA-12185 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > java.lang.AssertionError: Application did not reach a RUNNING state for > all streams instances. Non-running instances: > \{org.apache.kafka.streams.KafkaStreams@651720d3=NOT_RUNNING} > at org.junit.Assert.fail(Assert.java:89) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:892) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:270) > > {{https://github.com/apache/kafka/pull/9835/checks?check_run_id=139314}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5
chia7712 commented on pull request #9926: URL: https://github.com/apache/kafka/pull/9926#issuecomment-763874639 > be aware that Jenkins merges trunk before running the PR tests @ijuma it seems that jenkins does not merge trunk before running QA. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5
chia7712 commented on pull request #9926: URL: https://github.com/apache/kafka/pull/9926#issuecomment-763872582 ``` org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect ``` fixed flaky This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12185) Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-12185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268805#comment-17268805 ] John Roesler commented on KAFKA-12185: -- I just saw this fail locally and happened to notice this in the logs: {code:java} [2021-01-20 13:03:55,375] ERROR stream-client [app-StoreQueryIntegrationTestshouldQueryAllStalePartitionStores-ba1cebd9-bccc-4d54-af8a-2f6a51200612] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. (org.apache.kafka.streams.KafkaStreams:469) org.apache.kafka.common.KafkaException: User rebalance callback throws an error at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:451) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:367) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:882) at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:839) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:679) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:547) Caused by: java.lang.RuntimeException: Unexpected failure to close 1 task(s) [[0_1]]. First unexpected exception (for task 0_1) follows. at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:311) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1484) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421) ... 11 more Caused by: java.util.ConcurrentModificationException at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1704) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.kafka.streams.processor.internals.Tasks.convertActiveToStandby(Tasks.java:133) at org.apache.kafka.streams.processor.internals.TaskManager.handleCloseAndRecycle(TaskManager.java:394) at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:291) ... 14 more {code} > Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores > --- > > Key: KAFKA-12185 > URL: https://issues.apache.org/jira/browse/KAFKA-12185 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > java.lang.AssertionError: Application did not reach a RUNNING state for > all streams instances. Non-running instances: > \{org.apache.kafka.streams.KafkaStreams@651720d3=NOT_RUNNING} > at org.junit.Assert.fail(Assert.java:89) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:892) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:270) > > {{https://github.com/apache/kafka/pull/9835/checks?check_run_id=139314}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration
[ https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Kashavkin updated KAFKA-12228: - Description: I found that Kafka 2.7.0 supports PEM certificates and I decided to try setting up the broker with DigiCert SSL certificate. I used new options and I did everything like in example in [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. But I get the error: {code:bash} [2021-01-20 17:54:55,787] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings. at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) at kafka.network.Processor.(SocketServer.scala:790) at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at scala.collection.AbstractIterable.foreach(Iterable.scala:920) at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.startup(SocketServer.scala:125) at kafka.server.KafkaServer.startup(KafkaServer.scala:303) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at kafka.Kafka$.main(Kafka.scala:82) at kafka.Kafka.main(Kafka.scala) {code} Java is used: {code:bash} openjdk version "1.8.0_272" OpenJDK Runtime Environment (build 1.8.0_272-b10) OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode) {code} OS is Centos 7.8.2003 _openssl x509 -in certificate.pem -text :_ {code:java} Certificate: ... Signature Algorithm: ecdsa-with-SHA384 ... Subject Public Key Info: Public Key Algorithm: id-ecPublicKey Public-Key: (256 bit) {code} was: I found that Kafka 2.7.0 supports PEM certificates and I decided to try setting up the broker with DigiCert SSL certificate. I used new options and I did everything like in example in [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. But I get the error: {code:bash} [2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)[2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings. at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) at kafka.network.Processor.(SocketServer.scala:790) at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at scala.collection.AbstractIterable.foreach(Iterable.scala:920) at
[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper
hachikuji commented on a change in pull request #9912: URL: https://github.com/apache/kafka/pull/9912#discussion_r561189636 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -227,37 +225,33 @@ class KafkaApisTest { authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName, AuthorizationResult.ALLOWED) -val capturedResponse = expectNoThrottling() - val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName) EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false))) .andAnswer(() => { Map(configResource -> alterConfigHandler.apply()) }) -EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer, - adminManager, controller) - val configs = Map( configResource -> new AlterConfigsRequest.Config( Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava)) val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion) val request = buildRequestWithEnvelope(alterConfigsRequest, fromPrivilegedListener = true) +val capturedResponse = EasyMock.newCapture[AbstractResponse]() +val capturedRequest = EasyMock.newCapture[RequestChannel.Request]() -createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request) - -val envelopeRequest = request.body[EnvelopeRequest] -val response = readResponse(envelopeRequest, capturedResponse) - .asInstanceOf[EnvelopeResponse] - -assertEquals(Errors.NONE, response.error) Review comment: There remains some awkwardness in the handling of envelope requests. Basically the flow is like this: 1. KafkaApis.handle(envelope(alterConfigRequest)) 2. KafkaApis.handle(alterConfigRequest) 3. RequestChannel.sendResponse(alterConfigResponse) 4. Request.buildResponseSend() -> envelope(alterConfigResponse) Previously `KafkaApisTest` had to work by parsing the response send, so we had to unwrap the envelope. But now we get the direct call to `sendResponse` with the embedded `AbstractResponse` instance. So basically we do not hit step 4 anymore. I think a nicer way to structure this which we can consider separately is to change `KafkaApis.handle` so that it takes a callback rather than assuming that responses are sent directly to the request channel. Then when we make the recursive call to `handle` after unwrapping the envelope, we can provide a callback which wraps the underlying response with the response envelope. Alternatively, we can have `KafkaApis.handle` return a `CompletableFuture`. The main point is that we allow for some custom behavior when the response is ready. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics
ijuma commented on pull request #9895: URL: https://github.com/apache/kafka/pull/9895#issuecomment-763849858 Jenkins is the best place to look: ![Screenshot from 2021-01-20 10-36-13](https://user-images.githubusercontent.com/24747/105219511-63ad4600-5b0b-11eb-8f06-b2832646025d.png) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9935: HOTFIX: fix RocksDBMetricsTest
ableegoldman commented on pull request #9935: URL: https://github.com/apache/kafka/pull/9935#issuecomment-763842306 Thanks for the fix @chia7712 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics
ableegoldman commented on pull request #9895: URL: https://github.com/apache/kafka/pull/9895#issuecomment-763841228 @ijuma I'm confused, I don't see any failures for the test that this broke (RocksDBMetricsTest) in the build on the last commit. I saw some test failures, but all unrelated. Am I looking in the wrong place? (I followed the link on the "Apply suggestions from code review" commit build --> https://github.com/apache/kafka/runs/1722608276 ) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tang7526 opened a new pull request #9939: MINOR: fix @link tag in javadoc
tang7526 opened a new pull request #9939: URL: https://github.com/apache/kafka/pull/9939 fix @link tag in javadoc ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper
hachikuji commented on a change in pull request #9912: URL: https://github.com/apache/kafka/pull/9912#discussion_r561175037 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int, requestQueue.put(request) } - /** Send a response back to the socket server to be sent over the network */ - def sendResponse(response: RequestChannel.Response): Unit = { + def closeConnection( +request: RequestChannel.Request, +errorCounts: java.util.Map[Errors, Integer] + ): Unit = { +// This case is used when the request handler has encountered an error, but the client +// does not expect a response (e.g. when produce request has acks set to 0) +updateErrorMetrics(request.header.apiKey, errorCounts.asScala) +sendResponse(new RequestChannel.CloseConnectionResponse(request)) + } + + def sendResponse( +request: RequestChannel.Request, +response: AbstractResponse, +onComplete: Option[Send => Unit] Review comment: That's fair. I will check the scope of this change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration
[ https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Kashavkin updated KAFKA-12228: - Description: I found that Kafka 2.7.0 supports PEM certificates and I decided to try setting up the broker with DigiCert SSL certificate. I used new options and I did everything like in example in [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. But I get the error: {code:bash} [2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)[2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings. at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) at kafka.network.Processor.(SocketServer.scala:790) at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at scala.collection.AbstractIterable.foreach(Iterable.scala:920) at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.startup(SocketServer.scala:125) at kafka.server.KafkaServer.startup(KafkaServer.scala:303) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at kafka.Kafka$.main(Kafka.scala:82) at kafka.Kafka.main(Kafka.scala) {code} Java is used: {code:bash} openjdk version "1.8.0_272" OpenJDK Runtime Environment (build 1.8.0_272-b10) OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode) {code} OS is Centos 7.8.2003 _openssl x509 -in certificate.pem -text :_ {code:java} Certificate: ... Signature Algorithm: ecdsa-with-SHA384 ... Subject Public Key Info: Public Key Algorithm: id-ecPublicKey Public-Key: (256 bit) {code} was: I found that Kafka 2.7.0 supports PEM certificates and I decided to try setting up the broker with DigiCert SSL certificate. I used new options and I did everything like in example in [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. But I get the error: {code:bash} [2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)[2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings. at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) at kafka.network.Processor.(SocketServer.scala:790) at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at scala.collection.AbstractIterable.foreach(Iterable.scala:920) at
[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper
hachikuji commented on a change in pull request #9912: URL: https://github.com/apache/kafka/pull/9912#discussion_r561174514 ## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java ## @@ -175,4 +175,19 @@ public String clientId() { public int correlationId() { return header.correlationId(); } + +@Override +public String toString() { +return "RequestContext(" + +"header=" + header + +", connectionId='" + connectionId + '\'' + +", clientAddress=" + clientAddress + +", principal=" + principal + +", listenerName=" + listenerName + +", securityProtocol=" + securityProtocol + +", clientInformation=" + clientInformation + +", fromPrivilegedListener=" + fromPrivilegedListener + +", principalSerde=" + principalSerde + Review comment: Yeah, I considered it, but didn't see anything in here that looked sensitive. Most of this information is already included in the request logging. The client address is not, but that seems ok. Including the principal serde is annoying, but doesn't seem like a problem. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration
Alexey Kashavkin created KAFKA-12228: Summary: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration Key: KAFKA-12228 URL: https://issues.apache.org/jira/browse/KAFKA-12228 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.7.0 Reporter: Alexey Kashavkin Attachments: kafka.log I found that Kafka 2.7.0 supports PEM certificates and I decided to try setting up the broker with DigiCert SSL certificate. I used new options and I did everything like in example in [KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key]. But I get the error: {code:bash} [2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)[2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings. at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) at kafka.network.Processor.(SocketServer.scala:790) at kafka.network.SocketServer.newProcessor(SocketServer.scala:415) at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288) at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at scala.collection.AbstractIterable.foreach(Iterable.scala:920) at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.startup(SocketServer.scala:125) at kafka.server.KafkaServer.startup(KafkaServer.scala:303) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at kafka.Kafka$.main(Kafka.scala:82) at kafka.Kafka.main(Kafka.scala) {code} Java is used: {code:bash} openjdk version "1.8.0_272" OpenJDK Runtime Environment (build 1.8.0_272-b10) OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode) {code} OS is Centos 7.8.2003 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on pull request #9934: MINOR: Drop enable.metadata.quorum config
hachikuji commented on pull request #9934: URL: https://github.com/apache/kafka/pull/9934#issuecomment-763832322 @cmccabe Yeah, that's right. Let me see if I can improve this a little bit to make the intent clearer. We have some APIs which will be exposed by both the broker and the controller, so I think this should probably be a set rather than a flag. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #9934: MINOR: Drop enable.metadata.quorum config
cmccabe commented on pull request #9934: URL: https://github.com/apache/kafka/pull/9934#issuecomment-763827474 > @hachikuji Do we still need this PR since Colin already did one? That PR was for the KIP-500 branch, not for trunk. It also didn't refactor the "disabled" APIs concept, which I think is useful. > Replace the notion of "disabled" APIs with "controller-only" APIs. We previously marked some APIs which were intended only for the KIP-500 as "disabled" so that they would not be unintentionally exposed That seems reasonable. I guess the idea is that the broker will not include those APIs in its ApiVersionsResponse. However, the controller always will (not that clients will talk directly to the controller anyway...) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest
chia7712 commented on a change in pull request #9938: URL: https://github.com/apache/kafka/pull/9938#discussion_r561156309 ## File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala ## @@ -32,17 +32,55 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import scala.jdk.CollectionConverters._ +object DescribeAuthorizedOperationsTest { + val Group1 = "group1" Review comment: Is ```GROUP_1" more better? it is global variable in this test. ## File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala ## @@ -86,54 +122,36 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS closeSasl() } - val group1Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group1, PatternType.LITERAL), -new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW)) - - val group2Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group2, PatternType.LITERAL), -new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) - - val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group3, PatternType.LITERAL), -new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW)) - - val clusterAllAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL), -new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW)) - - val topic1Acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic1, PatternType.LITERAL), -new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW)) - - val topic2All = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL), -new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW)) - - def createConfig(): Properties = { + private def createConfig(): Properties = { val adminClientConfig = new Properties() adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2") val securityProps: util.Map[Object, Object] = TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) -securityProps.forEach { (key, value) => adminClientConfig.put(key.asInstanceOf[String], value) } +securityProps.forEach((key, value) => adminClientConfig.put(key.asInstanceOf[String], value)) Review comment: How about ```adminClientConfig.putAll(securityProps)```? ## File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala ## @@ -32,17 +32,55 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import scala.jdk.CollectionConverters._ +object DescribeAuthorizedOperationsTest { + val Group1 = "group1" + val Group2 = "group2" + val Group3 = "group3" + val Topic1 = "topic1" + val Topic2 = "topic2" + + val Group1Acl = new AclBinding( +new ResourcePattern(ResourceType.GROUP, Group1, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW)) + + val Group2Acl = new AclBinding( +new ResourcePattern(ResourceType.GROUP, Group2, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE, ALLOW)) + + val Group3Acl = new AclBinding( +new ResourcePattern(ResourceType.GROUP, Group3, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE, ALLOW)) + + val ClusterAllAcl = new AclBinding( +new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW)) + + val Topic1Acl = new AclBinding( +new ResourcePattern(ResourceType.TOPIC, Topic1, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW)) + + val Topic2All = new AclBinding( +new ResourcePattern(ResourceType.TOPIC, Topic2, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE, ALLOW)) + + private def accessControlEntry( +userName: String, +operation: AclOperation, +permissionType: AclPermissionType + ): AccessControlEntry = { +new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName).toString, + AclEntry.WildcardHost, operation, permissionType) + } +} + class
[GitHub] [kafka] hachikuji commented on a change in pull request #9934: MINOR: Drop enable.metadata.quorum config
hachikuji commented on a change in pull request #9934: URL: https://github.com/apache/kafka/pull/9934#discussion_r561157384 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -638,6 +637,9 @@ object KafkaConfig { val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC val ConnectionSetupTimeoutMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC val ConnectionSetupTimeoutMaxMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC + val ProcessRolesDoc = "[ALPHA] The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " + Review comment: I think I'm going to keep this as internal for now. Let's reconsider this after we have something that can actually start up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12227) Add method "Producer#produce" to return CompletionStage instead of Future
[ https://issues.apache.org/jira/browse/KAFKA-12227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-12227: --- Summary: Add method "Producer#produce" to return CompletionStage instead of Future (was: add new method "Producer#produce" which can return CompletionStage instead of Future) > Add method "Producer#produce" to return CompletionStage instead of Future > -- > > Key: KAFKA-12227 > URL: https://issues.apache.org/jira/browse/KAFKA-12227 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Priority: Major > > Producer and KafkaProducer return a java.util.concurrent.Future from their > send methods. This makes it challenging to write asynchronous non-blocking > code given Future's limited interface. Since Kafka now requires Java 8, we > now have the option of using CompletionStage and/or CompletableFuture that > were introduced to solve this issue. It's worth noting that the Kafka > AdminClient solved this issue by using org.apache.kafka.common.KafkaFuture as > Java 7 support was still required then. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXX%3A+Return+CompletableFuture+from+KafkaProducer.send -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12227) add new method "Producer#produce" which can return CompletionStage instead of Future
[ https://issues.apache.org/jira/browse/KAFKA-12227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-12227: --- Summary: add new method "Producer#produce" which can return CompletionStage instead of Future (was: Return CompletableFuture from KafkaProducer.send) > add new method "Producer#produce" which can return CompletionStage instead of > Future > > > Key: KAFKA-12227 > URL: https://issues.apache.org/jira/browse/KAFKA-12227 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Priority: Major > > Producer and KafkaProducer return a java.util.concurrent.Future from their > send methods. This makes it challenging to write asynchronous non-blocking > code given Future's limited interface. Since Kafka now requires Java 8, we > now have the option of using CompletionStage and/or CompletableFuture that > were introduced to solve this issue. It's worth noting that the Kafka > AdminClient solved this issue by using org.apache.kafka.common.KafkaFuture as > Java 7 support was still required then. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXX%3A+Return+CompletableFuture+from+KafkaProducer.send -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac opened a new pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest
dajac opened a new pull request #9938: URL: https://github.com/apache/kafka/pull/9938 I was looking at `DescribeAuthorizedOperationsTest` in the context of KIP-700. I have made few changes to improve the readability of the code/tests. The PR only moves code around. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12210) AdminClient should use DescribeCluster API when available
[ https://issues.apache.org/jira/browse/KAFKA-12210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-12210. - Fix Version/s: 2.8.0 Resolution: Fixed > AdminClient should use DescribeCluster API when available > - > > Key: KAFKA-12210 > URL: https://issues.apache.org/jira/browse/KAFKA-12210 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 2.8.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)