[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration

2021-01-20 Thread Alexey Kashavkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kashavkin updated KAFKA-12228:
-
Description: 
I found that Kafka 2.7.0 supports PEM certificates, so I tried setting 
up the broker with a DigiCert SSL certificate. I used the new options and 
followed the example in 
[KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key].
 However, the broker fails at startup with this error:
{code:bash}
[2021-01-20 17:54:55,787] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings.
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
    at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
    at kafka.network.Processor.<init>(SocketServer.scala:790)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:415)
    at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288)
    at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
    at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251)
    at kafka.network.SocketServer.startup(SocketServer.scala:125)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:303)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
{code}
Java version in use:
{code:bash}
openjdk version "1.8.0_272"
OpenJDK Runtime Environment (build 1.8.0_272-b10)
OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode)
{code}
OS: CentOS 7.8.2003

Output of _openssl x509 -in certificate.pem -text_:
{code:java}
Certificate:
...
Signature Algorithm: ecdsa-with-SHA384
...
Subject Public Key Info:
Public Key Algorithm: id-ecPublicKey
Public-Key: (256 bit)
{code}
Log is attached.
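
The report does not establish a root cause. Since the certificate carries an EC key and is signed with ECDSA, one diagnostic that might be worth running on the affected JVM is to list the ECDSA-authenticated cipher suites enabled by default. The following is a minimal sketch under that assumption; the class name `EcdsaSuiteCheck` and its helper are hypothetical and are not part of Kafka.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

// Hypothetical diagnostic helper, not part of Kafka.
class EcdsaSuiteCheck {

    // Returns the cipher suite names that use ECDSA authentication.
    // TLS 1.2 and earlier suite names encode the authentication algorithm,
    // e.g. TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
    static List<String> ecdsaSuites(String[] suites) {
        List<String> result = new ArrayList<>();
        for (String suite : suites) {
            if (suite.contains("_ECDSA_")) {
                result.add(suite);
            }
        }
        return result;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Cipher suites enabled by default for this JVM's default SSLContext.
        SSLParameters defaults = SSLContext.getDefault().getDefaultSSLParameters();
        List<String> enabled = ecdsaSuites(defaults.getCipherSuites());
        System.out.println("Enabled ECDSA suites: " + enabled.size());
        for (String suite : enabled) {
            System.out.println("  " + suite);
        }
    }
}
```

If the list comes back empty, the JVM could not negotiate a TLS 1.2 suite with an EC certificate, which would be consistent with the error above. A non-empty list would point the investigation elsewhere, for example at how the key and certificate chain were supplied.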


[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration

2021-01-20 Thread Alexey Kashavkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kashavkin updated KAFKA-12228:
-
Attachment: (was: kafka.log)

> Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common 
> for configuration
> ---
>
> Key: KAFKA-12228
> URL: https://issues.apache.org/jira/browse/KAFKA-12228
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: Alexey Kashavkin
>Priority: Major
> Attachments: kafka.log
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration

2021-01-20 Thread Alexey Kashavkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kashavkin updated KAFKA-12228:
-
Attachment: kafka.log



[jira] [Commented] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.

2021-01-20 Thread kaushik srinivas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269087#comment-17269087
 ] 

kaushik srinivas commented on KAFKA-12164:
--

Adding a few major concerns regarding the feedback about re-creating the 
corrupt directories upon restart.

The function syncWithHive() (DataWriter.java) is called at every restart or 
boot-up of the connector. It performs an initial audit of all the partition 
directories and tries to sync the HDFS folders with the Hive partitions 
before proceeding to consume records from Kafka.

Below is the relevant snippet.

 
{code:java}
List<String> partitions = hiveMetaStore.listPartitions(hiveDatabase, 
    topicTableMap.get(topic), (short) -1);
FileStatus[] statuses = FileUtils.getDirectories(storage, new Path(topicDir));
for (FileStatus status : statuses) {
  String location = status.getPath().toString();
  // Any directory not yet known to Hive is registered as a new partition.
  if (!partitions.contains(location)) {
    String partitionValue = getPartitionValue(location);
    hiveMetaStore.addPartition(hiveDatabase, topicTableMap.get(topic), 
        partitionValue);
  }
}
{code}
Going one step further into getDirectories > getDirectoriesImpl (from 
FileUtils), a path is returned as a partition path if:
a. the path is a directory, and
b. the path does not contain nested directories (checked by comparing the 
number of non-directory files with the total number of entries, directories 
plus non-directory files, in the path).

If both conditions are met, the path is added as a partition path.

So in the erroneous case, the actual path is supposed to look like
/test1=0/test2=0/xxx.parquet
but, due to a crash, looks like this instead:
/test1=0/

In this case /test1=0 satisfies both conditions above and is therefore 
returned as a new partition path to be added to Hive.
This update to Hive fails because the partition Hive expects is 
/test1=0/test2=0 and not /test1=0/.
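
The two conditions above can be modelled locally. The following is a simplified sketch that uses java.nio.file on a local temp directory instead of the Hadoop FileSystem API. The class `LeafDirectoryModel` is hypothetical and is not the connector's getDirectoriesImpl; it only illustrates why an empty, partially created directory passes the check.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical local model of the "leaf directory" check described above.
class LeafDirectoryModel {

    // Returns every directory at or below root that contains no subdirectories.
    static List<Path> leafDirectories(Path root) throws IOException {
        List<Path> leaves = new ArrayList<>();
        collect(root, leaves);
        return leaves;
    }

    private static void collect(Path dir, List<Path> leaves) throws IOException {
        int entries = 0;
        int files = 0;
        List<Path> subdirs = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                entries++;
                if (Files.isDirectory(entry)) {
                    subdirs.add(entry);
                } else {
                    files++;
                }
            }
        }
        // Condition (b): non-directory count equals total entry count.
        // An empty directory (0 == 0) also satisfies this.
        if (files == entries) {
            leaves.add(dir);
        }
        for (Path sub : subdirs) {
            collect(sub, leaves);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("topic-dir");
        Files.createDirectories(root.resolve("test1=0/test2=0"));
        Files.createFile(root.resolve("test1=0/test2=0/xxx.parquet"));
        Files.createDirectories(root.resolve("test1=1")); // left behind by a crash
        for (Path leaf : leafDirectories(root)) {
            System.out.println(root.relativize(leaf));
        }
    }
}
```

In this model both test1=0/test2=0 and the incomplete test1=1 are reported as partition paths, which mirrors the failure described above.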

 

This means that once there is a corrupt partition directory in HDFS, the 
syncWithHive() call will keep throwing Hive exceptions at every restart of 
the connector until the directory is corrected in HDFS. The stage of 
re-consuming the old (uncommitted) records, even assuming they are still 
present in Kafka after the restart, is therefore never reached: the connector 
stays in a crashed state indefinitely and requires manual cleanup.
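
One possible way to avoid the restart loop would be to skip directories whose depth does not match the number of partition fields before registering them with Hive. The sketch below is purely hypothetical: `PartitionDepthCheck` and the `expectedDepth` parameter are assumptions for illustration and do not exist in the connector.

```java
// Hypothetical guard, not part of kafka-connect-hdfs.
class PartitionDepthCheck {

    // Counts the "key=value" segments in a path relative to the topic directory.
    static int partitionDepth(String relativePath) {
        int depth = 0;
        for (String segment : relativePath.split("/")) {
            if (!segment.isEmpty() && segment.indexOf('=') > 0) {
                depth++;
            }
        }
        return depth;
    }

    // True only when the path has exactly as many partition levels as expected.
    static boolean isCompletePartition(String relativePath, int expectedDepth) {
        return partitionDepth(relativePath) == expectedDepth;
    }

    public static void main(String[] args) {
        int expectedDepth = 2; // assumed: two partition fields, test1 and test2
        System.out.println(isCompletePartition("/test1=0/test2=0", expectedDepth));
        System.out.println(isCompletePartition("/test1=0/", expectedDepth));
    }
}
```

With such a check, the incomplete /test1=0/ directory would be ignored or logged rather than passed to Hive. Whether that is acceptable depends on how the connector is expected to report partial state.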

-kaushik

 

> ssue when kafka connect worker pod restart, during creation of nested 
> partition directories in hdfs file system.
> 
>
> Key: KAFKA-12164
> URL: https://issues.apache.org/jira/browse/KAFKA-12164
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: kaushik srinivas
>Priority: Critical
>
> In our production labs, the following issue was observed. The sequence of 
> events is:
>  # The HDFS connector is added to the Connect worker.
>  # The HDFS connector creates folders in HDFS such as /test1=1/test2=2/ 
> based on a custom partitioner. Here test1 and test2 are two separate nested 
> directories derived from multiple fields in the record by the custom 
> partitioner.
>  # The Kafka Connect HDFS connector uses the following call to create the 
> directories in the HDFS file system:
> fs.mkdirs(new Path(filename));
> ref: 
> [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java]
> The important point is that if mkdirs() is a non-atomic operation (i.e. it 
> can result in partial execution if interrupted), then the Connect worker pod 
> may restart after the first directory (test1) has been created but before 
> test2 is created in HDFS. The HDFS file system is then left with partially 
> created partition folders from the restart window.
> So we might end up with the following state in HDFS:
> /test1=0/test2=0/
> /test1=1/
> /test1=2/test2=2
> /test1=3/test2=3
> Here the second partition is missing a directory. If Hive integration is 
> enabled, Hive metastore exceptions will occur because a partition that the 
> Hive table expects is missing in HDFS for some partitions.
> *This can happen to any connector that has a non-atomic operation in progress 
> when a restart of the Kafka Connect worker pod is triggered. It leaves the 
> system in a partially completed state and may prevent the connector from 
> continuing its operation*.
> *This is a very critical issue and needs attention on ways to handle it.*





[jira] [Comment Edited] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.

2021-01-20 Thread kaushik srinivas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269064#comment-17269064
 ] 

kaushik srinivas edited comment on KAFKA-12164 at 1/21/21, 6:31 AM:


Regarding the comment below:

"Now, by reading quickly the javadoc for FileSystem#mkdirs in HDFS I understand 
that a nested directory can be constructed from a given path, even if some of 
the parents exist. Which makes me think that a restart would allow the creation 
of the full path as long as the partitioner is deterministic with respect to 
the record (i.e. for a given record it always creates the same path). Upon a 
restart the export of a record will be retried, so the partitioner could 
possibly reattempt the full creation of the path. But that's an early 
assumption from my side. "

 

Let's suppose the partitioner derives the directory /test1/test2 for a given 
record, the connector crashes during directory creation, and HDFS ends up 
with only the /test1/ folder.

There is no guarantee that the Kafka record that was not fully committed by 
the connector will still exist in Kafka after the connector restarts or 
recovers. So there can be scenarios where these partial directories remain 
partial forever and require additional manual intervention to clean them up 
or complete their creation in HDFS; without that, we see issues in the Hive 
table partitions, if Hive integration is enabled.





[jira] [Commented] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.

2021-01-20 Thread kaushik srinivas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269061#comment-17269061
 ] 

kaushik srinivas commented on KAFKA-12164:
--

Hi [~kkonstantine]

We have also created a ticket on the HDFS sink connector side:

[https://github.com/confluentinc/kafka-connect-hdfs/issues/538]

It contains a more detailed analysis, but there have been no responses in 
that forum either.

-Kaushik



[jira] [Commented] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.

2021-01-20 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269056#comment-17269056
 ] 

Konstantine Karantasis commented on KAFKA-12164:


Thanks for reporting [~kaushik srinivas] 

First, I need to note that this issue seems to belong in its entirety to:

[https://github.com/confluentinc/kafka-connect-hdfs/issues] 

given that the behavior is not affected by the guarantees that the Kafka 
Connect framework provides. 

Briefly, let me state the obvious, that from a worker's perspective restarts 
(graceful or not) can happen at any time. 
Additionally the connector does not provide any guarantees with respect to the 
atomicity of the nested directory structure. Things become more challenging 
given that the partitioner as you mentioned is custom. 

Now, by reading quickly the javadoc for FileSystem#mkdirs in HDFS I understand 
that a nested directory can be constructed from a given path, even if some of 
the parents exist. Which makes me think that a restart would allow the creation 
of the full path as long as the partitioner is deterministic with respect to 
the record (i.e. for a given record it always creates the same path). Upon a 
restart the export of a record will be retried, so the partitioner could 
possibly reattempt the full creation of the path. But that's an early 
assumption from my side. 
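
The retry behaviour described above can be modelled locally. The sketch below uses java.nio.file.Files.createDirectories as a stand-in for FileSystem#mkdirs. This is an assumption made for illustration: it shows only that completing a partially created nested path is safe to repeat on a local file system, and says nothing about HDFS-specific semantics. The class `IdempotentMkdirs` is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical local model; Files.createDirectories stands in for FileSystem#mkdirs.
class IdempotentMkdirs {

    // Ensures the full nested path exists, whether or not some parents
    // were already created by an earlier, interrupted attempt.
    static Path ensurePartitionDir(Path root, String partitionPath) throws IOException {
        return Files.createDirectories(root.resolve(partitionPath));
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("hdfs-model");
        // Simulate a crash after only the first level was created.
        Files.createDirectory(root.resolve("test1=1"));
        // Retry with the same deterministic path: the missing level is added.
        Path full = ensurePartitionDir(root, "test1=1/test2=2");
        System.out.println(Files.isDirectory(full));
    }
}
```

The retry only helps if the same record is processed again and the partitioner maps it to the same path, which is the condition stated above.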

Shall we take the discussion to the appropriate forum? 
https://github.com/confluentinc/kafka-connect-hdfs/issues



[GitHub] [kafka] guozhangwang commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-20 Thread GitBox


guozhangwang commented on pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#issuecomment-764408620


   LGTM.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9819:
URL: https://github.com/apache/kafka/pull/9819#discussion_r561625246



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException {
 }
 
 @Override
-public void append(ByteBuffer buffer) throws IOException {
+public void append(BaseRecords records) throws IOException {
 if (frozen) {
 throw new IllegalStateException(
-String.format("Append is not supported. Snapshot is already 
frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+String.format("Append is not supported. Snapshot is 
already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
 );
 }
-
+ByteBuffer buffer;
+if (records instanceof MemoryRecords) {
+buffer = ((MemoryRecords) records).buffer();
+} else {

Review comment:
   I feel more inclined to raise an exception if we get a `BaseRecords` 
type that is not `MemoryRecords`. If we really get an unexpected file in here, 
then we need to reconsider the IO model instead of hiding a big copy. We could 
even make the expectation explicit in the parameter type even if it is not 100% 
symmetric with `RawSnapshotReader`.

##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1093,9 +1093,11 @@ public static final void readFully(InputStream 
inputStream, ByteBuffer destinati
 destinationBuffer.position(destinationBuffer.position() + 
totalBytesRead);
 }
 
-public static void writeFully(FileChannel channel, ByteBuffer 
sourceBuffer) throws IOException {
+public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) 
throws IOException {
+int size = 0;

Review comment:
   Hmm.. Not sure we need to compute this. Wouldn't it be the same as 
`sourceBuffer.remaining()`?
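
A rough sketch of the alternative suggested here, assuming `writeFully` drains the buffer in a loop: the caller can capture `remaining()` before the write instead of having `writeFully` accumulate a count. The class name `WriteFullySketch` and the helper `writeAndCount` are hypothetical and only illustrate the idea.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical sketch, not the code in Utils.java.
class WriteFullySketch {

    // Assumed shape of the void variant: loop until the buffer is drained.
    static void writeFully(FileChannel channel, ByteBuffer source) throws IOException {
        while (source.hasRemaining()) {
            channel.write(source);
        }
    }

    // The byte count is known up front, so the caller can record it itself.
    static int writeAndCount(FileChannel channel, ByteBuffer source) throws IOException {
        int size = source.remaining(); // bytes between position and limit
        writeFully(channel, source);
        return size;
    }
}
```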

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1220,34 +1216,35 @@ private FetchSnapshotResponseData 
handleFetchSnapshotRequest(
 if (!snapshotOpt.isPresent()) {
 return FetchSnapshotResponse.singleton(
 log.topicPartition(),
-responsePartitionSnapshot -> {
-return addQuorumLeader(responsePartitionSnapshot)
-.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
-}
+responsePartitionSnapshot -> 
addQuorumLeader(responsePartitionSnapshot)
+.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code())
 );
 }
 
 try (RawSnapshotReader snapshot = snapshotOpt.get()) {
 if (partitionSnapshot.position() < 0 || 
partitionSnapshot.position() >= snapshot.sizeInBytes()) {
 return FetchSnapshotResponse.singleton(
 log.topicPartition(),
-responsePartitionSnapshot -> {
-return addQuorumLeader(responsePartitionSnapshot)
-.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code());
-}
+responsePartitionSnapshot -> 
addQuorumLeader(responsePartitionSnapshot)
+.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code())
 );
 }
 
 int maxSnapshotSize;
+int maxSnapshotPosition;
 try {
 maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
 } catch (ArithmeticException e) {
 maxSnapshotSize = Integer.MAX_VALUE;
 }
 
-ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), 
maxSnapshotSize));
-snapshot.read(buffer, partitionSnapshot.position());
-buffer.flip();
+try {
+maxSnapshotPosition = 
Math.toIntExact(partitionSnapshot.position());
+} catch (ArithmeticException e) {
+maxSnapshotPosition = Integer.MAX_VALUE;

Review comment:
   I agree we should probably throw this. Snapshot size limits are an 
interesting point which I hadn't thought about. Currently `FileRecords` does 
not support files which are larger than Int.MaxValue. That gives us a 2GB 
limit. My feeling is that is probably good enough initially, but perhaps that 
adds some fuel for the effort to generalize the zero-copy support.
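
For illustration, throwing instead of clamping to `Integer.MAX_VALUE` could look roughly like the sketch below. `Math.toIntExact` is the real JDK method; the class `SnapshotPositionSketch`, the method `toIntPosition` and the choice of `IllegalArgumentException` are assumptions made for this example, not what the PR does.

```java
// Hypothetical sketch of "throw rather than clamp" for positions above the 2GB limit.
class SnapshotPositionSketch {

    static int toIntPosition(long position) {
        try {
            // Throws ArithmeticException when the value does not fit in an int.
            return Math.toIntExact(position);
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                "Snapshot position " + position + " exceeds Integer.MAX_VALUE", e);
        }
    }
}
```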









[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-20 Thread GitBox


dengziming commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-764402380


   @rajinisivaram I rebased the code and added a test in 
`PlaintextAdminIntegrationTest`, PTAL.







[jira] [Assigned] (KAFKA-12227) Add method "Producer#produce" to return CompletionStage instead of Future

2021-01-20 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-12227:
--

Assignee: Chia-Ping Tsai

>  Add method "Producer#produce" to return CompletionStage instead of Future
> --
>
> Key: KAFKA-12227
> URL: https://issues.apache.org/jira/browse/KAFKA-12227
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Producer and KafkaProducer return a java.util.concurrent.Future from their 
> send methods. This makes it challenging to write asynchronous non-blocking 
> code given Future's limited interface. Since Kafka now requires Java 8, we 
> now have the option of using CompletionStage and/or CompletableFuture that 
> were introduced to solve this issue. It's worth noting that the Kafka 
> AdminClient solved this issue by using org.apache.kafka.common.KafkaFuture as 
> Java 7 support was still required then.
>  
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXX%3A+Return+CompletableFuture+from+KafkaProducer.send
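
To make the motivation concrete, the sketch below shows how a callback-style send could be bridged to a `CompletionStage`. It is only an illustration: `CallbackSender` is a hypothetical stand-in interface and `produce` here is a free-standing helper, neither is the Kafka `Producer` API or the API proposed in the KIP.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

// Hypothetical sketch; none of these types belong to Kafka.
class CompletionStageSendSketch {

    // Stand-in for a send method that reports its outcome through a callback.
    interface CallbackSender<R, M> {
        void send(R record, BiConsumer<M, Exception> callback);
    }

    // Completes the stage from the callback, so callers can chain
    // non-blocking continuations instead of blocking on Future.get().
    static <R, M> CompletionStage<M> produce(CallbackSender<R, M> sender, R record) {
        CompletableFuture<M> result = new CompletableFuture<>();
        sender.send(record, (metadata, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(metadata);
            }
        });
        return result;
    }
}
```

A caller could then write something like `produce(sender, record).thenAccept(...)` rather than polling or blocking on a `Future`.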





[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-20 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {
+  if (shouldForwardRequest(request)) {
+forwardingManager.sendInterBrokerRequest(
+  getCreateTopicsRequest(Seq(internalTopicName)),
+  _ => ())
+  } else {
+val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+val topicConfigs = Map(internalTopicName -> 
getTopicConfigs(internalTopicName))
+adminManager.createTopics(
+  config.requestTimeoutMs,
+  validateOnly = false,
+  topicConfigs,
+  Map.empty,
+  controllerMutationQuota,
+  _ => ())
+  }
 }
-val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
-} else {
-  val coordinatorEndpoint = topicMetadata.partitions.asScala
-.find(_.partitionIndex == partition)
-.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-.flatMap(metadata => 
metadataCache.getAliveBroker(metadata.leaderId))
-.flatMap(_.getNode(request.context.listenerName))
-.filterNot(_.isEmpty)
-
-  coordinatorEndpoint match {
-case Some(endpoint) =>
-  createFindCoordinatorResponse(Errors.NONE, endpoint)
-case _ =>
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
+
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
createFindCoordinatorResponse(
+  Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+  } else {
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  val responseBody = if (topicMetadata.head.errorCode != 
Errors.NONE.code) {
+   

[GitHub] [kafka] chia7712 commented on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-20 Thread GitBox


chia7712 commented on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-764247507


   @tang7526 thanks for your patch. Could you fix the other doc errors as well?
   
   ```
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:42:
 warning: Could not find any member to link for "Serde".
 /**
 ^
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:24:
 warning: Could not find any member to link for 
"org.apache.kafka.streams.kstream.Joined".
 /**
 ^
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:24:
 warning: Could not find any member to link for "Serde".
 /**
 ^
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala:37:
 warning: Could not find any member to link for "Serde".
 /**
 ^
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala:24:
 warning: Could not find any member to link for "Serde".
 /**
 ^
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:68:
 warning: Could not find any member to link for "Topology.AutoOffsetReset".
 /**
 ^
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:55:
 warning: Could not find any member to link for "TimestampExtractor".
 /**
 ^
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:45:
 warning: Could not find any member to link for "Serde".
 /**
 ^
   
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala:157:
 warning: Could not find any member to link for 
"org.apache.kafka.streams.errors.TopologyException".
 /**
 ^
   44 warnings
   
   ```







[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

2021-01-20 Thread GitBox


feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r561589867



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File,
  leaderEpoch: Int,
  origin: AppendOrigin = AppendOrigin.Client,
  interBrokerProtocolVersion: ApiVersion = 
ApiVersion.latestVersion): LogAppendInfo = {
-append(records, origin, interBrokerProtocolVersion, assignOffsets = true, 
leaderEpoch, ignoreRecordSize = false)
+val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
+append(records, origin, interBrokerProtocolVersion, 
validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)

Review comment:
   Makes sense to me. It is also a little bit odd to me, but I put it here 
because I think `assignOffsets` == true for `appendAsLeader` and == false for 
`appendAsFollower`, which means `assignOffsets` is normally determined by the 
caller, and `RaftLeader` is just a special case of `appendAsLeader`. If we 
move the logic into `analyzeAndValidateRecords`, it would need to determine 
whether to `assignOffsets` without caller info. Is that doable? 









[GitHub] [kafka] chia7712 commented on pull request #9891: MINOR: Remind user index file is empty when dumping LogSegment index file

2021-01-20 Thread GitBox


chia7712 commented on pull request #9891:
URL: https://github.com/apache/kafka/pull/9891#issuecomment-764233284


   @17hao Thanks for your patch!







[GitHub] [kafka] chia7712 merged pull request #9891: MINOR: Remind user index file is empty when dumping LogSegment index file

2021-01-20 Thread GitBox


chia7712 merged pull request #9891:
URL: https://github.com/apache/kafka/pull/9891


   







[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

2021-01-20 Thread GitBox


feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r561587550



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -180,7 +179,7 @@ private void startNewBatch() {
 nextOffset,
 time.milliseconds(),
 false,
-RecordBatch.NO_PARTITION_LEADER_EPOCH,
+epoch,

Review comment:
   Let me check









[GitHub] [kafka] jolshan opened a new pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-01-20 Thread GitBox


jolshan opened a new pull request #9944:
URL: https://github.com/apache/kafka/pull/9944


   Most components are ready here. Topic IDs replace topic names in fetch 
requests.
   
   I need to look into FetchSession more. Currently 
testCreateIncrementalFetchWithPartitionsWithIdError() fails because 
FetchSession does not cache partitions that resulted in UNKNOWN_TOPIC_ID error. 
I need to figure out if this is something that should be done.
   
   I also plan to add a test to simulate a rolling upgrade where some brokers 
do not have topic IDs in metadata. I want to ensure that Consumers can still 
poll and get data.
   
   I will also run the various fetch benchmarks and compare the results to trunk
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-20 Thread GitBox


dengziming commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-764219186


   > https://github.com/apache/kafka/pull/9769#issuecomment-764014519
   
   @rajinisivaram @jolshan I also realized this. We add topicId to 
`MetadataCache` starting from IBP=2.8.1, so instead of using `config.usesTopicId` 
it would make sense to check `config.interBrokerProtocolVersion >= KAFKA_2_8_IV1`







[GitHub] [kafka] chia7712 merged pull request #9926: KAFKA-12175 Migrate generator module to junit5

2021-01-20 Thread GitBox


chia7712 merged pull request #9926:
URL: https://github.com/apache/kafka/pull/9926


   







[GitHub] [kafka] chia7712 commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5

2021-01-20 Thread GitBox


chia7712 commented on pull request #9926:
URL: https://github.com/apache/kafka/pull/9926#issuecomment-764212202


   > What have you seen that implies that? 
   
   The following error implies that recent fixes are not included.
   
   ```
   java.lang.AssertionError: Expected all streams instances in 
[org.apache.kafka.streams.KafkaStreams@2d6a0fff] to be REBALANCING within 3 
ms, but the following were not: 
{org.apache.kafka.streams.KafkaStreams@2d6a0fff=RUNNING}
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitForApplicationState$12(IntegrationTestUtils.java:936)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:318)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState(IntegrationTestUtils.java:919)
at 
org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect(AdjustStreamThreadCountTest.java:229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   ```
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9926/8/testReport/junit/org.apache.kafka.streams.integration/AdjustStreamThreadCountTest/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed







[GitHub] [kafka] hongshaoyang edited a comment on pull request #9943: MINOR: Fix typo in Utils.java

2021-01-20 Thread GitBox


hongshaoyang edited a comment on pull request #9943:
URL: https://github.com/apache/kafka/pull/9943#issuecomment-764209143


   Ping @guozhangwang 
   
   Original typo was added in 
https://github.com/apache/kafka/commit/3a9f4b833bb7e86dc759361c33f4321ab043db05 







[GitHub] [kafka] hongshaoyang commented on pull request #9943: MINOR: Fix typo in Utils.java

2021-01-20 Thread GitBox


hongshaoyang commented on pull request #9943:
URL: https://github.com/apache/kafka/pull/9943#issuecomment-764209143


   Ping @guozhangwang 
   
   Original typo was added in 
   
https://github.com/apache/kafka/commit/3a9f4b833bb7e86dc759361c33f4321ab043db05 







[GitHub] [kafka] hongshaoyang opened a new pull request #9943: MINOR: Fix typo in Update Utils.java

2021-01-20 Thread GitBox


hongshaoyang opened a new pull request #9943:
URL: https://github.com/apache/kafka/pull/9943


   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] showuon commented on pull request #9942: KAFKA-12229: reset to original class loader after connector stop

2021-01-20 Thread GitBox


showuon commented on pull request #9942:
URL: https://github.com/apache/kafka/pull/9942#issuecomment-764189012


   @chia7712 @ijuma @guozhangwang , please take a look at the PR. Thanks.







[GitHub] [kafka] showuon opened a new pull request #9942: KAFKA-12229: reset to original class loader after connector stop

2021-01-20 Thread GitBox


showuon opened a new pull request #9942:
URL: https://github.com/apache/kafka/pull/9942


   ```
   java.lang.NullPointerException at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopics(MirrorSourceConnector.java:348)
 at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:192)
 at 
org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst(MirrorSourceConnectorTest.java:222)
   ```
   After days of investigation, I finally found the root cause of the test 
failure: **class loader**.
   The issue is quite weird: we mocked the method, but the real method was 
still called, causing the NPE. Digging into Mockito, I found it is not about 
JUnit 5; it is because of the class loader. Mockito relies on the class loader 
to generate the proxy instance 
([source](https://github.com/mockito/mockito/blob/release/3.x/src/main/java/org/mockito/internal/creation/bytebuddy/SubclassBytecodeGenerator.java#L91)),
 and if the class loader is not the expected one, it generates the wrong proxy 
instance (with the wrong class path). We set the class loader during connector 
start to resolve conflicting dependencies (KIP-146), so we should set it back 
to the original class loader after connector stop in tests 
(`EmbeddedConnectCluster` is only used in tests) so that Mockito works as 
expected.
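
   The general save-and-restore pattern can be sketched as follows. This is only a minimal illustration that assumes the loader in question is the thread context class loader; `ContextClassLoaderSketch` and `withContextClassLoader` are hypothetical names and not the change made in this PR.

```java
import java.util.function.Supplier;

// Hypothetical sketch of swapping a class loader and always restoring the original.
class ContextClassLoaderSketch {

    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Restore even if the action throws, so later code (for example
            // mocking libraries) sees the loader it expects.
            current.setContextClassLoader(original);
        }
    }
}
```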
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] guozhangwang commented on a change in pull request #9840: KAFKA-10867: Improved task idling

2021-01-20 Thread GitBox


guozhangwang commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r561459260



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -78,15 +89,149 @@ RecordQueue queue() {
 }
 }
 
-PartitionGroup(final Map partitionQueues, 
final Sensor recordLatenessSensor) {
+PartitionGroup(final TaskId id,
+   final Map partitionQueues,
+   final Sensor recordLatenessSensor,
+   final Sensor enforcedProcessingSensor,
+   final long maxTaskIdleMs) {
+this.id = id;
 nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), 
Comparator.comparingLong(RecordQueue::headRecordTimestamp));
 this.partitionQueues = partitionQueues;
+this.enforcedProcessingSensor = enforcedProcessingSensor;
+this.maxTaskIdleMs = maxTaskIdleMs;
 this.recordLatenessSensor = recordLatenessSensor;
 totalBuffered = 0;
 allBuffered = false;
 streamTime = RecordQueue.UNKNOWN;
 }
 
+public void addFetchedMetadata(final TopicPartition partition, final 
ConsumerRecords.Metadata metadata) {
+final Long lag = metadata.lag();
+if (lag != null) {
+LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag);
+fetchedLags.put(partition, lag);
+}
+}
+
+public boolean readyToProcess(final long wallClockTime) {
+if (LOG.isTraceEnabled()) {
+for (final Map.Entry entry : 
partitionQueues.entrySet()) {
+LOG.trace(
+"[{}] buffered/lag {}: {}/{}",
+id,
+entry.getKey(),
+entry.getValue().size(),
+fetchedLags.get(entry.getKey())
+);
+}
+}
+// Log-level strategy:
+//  TRACE for messages that don't wait for fetches, since these may be 
logged at extremely high frequency
+//  DEBUG when we waited for a fetch and decided to wait some more, as 
configured
+//  DEBUG when we are ready for processing and didn't have to enforce 
processing
+//  INFO  when we enforce processing, since this has to wait for 
fetches AND may result in disorder
+
+if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
+final Set bufferedPartitions = new HashSet<>();
+final Set emptyPartitions = new HashSet<>();
+for (final Map.Entry entry : 
partitionQueues.entrySet()) {
+if (entry.getValue().isEmpty()) {
+emptyPartitions.add(entry.getKey());
+} else {
+bufferedPartitions.add(entry.getKey());
+}
+}
+LOG.trace("[{}] Ready for processing because max.task.idle.ms 
is disabled." +
+  "\n\tThere may be out-of-order processing for 
this task as a result." +
+  "\n\tBuffered partitions: {}" +
+  "\n\tNon-buffered partitions: {}",
+  id,
+  bufferedPartitions,
+  emptyPartitions);
+}
+return true;

Review comment:
   Should we log INFO if we are indeed enforcing processing? I.e. there are 
some empty partitions.

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -134,6 +134,8 @@
 @SuppressWarnings("deprecation")
 public class StreamsConfig extends AbstractConfig {
 
+public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

Review comment:
   nit: move this down below to 147?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
##
@@ -73,28 +82,22 @@
 private final byte[] recordKey = intSerializer.serialize(null, 1);
 
 private final Metrics metrics = new Metrics();
+private final Sensor enforcedProcessingSensor = 
metrics.sensor(UUID.randomUUID().toString());
 private final MetricName lastLatenessValue = new 
MetricName("record-lateness-last-value", "", "", mkMap());
 
-private PartitionGroup group;
 
 private static Sensor getValueSensor(final Metrics metrics, final 
MetricName metricName) {
 final Sensor lastRecordedValue = metrics.sensor(metricName.name());
 lastRecordedValue.add(metricName, new Value());
 return lastRecordedValue;
 }
 
-@Before

Review comment:
   Good refactoring!

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -52,15 +58,20 @@
  * (i.e., it increases or stays the same over time).
  */
 public class 

[GitHub] [kafka] showuon closed pull request #9936: [WIP] reset to default class loader

2021-01-20 Thread GitBox


showuon closed pull request #9936:
URL: https://github.com/apache/kafka/pull/9936


   







[jira] [Created] (KAFKA-12229) Fix flaky MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst test

2021-01-20 Thread Luke Chen (Jira)
Luke Chen created KAFKA-12229:
-

 Summary: Fix flaky 
MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst test
 Key: KAFKA-12229
 URL: https://issues.apache.org/jira/browse/KAFKA-12229
 Project: Kafka
  Issue Type: Test
Reporter: Luke Chen
Assignee: Luke Chen


h3. Stacktrace

java.lang.NullPointerException at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopics(MirrorSourceConnector.java:348)
 at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:192)
 at 
org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst(MirrorSourceConnectorTest.java:222)

 

Also happened in MirrorCheckpointConnectorTest.testFindConsumerGroups
h3. Stacktrace

java.lang.NullPointerException at 
org.apache.kafka.connect.mirror.MirrorCheckpointConnector.listConsumerGroups(MirrorCheckpointConnector.java:158)
 at 
org.apache.kafka.connect.mirror.MirrorCheckpointConnectorTest.testFindConsumerGroups(MirrorCheckpointConnectorTest.java:89)

 

https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk11/415/testReport/junit/org.apache.kafka.connect.mirror/MirrorSourceConnectorTest/testRefreshTopicPartitionsTopicOnTargetFirst__/





[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561451602



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
   In the case of forwarding, maybe we can let the controller decide if 
there are enough alive brokers.


[GitHub] [kafka] guozhangwang commented on a change in pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-20 Thread GitBox


guozhangwang commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r561449314



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2041,6 +2052,118 @@ public void testInvalidGroupMetadata() throws 
InterruptedException {
 assertThrows(IllegalStateException.class, consumer::groupMetadata);
 }
 
+@Test
+public void testPollMetadata() {
+final Time time = new MockTime();
+final SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+final ConsumerMetadata metadata = createMetadata(subscription);
+final MockClient client = new MockClient(time, metadata);
+
+initMetadata(client, singletonMap(topic, 1));
+final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+final KafkaConsumer consumer =
+newConsumer(time, client, subscription, metadata, assignor, true, 
groupInstanceId);
+
+consumer.assign(singleton(tp0));
+consumer.seek(tp0, 50L);
+
+final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
+client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo)));
+
+final ConsumerRecords records = 
consumer.poll(Duration.ofMillis(1));
+assertEquals(5, records.count());
+assertEquals(55L, consumer.position(tp0));
+
+// verify that the consumer computes the correct metadata based on the 
fetch response
+final ConsumerRecords.Metadata actualMetadata = 
records.metadata().get(tp0);
+assertEquals(100L, (long) actualMetadata.endOffset());
+assertEquals(55L, (long) actualMetadata.position());
+assertEquals(45L, (long) actualMetadata.lag());
+consumer.close(Duration.ZERO);
+}
+
+
+@Test
+public void testPollMetadataWithExtraPartitions() {

Review comment:
   Does the test cover the 1) stale epoch and 2) no previous value cases?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -637,20 +636,32 @@ private ListOffsetResult 
fetchOffsetsByTimes(Map timestamp
 } else {
 List> records = 
fetchRecords(nextInLineFetch, recordsRemaining);
 
-if (!records.isEmpty()) {
-TopicPartition partition = nextInLineFetch.partition;
-List> currentRecords = 
fetched.get(partition);
-if (currentRecords == null) {
-fetched.put(partition, records);
-} else {
-// this case shouldn't usually happen because we 
only send one fetch at a time per partition,
-// but it might conceivably happen in some rare 
cases (such as partition leader changes).
-// we have to copy to a new list because the old 
one may be immutable
-List> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());
-newRecords.addAll(currentRecords);
-newRecords.addAll(records);
-fetched.put(partition, newRecords);
+TopicPartition partition = nextInLineFetch.partition;
+
+if (subscriptions.isAssigned(partition)) {
+// initializeCompletedFetch, above, has already 
persisted the metadata from the fetch in the
+// SubscriptionState, so we can just read it out, 
which in particular lets us re-use the logic
+// for determining the end offset
+final long receivedTimestamp = 
nextInLineFetch.receivedTimestamp;
+final Long beginningOffset = 
subscriptions.logStartOffset(partition);
+final Long endOffset = 
subscriptions.logEndOffset(partition, isolationLevel);
+final FetchPosition fetchPosition = 
subscriptions.position(partition);
+
+final FetchedRecords.FetchMetadata fetchMetadata = 
fetched.metadata().get(partition);
+if (fetchMetadata == null
+|| 
!fetchMetadata.position().offsetEpoch.isPresent()
+|| fetchPosition.offsetEpoch.isPresent()
+&& fetchMetadata.position().offsetEpoch.get() <= 
fetchPosition.offsetEpoch.get()) {

Review comment:
   Interesting, why do we not want to update the metadata if the epoch is stale?
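
To make the question concrete, here is a minimal sketch of the condition in the hunk above with its implicit grouping written out (`&&` binds tighter than `||` in Java). The class and parameter names are hypothetical and are not part of `Fetcher`; `storedEpoch` stands in for `fetchMetadata.position().offsetEpoch` and `incomingEpoch` for `fetchPosition.offsetEpoch`.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; not Kafka code.
final class EpochCheckSketch {
    // Same shape as the condition in the diff, with explicit parentheses.
    // hasStored == false corresponds to `fetchMetadata == null`.
    static boolean shouldUpdate(boolean hasStored,
                                Optional<Integer> storedEpoch,
                                Optional<Integer> incomingEpoch) {
        return !hasStored
            || !storedEpoch.isPresent()
            || (incomingEpoch.isPresent() && storedEpoch.get() <= incomingEpoch.get());
    }
}
```

Under this reading, an incoming position with an older epoch than the stored one (or with no epoch at all, when the stored one has an epoch) leaves the stored metadata untouched.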


[GitHub] [kafka] mjsax commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-20 Thread GitBox


mjsax commented on a change in pull request #9940:
URL: https://github.com/apache/kafka/pull/9940#discussion_r561434718



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2647,8 +2647,7 @@ public void 
shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
 task01.setCommittableOffsetsAndMetadata(offsetsT01);
 final StateMachineTask task02 = new StateMachineTask(taskId02, 
taskId02Partitions, true);
 
-consumer.groupMetadata();
-expectLastCall().andReturn(null).anyTimes();
+expect(consumer.groupMetadata()).andStubReturn(null);

Review comment:
   Another simplification; make it a one-liner. Same below.









[GitHub] [kafka] mjsax commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-20 Thread GitBox


mjsax commented on a change in pull request #9940:
URL: https://github.com/apache/kafka/pull/9940#discussion_r561434387



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -190,7 +190,7 @@ public void 
shouldIdempotentlyUpdateSubscriptionFromActiveAssignment() {
 final TopicPartition newTopicPartition = new TopicPartition("topic2", 
1);
 final Map> assignment = 
mkMap(mkEntry(taskId01, mkSet(t1p1, newTopicPartition)));
 
-expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andReturn(emptyList()).anyTimes();
+expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(emptyList());

Review comment:
   @ableegoldman I just updated the whole test class...









[GitHub] [kafka] mjsax commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-20 Thread GitBox


mjsax commented on a change in pull request #9940:
URL: https://github.com/apache/kafka/pull/9940#discussion_r561429380



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2779,6 +2779,62 @@ public void suspend() {
 assertThat(task01.state(), is(Task.State.SUSPENDED));
 }
 
+@Test
+public void shouldConvertActiveTaskToStandbyTask() {
+final StreamTask activeTask = mock(StreamTask.class);
+expect(activeTask.id()).andReturn(taskId00).anyTimes();
+
expect(activeTask.inputPartitions()).andReturn(taskId00Partitions).anyTimes();

Review comment:
   Will fix right away -- Jenkins failed anyway and we need to rerun it.









[GitHub] [kafka] mjsax commented on pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-20 Thread GitBox


mjsax commented on pull request #9940:
URL: https://github.com/apache/kafka/pull/9940#issuecomment-764070908


   All three runs failed with different errors:
   
   JDK8:
   ```
   
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   java.util.concurrent.ExecutionException: 
org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
37.320395596193315 (600 connections / 16.077 sec) ==> expected: <30.0> but was: 
<37.320395596193315>
   ```
   
   JDK11:
   ```
   kafka.admin.FeatureCommandTest.testDescribeFeaturesSuccess()
   org.opentest4j.AssertionFailedError: expected:  but was: 
   ```
   
   JDK15:
   ```
   kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()
   org.opentest4j.AssertionFailedError: Partition [group1_largeTopic,5] 
metadata not propagated after 15000 ms
   
   // and
   
   kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()
   org.opentest4j.AssertionFailedError: Partition [group1_largeTopic,5] 
metadata not propagated after 15000 ms
   ```







[GitHub] [kafka] jsancio commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot

2021-01-20 Thread GitBox


jsancio commented on a change in pull request #9819:
URL: https://github.com/apache/kafka/pull/9819#discussion_r561427699



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException {
 }
 
 @Override
-public void append(ByteBuffer buffer) throws IOException {
+public void append(BaseRecords records) throws IOException {
 if (frozen) {
 throw new IllegalStateException(
-String.format("Append is not supported. Snapshot is already 
frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+String.format("Append is not supported. Snapshot is 
already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
 );
 }
-
+ByteBuffer buffer;
+if (records instanceof MemoryRecords) {
+buffer = ((MemoryRecords) records).buffer();
+} else {
+buffer = ByteBuffer.allocate(records.sizeInBytes());
+((FileRecords) records).channel().read(buffer);
+buffer.flip();
+}

Review comment:
   > I change the signature to keep consistent with FileRawSnapshotReader
   
   Okay. I think this is something that I struggled with when creating the 
original APIs. I am okay with "inconsistent" APIs since 
`RawSnapshot{Reader,Writer}` are internal interfaces to the raft client and are 
not exposed to the state machine (controller).
   
   I think this "inconsistency" will go away when we implement the long term 
solution.









[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

2021-01-20 Thread GitBox


guozhangwang commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764063387


   > Weird -- these changes seem to be causing the `SaslXConsumerTest` family 
of tests to hang. I'm not very (or at all) familiar with these tests so I 
haven't found anything yet but I'm actively looking into it
   
   Hmm... I'm not familiar with SaslXConsumerTest either...







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-20 Thread GitBox


wcarlson5 commented on a change in pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#discussion_r561415932



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -407,7 +413,7 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
 referenceContainer.nextScheduledRebalanceMs,
 shutdownErrorHook,
 streamsUncaughtExceptionHandler,
-cacheSize -> cache.resize(cacheSize)

Review comment:
   I think it caused a test to fail, but not every time. It also could have 
been fixed since then, as changes have been made. If all the tests pass, it's 
probably fine.









[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9912:
URL: https://github.com/apache/kafka/pull/9912#discussion_r561415958



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int,
 requestQueue.put(request)
   }
 
-  /** Send a response back to the socket server to be sent over the network */
-  def sendResponse(response: RequestChannel.Response): Unit = {
+  def closeConnection(
+request: RequestChannel.Request,
+errorCounts: java.util.Map[Errors, Integer]
+  ): Unit = {
+// This case is used when the request handler has encountered an error, 
but the client
+// does not expect a response (e.g. when produce request has acks set to 0)
+updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
+sendResponse(new RequestChannel.CloseConnectionResponse(request))
+  }
+
+  def sendResponse(
+request: RequestChannel.Request,
+response: AbstractResponse,
+onComplete: Option[Send => Unit]

Review comment:
   I decided not to do this here. I didn't like replacing `None` with `_ => 
{}` in uses, and neither did I like making the argument optional. The 
alternative is to introduce a constant "no-op" function, but I found this also 
a little awkward. If you think of a nice way to do it, I can review. I do think 
it is better having a simpler type.
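
For readers weighing the two shapes discussed here, a small Java sketch of the trade-off (the PR itself is Scala; Java is used only for illustration). All names are hypothetical, and `String` stands in for `Send`. One variant takes an optional callback, the other takes a plain function and relies on a shared no-op constant.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical illustration; not the RequestChannel API.
final class CallbackSketch {
    // The "constant no-op function" alternative mentioned in the comment.
    static final Consumer<String> NO_OP = send -> { };

    // Variant 1: the callback is optional, callers pass Optional.empty().
    static void sendWithOptional(String send, Optional<Consumer<String>> onComplete) {
        onComplete.ifPresent(callback -> callback.accept(send));
    }

    // Variant 2: simpler parameter type, callers pass NO_OP when they do not care.
    static void sendWithCallback(String send, Consumer<String> onComplete) {
        onComplete.accept(send);
    }
}
```

Variant 2 has the simpler type, at the cost of every uninterested caller having to name the no-op explicitly.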









[GitHub] [kafka] ableegoldman commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-20 Thread GitBox


ableegoldman commented on a change in pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#discussion_r561412170



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -407,7 +413,7 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
 referenceContainer.nextScheduledRebalanceMs,
 shutdownErrorHook,
 streamsUncaughtExceptionHandler,
-cacheSize -> cache.resize(cacheSize)

Review comment:
   Interesting. It *should* be exactly the same, but of course who knows 
with Java. Did it cause a test to fail or was it something more subtle?
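
One known difference between the two forms, sketched below under the assumption that the replacement was a bound method reference such as `cache::resize` (the hunk only shows the removed lambda, so this is a guess). A lambda reads `cache` each time it runs, while a bound method reference evaluates and null-checks `cache` at the point where the reference is created. The `Cache` class here is a hypothetical stand-in, and whether this explains the failure seen in the PR is not established.

```java
import java.util.function.Consumer;

// Hypothetical illustration; Cache is not the Kafka Streams cache class.
final class MethodRefSketch {
    static final class Cache {
        long size;
        void resize(long newSize) { size = newSize; }
    }

    static Cache cache; // deliberately left null until assigned

    // Lambda: `cache` is dereferenced only when the consumer is invoked.
    static Consumer<Long> viaLambda() {
        return cacheSize -> cache.resize(cacheSize);
    }

    // Method reference: `cache` is evaluated here, so a null value
    // fails immediately with a NullPointerException.
    static Consumer<Long> viaMethodRef() {
        return cache::resize;
    }
}
```

If `cache` is assigned after the callback is constructed, only the lambda form sees the later value.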









[GitHub] [kafka] wcarlson5 commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-20 Thread GitBox


wcarlson5 commented on pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#issuecomment-764054886


   Overall LGTM. I am not sure about the cache change, but the changes to the 
log make a lot of sense.







[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

2021-01-20 Thread GitBox


ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764053539


   Weird -- these changes seem to be causing the `SaslXConsumerTest` family of 
tests to hang. I'm not very (or at all) familiar with these tests so I haven't 
found anything yet but I'm actively looking into it 







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-20 Thread GitBox


wcarlson5 commented on a change in pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#discussion_r561409907



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -407,7 +413,7 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
 referenceContainer.nextScheduledRebalanceMs,
 shutdownErrorHook,
 streamsUncaughtExceptionHandler,
-cacheSize -> cache.resize(cacheSize)

Review comment:
   My IDE tried to optimize this as well. At the time, not passing in 
cacheSize caused some exceptions. I would be careful about making this change 
without need









[GitHub] [kafka] ableegoldman commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-20 Thread GitBox


ableegoldman commented on a change in pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#discussion_r561394151



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1053,7 +1054,7 @@ int maybeCommitActiveTasksPerUserRequested() {
 }
 
 private void commitOffsetsOrTransaction(final Map> offsetsPerTask) {
-log.debug("Committing task offsets {}", offsetsPerTask);
+log.debug("Committing task offsets {}", 
offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> 
t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects

Review comment:
   An unrelated but equally annoying thing I noticed in the logs: we should 
never log a full `Task` object because it prints literally everything about the 
task, including for example the topology description which is not that useful 
but sometimes VERY long
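
The re-keying in the changed line can be sketched in isolation as follows. `Task` here is a hypothetical stand-in with a verbose `toString()`, and `Long` stands in for the per-task offsets; only the `Collectors.toMap` pattern mirrors the diff.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration; Task is not the Kafka Streams Task interface.
final class TaskLoggingSketch {
    static final class Task {
        private final String id;
        Task(String id) { this.id = id; }
        String id() { return id; }
        @Override
        public String toString() {
            // Imagine the full topology description being appended here.
            return "Task{id=" + id + ", topology=<very long description>}";
        }
    }

    // Re-key the map by task id so only ids and values are formatted by the logger.
    static Map<String, Long> loggable(Map<Task, Long> offsetsPerTask) {
        return offsetsPerTask.entrySet().stream()
            .collect(Collectors.toMap(e -> e.getKey().id(), Map.Entry::getValue));
    }
}
```

Note that the stream runs even when the debug level is disabled, so this trades a little work per call for much shorter log lines.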









[GitHub] [kafka] ableegoldman commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-20 Thread GitBox


ableegoldman commented on pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#issuecomment-764036403


   call for review @lct45 @wcarlson5 @cadonna 







[GitHub] [kafka] ableegoldman opened a new pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-20 Thread GitBox


ableegoldman opened a new pull request #9941:
URL: https://github.com/apache/kafka/pull/9941


   One more try to get the logging levels right... the only way to log something 
within the main StreamThread loop without absolutely flooding the logs at a 
level that isn't appropriate for INFO is to just set some kind of interval to 
stick to. I chose to log the summary every 2 min, since this is long enough to 
prevent log spam but short enough to fit at least 2 summaries within the 
(default) poll interval of 5 min
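
A minimal sketch of the interval-gated summary described above. The class and method names are hypothetical and not taken from `StreamThread`; the caller passes in the current time so the gating is easy to exercise without a real clock.

```java
// Hypothetical illustration; not the StreamThread implementation.
final class SummaryLogSketch {
    private final long intervalMs;
    private long lastLogMs;
    private long processedSinceLastLog;

    SummaryLogSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.lastLogMs = nowMs;
    }

    // Accumulates counts on every loop iteration, but returns a summary line
    // only once per interval; returns null when nothing should be logged yet.
    String record(long processed, long nowMs) {
        processedSinceLastLog += processed;
        if (nowMs - lastLogMs < intervalMs) {
            return null;
        }
        String summary = "Processed " + processedSinceLastLog + " records since last summary";
        processedSinceLastLog = 0;
        lastLogMs = nowMs;
        return summary;
    }
}
```

With `intervalMs` set to 120000, at least two summaries fit inside a 5 minute window, which matches the reasoning given for the 2 min choice.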







[jira] [Updated] (KAFKA-10044) Deprecate ConsumerConfig#addDeserializerToConfig and ProducerConfig#addSerializerToConfig

2021-01-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10044:

Description: 
from [~ijuma] suggestion 
([https://github.com/apache/kafka/pull/8605#discussion_r430431086])
{quote}I think you could submit a KIP for the deprecation of the two methods in 
this class, but we can merge the other changes in the meantime.
{quote}
KIP-620: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118] 

  was:
from [~ijuma] suggestion 
(https://github.com/apache/kafka/pull/8605#discussion_r430431086)

{quote}
I think you could submit a KIP for the deprecation of the two methods in this 
class, but we can merge the other changes in the meantime.
{quote}


> Deprecate ConsumerConfig#addDeserializerToConfig and 
> ProducerConfig#addSerializerToConfig
> -
>
> Key: KAFKA-10044
> URL: https://issues.apache.org/jira/browse/KAFKA-10044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: kip
>
> from [~ijuma] suggestion 
> ([https://github.com/apache/kafka/pull/8605#discussion_r430431086])
> {quote}I think you could submit a KIP for the deprecation of the two methods 
> in this class, but we can merge the other changes in the meantime.
> {quote}
> KIP-620: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118] 





[jira] [Updated] (KAFKA-10044) Deprecate ConsumerConfig#addDeserializerToConfig and ProducerConfig#addSerializerToConfig

2021-01-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10044:

Labels: kip  (was: need-kip)

> Deprecate ConsumerConfig#addDeserializerToConfig and 
> ProducerConfig#addSerializerToConfig
> -
>
> Key: KAFKA-10044
> URL: https://issues.apache.org/jira/browse/KAFKA-10044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: kip
>
> from [~ijuma] suggestion 
> (https://github.com/apache/kafka/pull/8605#discussion_r430431086)
> {quote}
> I think you could submit a KIP for the deprecation of the two methods in this 
> class, but we can merge the other changes in the meantime.
> {quote}





[jira] [Updated] (KAFKA-10044) Deprecate ConsumerConfig#addDeserializerToConfig and ProducerConfig#addSerializerToConfig

2021-01-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10044:

Fix Version/s: 2.7.0

> Deprecate ConsumerConfig#addDeserializerToConfig and 
> ProducerConfig#addSerializerToConfig
> -
>
> Key: KAFKA-10044
> URL: https://issues.apache.org/jira/browse/KAFKA-10044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: kip
> Fix For: 2.7.0
>
>
> from [~ijuma] suggestion 
> ([https://github.com/apache/kafka/pull/8605#discussion_r430431086])
> {quote}I think you could submit a KIP for the deprecation of the two methods 
> in this class, but we can merge the other changes in the meantime.
> {quote}
> KIP-620: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=155749118] 





[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561376605



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -750,9 +748,9 @@ void start(int nodeId) {
 PersistentState persistentState = nodes.get(nodeId);
 MockNetworkChannel channel = new 
MockNetworkChannel(correlationIdCounter);
 MockMessageQueue messageQueue = new MockMessageQueue();
-List voterNodes = voterNodesFromIds(voters, 
Cluster::nodeAddress);
-RaftConfig raftConfig = buildRaftConfig(REQUEST_TIMEOUT_MS, 
RETRY_BACKOFF_MS, ELECTION_TIMEOUT_MS,
-ELECTION_JITTER_MS, FETCH_TIMEOUT_MS, LINGER_MS, 
voterNodes);
+Map voterNodes = 
voterNodesFromIds(voters, Cluster::nodeAddress);

Review comment:
   Done









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561375789



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##
@@ -16,36 +16,15 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.Node;
-
 import java.net.InetSocketAddress;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class RaftTestUtil {
-public static RaftConfig buildRaftConfig(
-int requestTimeoutMs,
-int retryBackoffMs,
-int electionTimeoutMs,
-int electionBackoffMs,
-int fetchTimeoutMs,
-int appendLingerMs,
-List voterNodes
-) {
-Map voterConnections = voterNodes.stream()
-.collect(Collectors.toMap(Node::id, node -> new 
InetSocketAddress(node.host(), node.port(;
-return new RaftConfig(voterConnections, requestTimeoutMs, 
retryBackoffMs, electionTimeoutMs, electionBackoffMs,
-fetchTimeoutMs, appendLingerMs);
-}
-
-public static List voterNodesFromIds(Set voterIds,
+public static Map 
voterNodesFromIds(Set voterIds,

Review comment:
   Removed in favor of inlined code 
   https://github.com/apache/kafka/pull/9916#discussion_r561375612









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561375612



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##
@@ -16,36 +16,15 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.Node;
-
 import java.net.InetSocketAddress;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class RaftTestUtil {
-public static RaftConfig buildRaftConfig(
-int requestTimeoutMs,
-int retryBackoffMs,
-int electionTimeoutMs,
-int electionBackoffMs,
-int fetchTimeoutMs,
-int appendLingerMs,
-List voterNodes
-) {
-Map voterConnections = voterNodes.stream()
-.collect(Collectors.toMap(Node::id, node -> new 
InetSocketAddress(node.host(), node.port())));
-return new RaftConfig(voterConnections, requestTimeoutMs, 
retryBackoffMs, electionTimeoutMs, electionBackoffMs,
-fetchTimeoutMs, appendLingerMs);
-}
-
-public static List voterNodesFromIds(Set voterIds,
+public static Map 
voterNodesFromIds(Set voterIds,
Function voterAddressGenerator) {
-return voterIds.stream().map(voterId -> {
-InetSocketAddress voterAddress = 
voterAddressGenerator.apply(voterId);
-return new Node(voterId, voterAddress.getHostName(), 
voterAddress.getPort());
-}).collect(Collectors.toList());
+return voterIds.stream().collect(Collectors.toMap(id -> id, 
voterAddressGenerator));

Review comment:
   I'm up for removing any extra files when we don't need them. Aaand, 
it's gone.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-20 Thread GitBox


ableegoldman commented on a change in pull request #9940:
URL: https://github.com/apache/kafka/pull/9940#discussion_r561368977



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2779,6 +2779,62 @@ public void suspend() {
 assertThat(task01.state(), is(Task.State.SUSPENDED));
 }
 
+@Test
+public void shouldConvertActiveTaskToStandbyTask() {
+final StreamTask activeTask = mock(StreamTask.class);
+expect(activeTask.id()).andReturn(taskId00).anyTimes();
+
expect(activeTask.inputPartitions()).andReturn(taskId00Partitions).anyTimes();

Review comment:
   nit: use `andStubReturn` instead of `andReturn().anyTimes()`. No need to 
change this now; I don't want to block the fix, this is just an FYI for future PRs.









[jira] [Commented] (KAFKA-10848) Allow fine grained control over cross-partition processing order

2021-01-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268915#comment-17268915
 ] 

Matthias J. Sax commented on KAFKA-10848:
-

Thanks [~guozhang] – the idea of this ticket was along the same lines as your 
proposal. How we design the interface would be something to discuss on the KIP. 
Samza for example uses a quite different interface: 
[https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html]
 – Don't know which API would be favorable.

> Allow fine grained control over cross-partition processing order
> 
>
> Key: KAFKA-10848
> URL: https://issues.apache.org/jira/browse/KAFKA-10848
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, KafkaStreams implements a hard-coded timestamp based strategy to 
> pick the next record to process for a task, given that a task has multiple 
> partitions.
> In general, this strategy works well for the DSL, but for PAPI users, there 
> might be cases when the strategy should be customized. And even for the DSL, 
> there is one corner case (for a stream-table join) for which the table-side 
> record should be processed first if two records have the same timestamp (at 
> least, this gap exists as long as we don't have multi-version KTables), while 
> we cannot enforce this behavior because at runtime we don't know anything 
> about KStream vs KTable or an existing downstream join.
> Thus, we might want to allow users to plug in a custom strategy to pick the 
> next record for processing.
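
As a rough illustration of the kind of pluggable strategy the ticket describes, the sketch below picks the head record with the lowest timestamp and, on a tie, prefers topics the user has marked as table-side. Everything in it is an assumption made for illustration: `RecordChooser`, `HeadRecord`, and `tableFirstOnTies` are hypothetical names, not existing Kafka Streams APIs, and the real interface would be decided in the KIP.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class RecordChooserSketch {
    // Hypothetical view of the record at the head of one partition's buffer.
    static final class HeadRecord {
        final String topic;
        final int partition;
        final long timestamp;

        HeadRecord(String topic, int partition, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical plug-in point; NOT an existing Kafka Streams interface.
    interface RecordChooser {
        Optional<HeadRecord> choose(List<HeadRecord> heads);
    }

    // Lowest timestamp wins; on equal timestamps, records from topics the
    // user listed as table-side are chosen before stream-side records.
    static RecordChooser tableFirstOnTies(Set<String> tableTopics) {
        Comparator<HeadRecord> byTimestamp =
            Comparator.comparingLong(r -> r.timestamp);
        Comparator<HeadRecord> tableFirst =
            Comparator.comparingInt(r -> tableTopics.contains(r.topic) ? 0 : 1);
        return heads -> heads.stream().min(byTimestamp.thenComparing(tableFirst));
    }
}
```

Because the runtime does not know which partitions feed a KTable, the sketch leaves that knowledge with the user, who passes the table-side topic names in explicitly.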



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561344761



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -750,9 +748,9 @@ void start(int nodeId) {
 PersistentState persistentState = nodes.get(nodeId);
 MockNetworkChannel channel = new 
MockNetworkChannel(correlationIdCounter);
 MockMessageQueue messageQueue = new MockMessageQueue();
-List voterNodes = voterNodesFromIds(voters, 
Cluster::nodeAddress);
-RaftConfig raftConfig = buildRaftConfig(REQUEST_TIMEOUT_MS, 
RETRY_BACKOFF_MS, ELECTION_TIMEOUT_MS,
-ELECTION_JITTER_MS, FETCH_TIMEOUT_MS, LINGER_MS, 
voterNodes);
+Map voterNodes = 
voterNodesFromIds(voters, Cluster::nodeAddress);

Review comment:
   nit: `voterNodes` -> `voterAddresses` or `voterAddressMap`?

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##
@@ -16,36 +16,15 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.Node;
-
 import java.net.InetSocketAddress;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class RaftTestUtil {
-public static RaftConfig buildRaftConfig(
-int requestTimeoutMs,
-int retryBackoffMs,
-int electionTimeoutMs,
-int electionBackoffMs,
-int fetchTimeoutMs,
-int appendLingerMs,
-List voterNodes
-) {
-Map voterConnections = voterNodes.stream()
-.collect(Collectors.toMap(Node::id, node -> new 
InetSocketAddress(node.host(), node.port())));
-return new RaftConfig(voterConnections, requestTimeoutMs, 
retryBackoffMs, electionTimeoutMs, electionBackoffMs,
-fetchTimeoutMs, appendLingerMs);
-}
-
-public static List voterNodesFromIds(Set voterIds,
+public static Map 
voterNodesFromIds(Set voterIds,

Review comment:
   nit: maybe `buildAddressMap` or something like that?

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##
@@ -16,36 +16,15 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.Node;
-
 import java.net.InetSocketAddress;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class RaftTestUtil {
-public static RaftConfig buildRaftConfig(
-int requestTimeoutMs,
-int retryBackoffMs,
-int electionTimeoutMs,
-int electionBackoffMs,
-int fetchTimeoutMs,
-int appendLingerMs,
-List voterNodes
-) {
-Map voterConnections = voterNodes.stream()
-.collect(Collectors.toMap(Node::id, node -> new 
InetSocketAddress(node.host(), node.port())));
-return new RaftConfig(voterConnections, requestTimeoutMs, 
retryBackoffMs, electionTimeoutMs, electionBackoffMs,
-fetchTimeoutMs, appendLingerMs);
-}
-
-public static List voterNodesFromIds(Set voterIds,
+public static Map 
voterNodesFromIds(Set voterIds,
Function voterAddressGenerator) {
-return voterIds.stream().map(voterId -> {
-InetSocketAddress voterAddress = 
voterAddressGenerator.apply(voterId);
-return new Node(voterId, voterAddress.getHostName(), 
voterAddress.getPort());
-}).collect(Collectors.toList());
+return voterIds.stream().collect(Collectors.toMap(id -> id, 
voterAddressGenerator));

Review comment:
   Guess we can keep it, but this helper doesn't seem to be doing much for 
us anymore.
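
For reference, a minimal standalone sketch of what the helper reduces to after this change: a single `Collectors.toMap` call from voter id to address. The class name, the `buildAddressMap` name (taken from the naming nit above), and the `nodeAddress` port scheme are assumptions for illustration, not the code in the PR.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class VoterAddressMapSketch {
    // Maps each voter id to the address produced by the generator function.
    static Map<Integer, InetSocketAddress> buildAddressMap(
            Set<Integer> voterIds,
            Function<Integer, InetSocketAddress> voterAddressGenerator) {
        return voterIds.stream()
            .collect(Collectors.toMap(Function.identity(), voterAddressGenerator));
    }

    // Assumed address scheme, for illustration only: localhost, port 9990 + id.
    // createUnresolved avoids a DNS lookup when the address is built.
    static InetSocketAddress nodeAddress(int id) {
        return InetSocketAddress.createUnresolved("localhost", 9990 + id);
    }

    public static void main(String[] args) {
        Map<Integer, InetSocketAddress> addresses =
            buildAddressMap(Set.of(1, 2, 3), VoterAddressMapSketch::nodeAddress);
        addresses.forEach((id, address) -> System.out.println(id + " -> " + address));
    }
}
```

With the body down to one expression, inlining it at the call sites is a reasonable alternative to keeping the helper.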









[GitHub] [kafka] jolshan commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-20 Thread GitBox


jolshan commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-764014519


   @dengziming I realized that for this, it really depends on the IBP of the 
controller. (That is, we need UpdateMetadata to send topic IDs to all the 
brokers). So maybe instead of checking IBP it would make sense to check if the 
MetadataCache does not have any topic IDs. What do you think?







[jira] [Commented] (KAFKA-12190) Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions

2021-01-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268914#comment-17268914
 ] 

Matthias J. Sax commented on KAFKA-12190:
-

cc [~ijuma] [~guozhang] – what do you think? Should we do a `2.6.2` release 
quickly because of this bug?

> Failure on Windows due to an UnsupportedOperationException when 
> StateDirectory sets file permissions
> 
>
> Key: KAFKA-12190
> URL: https://issues.apache.org/jira/browse/KAFKA-12190
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1, 2.7.1
>Reporter: Andy Wilkinson
>Priority: Critical
>  Labels: bug
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> There appears to be a regression in Kafka 2.6.1 due to [the 
> changes|https://github.com/apache/kafka/pull/9583] made for KAFKA-10705 that 
> causes a failure on Windows. After upgrading to 2.6.1 from 2.6.0, we're 
> seeing failures in Spring Boot's CI on Windows such as the following:
> {noformat}
> Caused by: java.lang.UnsupportedOperationException: (No message provided)
> at java.nio.file.Files.setPosixFilePermissions(Files.java:2044)
> at 
> org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:115)
> 
> at 
> org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:745)
> at 
> org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:585)
> at 
> org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
> 
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
> 
> at 
> org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940)
> 
> at 
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.configureContext(AbstractApplicationContextRunner.java:447)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAndLoadContext(AbstractApplicationContextRunner.java:423)
> 
> at 
> org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.getContextOrStartupFailure(AssertProviderApplicationContextInvocationHandler.java:61)
> 
> at 
> org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.<init>(AssertProviderApplicationContextInvocationHandler.java:48)
> 
> at 
> org.springframework.boot.test.context.assertj.ApplicationContextAssertProvider.get(ApplicationContextAssertProvider.java:112)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAssertableContext(AbstractApplicationContextRunner.java:412)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$null$0(AbstractApplicationContextRunner.java:382)
> 
> at 
> org.springframework.boot.test.util.TestPropertyValues.applyToSystemProperties(TestPropertyValues.java:175)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$run$1(AbstractApplicationContextRunner.java:381)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.withContextClassLoader(AbstractApplicationContextRunner.java:392)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.run(AbstractApplicationContextRunner.java:381)
> 
> at 
> org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfigurationTests.whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff(KafkaMetricsAutoConfigurationTests.java:92)
> {noformat}
> The same code worked without changes using Kafka 2.6.0.
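
The trace shows `Files.setPosixFilePermissions` throwing on a file system without POSIX attribute support. One defensive pattern, sketched below, is to check the file system's supported attribute views before making the call. This is an illustration only and not necessarily the approach taken in the actual fix; the class name, method name, and the `rwx------` permission string are assumptions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

class StateDirPermissionsSketch {
    // Restricts the directory to its owner when the file system supports
    // POSIX permissions. Returns false (and changes nothing) otherwise,
    // which is what happens on a default Windows file system.
    static boolean restrictToOwner(Path dir) throws IOException {
        boolean posixSupported =
            dir.getFileSystem().supportedFileAttributeViews().contains("posix");
        if (!posixSupported) {
            return false;
        }
        // Assumed permission set for illustration: owner read/write/execute only.
        Set<PosixFilePermission> ownerOnly = PosixFilePermissions.fromString("rwx------");
        Files.setPosixFilePermissions(dir, ownerOnly);
        return true;
    }
}
```

A caller could log a warning when the method returns false, so that users on non-POSIX platforms know the permissions were left unchanged.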





[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key

2021-01-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268911#comment-17268911
 ] 

Matthias J. Sax commented on KAFKA-12213:
-

I am also happy to break this into smaller chunks... whatever we think works best or 
whatever [~MonCalamari] is interested in working on.

> Kafka Streams aggregation Initializer to accept record key
> --
>
> Key: KAFKA-12213
> URL: https://issues.apache.org/jira/browse/KAFKA-12213
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Piotr Fras
>Assignee: Piotr Fras
>Priority: Minor
>  Labels: needs-kip
>
> Sometimes the Kafka record key contains useful information for creating a zero 
> object in the aggregation Initializer. This feature adds the Kafka record key to 
> Initializer.
> There were two approaches I considered to implement this feature: one 
> respecting backwards compatibility for internal and external APIs and the 
> other one not. I chose the latter as it was more straightforward. 
> We may want to validate this approach though.
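
For discussion, here is a small sketch of the backwards-compatible direction, in which a key-aware initializer is added alongside the existing no-argument one. The local `Initializer` interface only mirrors the shape of the existing `Initializer#apply()`; `KeyedInitializer`, `from`, and the toy `aggregate` loop are hypothetical and do not reflect the reporter's patch or the real Streams internals.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class KeyedInitializerSketch {
    // Local stand-in mirroring the shape of the existing no-argument initializer.
    interface Initializer<VA> {
        VA apply();
    }

    // Hypothetical key-aware variant; the name and shape are assumptions.
    interface KeyedInitializer<K, VA> {
        VA apply(K key);

        // Adapter so existing no-argument initializers keep working.
        static <K, VA> KeyedInitializer<K, VA> from(Initializer<VA> initializer) {
            return key -> initializer.apply();
        }
    }

    // Toy aggregation loop: the initializer sees the key the first time
    // that key appears, then the adder folds each value into the aggregate.
    static <K, V, VA> Map<K, VA> aggregate(
            List<Map.Entry<K, V>> records,
            KeyedInitializer<K, VA> initializer,
            BiFunction<VA, V, VA> adder) {
        Map<K, VA> result = new HashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            VA current = result.containsKey(key) ? result.get(key) : initializer.apply(key);
            result.put(key, adder.apply(current, record.getValue()));
        }
        return result;
    }
}
```

The adapter shows why the compatible route is feasible: every existing initializer can be wrapped without changing user code, at the cost of one extra interface.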





[jira] [Commented] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2021-01-20 Thread Charlie Johnston (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268908#comment-17268908
 ] 

Charlie Johnston commented on KAFKA-9076:
-

Was this feature intended to be one-directional only, or is this just a subset 
of the intended functionality? Using this with an MM2 configuration and two active 
sites, I find that the site-prefixed topics are not included in any 
offset mirroring; only the non-site-prefixed (original) topics are 
mirrored. Is it not common with MM2 to have consumers 
reading from multiple topics (one topic per site) in order to consume from all sites? Apologies 
if there is any confusion or a missing setting on my end; we dug through the code 
a bit and couldn't find any sign that site-prefixed topics are also 
mirrored. Thanks!

> MirrorMaker 2.0 automated consumer offset sync
> --
>
> Key: KAFKA-9076
> URL: https://issues.apache.org/jira/browse/KAFKA-9076
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: mirrormaker, pull-request-available
> Fix For: 2.7.0
>
>
> To calculate the translated consumer offset in the target cluster, 
> `Mirror-client` currently provides a function called "remoteConsumerOffsets()" that is 
> used by "RemoteClusterUtils" for one-time use.
> To let consumer and stream applications migrate from the source to the 
> target cluster transparently and conveniently, e.g. in the event of a source 
> cluster failure, a background job is proposed to periodically sync the 
> consumer offsets from the source to the target cluster, so that when the consumer 
> and stream applications switch to the target cluster, they resume 
> consuming from where they left off at the source cluster.
>  KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> [https://github.com/apache/kafka/pull/7577]





[jira] [Resolved] (KAFKA-10869) Gate topic IDs behind IBP 2.8

2021-01-20 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-10869.

Resolution: Fixed

> Gate topic IDs behind IBP 2.8
> -
>
> Key: KAFKA-10869
> URL: https://issues.apache.org/jira/browse/KAFKA-10869
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 2.8.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> We want to do this so we don't lose topic IDs upon downgrades. If we 
> downgrade and write to topic node in ZK, the topic ID will be lost. If the 
> broker is upgraded again later, the topic IDs may not match.





[GitHub] [kafka] rajinisivaram commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-20 Thread GitBox


rajinisivaram commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-763996884


   @dengziming https://github.com/apache/kafka/pull/9814 has been merged, so 
this needs rebasing and the check for IBP. Thanks.







[GitHub] [kafka] rajinisivaram merged pull request #9814: KAFKA-10869: Gate topic IDs behind IBP 2.8

2021-01-20 Thread GitBox


rajinisivaram merged pull request #9814:
URL: https://github.com/apache/kafka/pull/9814


   







[GitHub] [kafka] rajinisivaram commented on pull request #9814: KAFKA-10869: Gate topic IDs behind IBP 2.8

2021-01-20 Thread GitBox


rajinisivaram commented on pull request #9814:
URL: https://github.com/apache/kafka/pull/9814#issuecomment-763993768


   @jolshan Thanks for running the system tests, merging to trunk.







[GitHub] [kafka] mjsax commented on a change in pull request #9835: MINOR: Add 'task container' class to KafkaStreams TaskManager

2021-01-20 Thread GitBox


mjsax commented on a change in pull request #9835:
URL: https://github.com/apache/kafka/pull/9835#discussion_r561342858



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+class Tasks {
+private final Logger log;
+private final InternalTopologyBuilder builder;
+private final StreamsMetricsImpl streamsMetrics;
+
+private final Map allTasksPerId = new TreeMap<>();
+private final Map readOnlyTasksPerId = 
Collections.unmodifiableMap(allTasksPerId);
+private final Collection readOnlyTasks = 
Collections.unmodifiableCollection(allTasksPerId.values());
+
+// TODO: change type to `StreamTask`
+private final Map activeTasksPerId = new TreeMap<>();
+// TODO: change type to `StreamTask`
+private final Map activeTasksPerPartition = new 
HashMap<>();
+// TODO: change type to `StreamTask`
+private final Map readOnlyActiveTasksPerId = 
Collections.unmodifiableMap(activeTasksPerId);
+private final Set readOnlyActiveTaskIds = 
Collections.unmodifiableSet(activeTasksPerId.keySet());
+// TODO: change type to `StreamTask`
+private final Collection readOnlyActiveTasks = 
Collections.unmodifiableCollection(activeTasksPerId.values());
+
+// TODO: change type to `StandbyTask`
+private final Map standbyTasksPerId = new TreeMap<>();
+// TODO: change type to `StandbyTask`
+private final Map readOnlyStandbyTasksPerId = 
Collections.unmodifiableMap(standbyTasksPerId);
+private final Set readOnlyStandbyTaskIds = 
Collections.unmodifiableSet(standbyTasksPerId.keySet());
+
+private final ActiveTaskCreator activeTaskCreator;
+private final StandbyTaskCreator standbyTaskCreator;
+
+private Consumer mainConsumer;
+
+Tasks(final String logPrefix,
+  final InternalTopologyBuilder builder,
+  final StreamsMetricsImpl streamsMetrics,
+  final ActiveTaskCreator activeTaskCreator,
+  final StandbyTaskCreator standbyTaskCreator) {
+
+final LogContext logContext = new LogContext(logPrefix);
+log = logContext.logger(getClass());
+
+this.builder = builder;
+this.streamsMetrics = streamsMetrics;
+this.activeTaskCreator = activeTaskCreator;
+this.standbyTaskCreator = standbyTaskCreator;
+}
+
+void setMainConsumer(final Consumer mainConsumer) {
+this.mainConsumer = mainConsumer;
+}
+
+void createTasks(final Map> 
activeTasksToCreate,
+ final Map> 
standbyTasksToCreate) {
+for (final Map.Entry> taskToBeCreated : 
activeTasksToCreate.entrySet()) {
+final TaskId taskId = taskToBeCreated.getKey();
+
+if (activeTasksPerId.containsKey(taskId)) {
+throw new IllegalStateException("Attempted to create an active 
task that we already own: " + taskId);
+}
+}
+
+for (final Map.Entry> taskToBeCreated : 
standbyTasksToCreate.entrySet()) {
+final TaskId taskId = taskToBeCreated.getKey();
+
+if (standbyTasksPerId.containsKey(taskId)) {
+throw new IllegalStateException("Attempted to create a standby 
task that we already own: " + taskId);
+}
+}
+
+// keep this check to simplify testing (ie, no need to mock 
`activeTaskCreator`)
+if (!activeTasksToCreate.isEmpty()) {
+// TODO: change type to `StreamTask`
+for (final Task activeTask : 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9835: MINOR: Add 'task container' class to KafkaStreams TaskManager

2021-01-20 Thread GitBox


ableegoldman commented on a change in pull request #9835:
URL: https://github.com/apache/kafka/pull/9835#discussion_r561340839



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+class Tasks {
+private final Logger log;
+private final InternalTopologyBuilder builder;
+private final StreamsMetricsImpl streamsMetrics;
+
+private final Map allTasksPerId = new TreeMap<>();
+private final Map readOnlyTasksPerId = 
Collections.unmodifiableMap(allTasksPerId);
+private final Collection readOnlyTasks = 
Collections.unmodifiableCollection(allTasksPerId.values());
+
+// TODO: change type to `StreamTask`
+private final Map activeTasksPerId = new TreeMap<>();
+// TODO: change type to `StreamTask`
+private final Map activeTasksPerPartition = new 
HashMap<>();
+// TODO: change type to `StreamTask`
+private final Map readOnlyActiveTasksPerId = 
Collections.unmodifiableMap(activeTasksPerId);
+private final Set readOnlyActiveTaskIds = 
Collections.unmodifiableSet(activeTasksPerId.keySet());
+// TODO: change type to `StreamTask`
+private final Collection readOnlyActiveTasks = 
Collections.unmodifiableCollection(activeTasksPerId.values());
+
+// TODO: change type to `StandbyTask`
+private final Map standbyTasksPerId = new TreeMap<>();
+// TODO: change type to `StandbyTask`
+private final Map readOnlyStandbyTasksPerId = 
Collections.unmodifiableMap(standbyTasksPerId);
+private final Set readOnlyStandbyTaskIds = 
Collections.unmodifiableSet(standbyTasksPerId.keySet());
+
+private final ActiveTaskCreator activeTaskCreator;
+private final StandbyTaskCreator standbyTaskCreator;
+
+private Consumer mainConsumer;
+
+Tasks(final String logPrefix,
+  final InternalTopologyBuilder builder,
+  final StreamsMetricsImpl streamsMetrics,
+  final ActiveTaskCreator activeTaskCreator,
+  final StandbyTaskCreator standbyTaskCreator) {
+
+final LogContext logContext = new LogContext(logPrefix);
+log = logContext.logger(getClass());
+
+this.builder = builder;
+this.streamsMetrics = streamsMetrics;
+this.activeTaskCreator = activeTaskCreator;
+this.standbyTaskCreator = standbyTaskCreator;
+}
+
+void setMainConsumer(final Consumer mainConsumer) {
+this.mainConsumer = mainConsumer;
+}
+
+void createTasks(final Map> 
activeTasksToCreate,
+ final Map> 
standbyTasksToCreate) {
+for (final Map.Entry> taskToBeCreated : 
activeTasksToCreate.entrySet()) {
+final TaskId taskId = taskToBeCreated.getKey();
+
+if (activeTasksPerId.containsKey(taskId)) {
+throw new IllegalStateException("Attempted to create an active 
task that we already own: " + taskId);
+}
+}
+
+for (final Map.Entry> taskToBeCreated : 
standbyTasksToCreate.entrySet()) {
+final TaskId taskId = taskToBeCreated.getKey();
+
+if (standbyTasksPerId.containsKey(taskId)) {
+throw new IllegalStateException("Attempted to create a standby 
task that we already own: " + taskId);
+}
+}
+
+// keep this check to simplify testing (ie, no need to mock 
`activeTaskCreator`)
+if (!activeTasksToCreate.isEmpty()) {
+// TODO: change type to `StreamTask`
+for (final Task activeTask : 

[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561334437



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java
##
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.raft;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.jupiter.api.Test;
-
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-public class RaftConfigTest {
-
-@Test
-public void testSingleQuorumVoterConnections() {
-Properties properties = new Properties();
-properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@127.0.0.1:9092");
-RaftConfig config = new RaftConfig(properties);
-assertEquals(Collections.singletonMap(1, new 
InetSocketAddress("127.0.0.1", 9092)),
-config.quorumVoterConnections());
-}
-
-@Test
-public void testMultiQuorumVoterConnections() {
-Properties properties = new Properties();
-properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@kafka1:9092,2@kafka2:9092,3@kafka3:9092");
-RaftConfig config = new RaftConfig(properties);
-
-HashMap expected = new HashMap<>();
-expected.put(1, new InetSocketAddress("kafka1", 9092));
-expected.put(2, new InetSocketAddress("kafka2", 9092));
-expected.put(3, new InetSocketAddress("kafka3", 9092));
-
-assertEquals(expected, config.quorumVoterConnections());

Review comment:
   Ah, good catch. Fixed
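
For readers unfamiliar with the voter string format exercised by the removed test (`id@host:port`, comma-separated), here is a minimal parsing sketch. It is an illustration under assumptions rather than the parser used by `RaftConfig`: the class and method names are hypothetical, and it uses `InetSocketAddress.createUnresolved` to avoid DNS lookups, whereas the test above constructs addresses with `new InetSocketAddress(host, port)`.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

class QuorumVotersParserSketch {
    // Parses "1@kafka1:9092,2@kafka2:9092" into a map of voter id to address.
    static Map<Integer, InetSocketAddress> parseVoters(String voters) {
        Map<Integer, InetSocketAddress> result = new HashMap<>();
        for (String entry : voters.split(",")) {
            String[] idAndAddress = entry.trim().split("@", 2);
            if (idAndAddress.length != 2) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + entry);
            }
            // Use the last colon so that the port is always the final component.
            int colon = idAndAddress[1].lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + idAndAddress[1]);
            }
            int id = Integer.parseInt(idAndAddress[0]);
            String host = idAndAddress[1].substring(0, colon);
            int port = Integer.parseInt(idAndAddress[1].substring(colon + 1));
            // createUnresolved skips hostname resolution at parse time.
            result.put(id, InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092"));
    }
}
```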









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561327703



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##
@@ -35,24 +35,10 @@ public static RaftConfig buildRaftConfig(
 int appendLingerMs,
 List<Node> voterNodes
 ) {
-Properties properties = new Properties();
-properties.put(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeoutMs);
-properties.put(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, 
retryBackoffMs);
-properties.put(RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, 
electionTimeoutMs);
-properties.put(RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, 
electionBackoffMs);
-properties.put(RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, 
fetchTimeoutMs);
-properties.put(RaftConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs);
-
-StringBuilder votersString = new StringBuilder();
-String prefix = "";
-for (Node voter : voterNodes) {
-votersString.append(prefix);
-
votersString.append(voter.id()).append('@').append(voter.host()).append(':').append(voter.port());
-prefix = ",";
-}
-properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, 
votersString.toString());
-
-return new RaftConfig(properties);
+Map<Integer, InetSocketAddress> voterConnections = voterNodes.stream()
+.collect(Collectors.toMap(Node::id, node -> new 
InetSocketAddress(node.host(), node.port())));
+return new RaftConfig(voterConnections, requestTimeoutMs, 
retryBackoffMs, electionTimeoutMs, electionBackoffMs,

Review comment:
   You're right. This is an artifact from the previous constructor usage. 
Fixed.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561322418



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1258,6 +1267,15 @@ object KafkaConfig {
   .define(PasswordEncoderCipherAlgorithmProp, STRING, 
Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc)
   .define(PasswordEncoderKeyLengthProp, INT, 
Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
   .define(PasswordEncoderIterationsProp, INT, 
Defaults.PasswordEncoderIterations, atLeast(1024), LOW, 
PasswordEncoderIterationsDoc)
+
+  /** * Raft Quorum Configuration */
+  .defineInternal(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, 
Defaults.QuorumVoters, new RaftConfig.ControllerQuorumVotersValidator(), HIGH)

Review comment:
   Ack. Fixed.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561315012



##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -941,4 +950,38 @@ class KafkaConfigTest {
 })
   }
 
+  @Test
+  def testInvalidQuorumVotersConfig(): Unit = {
+assertInvalidQuorumVoters("1")
+assertInvalidQuorumVoters("1@")
+assertInvalidQuorumVoters("1:")
+assertInvalidQuorumVoters("blah@")
+assertInvalidQuorumVoters("1@kafka1")
+assertInvalidQuorumVoters("1@kafka1:9092,")
+assertInvalidQuorumVoters("1@kafka1:9092,")
+assertInvalidQuorumVoters("1@kafka1:9092,2")
+assertInvalidQuorumVoters("1@kafka1:9092,2@")
+assertInvalidQuorumVoters("1@kafka1:9092,2@blah")
+assertInvalidQuorumVoters("1@kafka1:9092,2@blah,")
+  }
+
+  private def assertInvalidQuorumVoters(value: String): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+  }
+
+  @Test
+  def testValidQuorumVotersConfig(): Unit = {
+assertValidQuorumVoters("", 0)
+assertValidQuorumVoters("1@127.0.0.1:9092", 1)
+assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3)
+  }
+
+  private def assertValidQuorumVoters(value: String, expectedVoterCount: Int): 
Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+assertDoesNotThrow(() => KafkaConfig.fromProps(props))

Review comment:
   Makes sense. Removed.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) {
 return voterMap;
 }
 
+public static class ControllerQuorumVotersValidator implements 
ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null);
+}
+
+@SuppressWarnings("unchecked")
+List<String> voterStrings = (List<String>) value;
+
+if (voterStrings.size() == 0) {
+// TODO: Add a flag to skip validation for an empty voter 
string, conditionally.
+//   For now, skip anyway. See 
https://github.com/apache/kafka/pull/9916#discussion_r560611932

Review comment:
   Fair enough. Removed.









[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde

2021-01-20 Thread GitBox


yeralin commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-763952940


   @mjsax Hey, I just rebased my branch onto trunk and updated my tests to use 
JUnit 5.
   Let me know when you have time to review it.







[GitHub] [kafka] hachikuji commented on pull request #9934: MINOR: Drop enable.metadata.quorum config

2021-01-20 Thread GitBox


hachikuji commented on pull request #9934:
URL: https://github.com/apache/kafka/pull/9934#issuecomment-763923618


   I decided to leave the controller-only flag as it is. I think there are 
further improvements here to make the scope of the API clearer, but the 
implications for compatibility are subtle enough that we should consider it 
separately.







[GitHub] [kafka] hachikuji commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561283768



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java
##
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.raft;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.jupiter.api.Test;
-
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-public class RaftConfigTest {
-
-@Test
-public void testSingleQuorumVoterConnections() {
-Properties properties = new Properties();
-properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@127.0.0.1:9092");
-RaftConfig config = new RaftConfig(properties);
-assertEquals(Collections.singletonMap(1, new 
InetSocketAddress("127.0.0.1", 9092)),
-config.quorumVoterConnections());
-}
-
-@Test
-public void testMultiQuorumVoterConnections() {
-Properties properties = new Properties();
-properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@kafka1:9092,2@kafka2:9092,3@kafka3:9092");
-RaftConfig config = new RaftConfig(properties);
-
-HashMap<Integer, InetSocketAddress> expected = new HashMap<>();
-expected.put(1, new InetSocketAddress("kafka1", 9092));
-expected.put(2, new InetSocketAddress("kafka2", 9092));
-expected.put(3, new InetSocketAddress("kafka3", 9092));
-
-assertEquals(expected, config.quorumVoterConnections());

Review comment:
   This test case seems stronger than the one that was ported to 
`KafkaConfigTest`. It is validating the endpoints in addition to the number of 
voters. Is there any way we can recover this?
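To illustrate the kind of check being asked for, here is a minimal standalone sketch. It does not use the real `RaftConfig` or `KafkaConfig` APIs: `VoterEndpointsSketch` and `parseVoters` are hypothetical names, and the parsing only assumes the `id@host:port` format seen in the tests quoted in this thread. The point is that the comparison covers each voter's full endpoint rather than only the voter count.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for voter parsing; not the actual RaftConfig code.
class VoterEndpointsSketch {

    // Parses "1@kafka1:9092,2@kafka2:9092" into a voterId -> endpoint map.
    static Map<Integer, InetSocketAddress> parseVoters(String value) {
        Map<Integer, InetSocketAddress> voters = new HashMap<>();
        if (value.isEmpty()) {
            return voters; // assumption: an empty voter list is accepted
        }
        // A limit of -1 keeps trailing empty strings, so "1@kafka1:9092," is rejected.
        for (String entry : value.split(",", -1)) {
            int at = entry.indexOf('@');
            int colon = entry.lastIndexOf(':');
            if (at <= 0 || colon <= at + 1 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("Expected id@host:port but got '" + entry + "'");
            }
            // NumberFormatException is a subclass of IllegalArgumentException.
            int id = Integer.parseInt(entry.substring(0, at));
            String host = entry.substring(at + 1, colon);
            int port = Integer.parseInt(entry.substring(colon + 1));
            // createUnresolved avoids a DNS lookup for the host name.
            voters.put(id, InetSocketAddress.createUnresolved(host, port));
        }
        return voters;
    }

    public static void main(String[] args) {
        Map<Integer, InetSocketAddress> expected = new HashMap<>();
        expected.put(1, InetSocketAddress.createUnresolved("kafka1", 9092));
        expected.put(2, InetSocketAddress.createUnresolved("kafka2", 9092));
        expected.put(3, InetSocketAddress.createUnresolved("kafka3", 9092));

        // Compares ids and endpoints, not just the number of voters.
        System.out.println(expected.equals(parseVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092")));
    }
}
```

A test written against the real config class could follow the same shape: build the expected map and compare it with whatever accessor exposes the parsed voter connections.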

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) {
 return voterMap;
 }
 
+public static class ControllerQuorumVotersValidator implements 
ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null);
+}
+
+@SuppressWarnings("unchecked")
+List<String> voterStrings = (List<String>) value;
+
+if (voterStrings.size() == 0) {
+// TODO: Add a flag to skip validation for an empty voter 
string, conditionally.
+//   For now, skip anyway. See 
https://github.com/apache/kafka/pull/9916#discussion_r560611932

Review comment:
   We typically do not leave TODOs in the code. We can file a jira if we 
think it's important to remember. I'd suggest we just leave this check out and 
skip the empty check below.

##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -941,4 +950,38 @@ class KafkaConfigTest {
 })
   }
 
+  @Test
+  def testInvalidQuorumVotersConfig(): Unit = {
+assertInvalidQuorumVoters("1")
+assertInvalidQuorumVoters("1@")
+assertInvalidQuorumVoters("1:")
+assertInvalidQuorumVoters("blah@")
+assertInvalidQuorumVoters("1@kafka1")
+assertInvalidQuorumVoters("1@kafka1:9092,")
+assertInvalidQuorumVoters("1@kafka1:9092,")
+assertInvalidQuorumVoters("1@kafka1:9092,2")
+assertInvalidQuorumVoters("1@kafka1:9092,2@")
+assertInvalidQuorumVoters("1@kafka1:9092,2@blah")
+assertInvalidQuorumVoters("1@kafka1:9092,2@blah,")
+  }
+
+  private def assertInvalidQuorumVoters(value: String): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+  }
+
+  @Test
+  def testValidQuorumVotersConfig(): Unit = {
+assertValidQuorumVoters("", 0)
+assertValidQuorumVoters("1@127.0.0.1:9092", 1)
+assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3)
+  }
+
+  private def assertValidQuorumVoters(value: String, 

[GitHub] [kafka] ijuma commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5

2021-01-20 Thread GitBox


ijuma commented on pull request #9926:
URL: https://github.com/apache/kafka/pull/9926#issuecomment-763893680


   @chia7712 What have you seen that implies that? This was certainly the case 
before; I'm not sure if something changed at some point (I know because I've seen 
compiler errors that would not be possible without the merge).







[GitHub] [kafka] mjsax opened a new pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-20 Thread GitBox


mjsax opened a new pull request #9940:
URL: https://github.com/apache/kafka/pull/9940


   Call for review @vvcephei 







[GitHub] [kafka] ableegoldman commented on pull request #9904: KAFKA-12211: don't change perm for base/state dir when no persistent store

2021-01-20 Thread GitBox


ableegoldman commented on pull request #9904:
URL: https://github.com/apache/kafka/pull/9904#issuecomment-763886188


   Also cherrypicked to 2.7







[GitHub] [kafka] ableegoldman commented on pull request #9904: KAFKA-12211: don't change perm for base/state dir when no persistent store

2021-01-20 Thread GitBox


ableegoldman commented on pull request #9904:
URL: https://github.com/apache/kafka/pull/9904#issuecomment-763884833


   Thanks @showuon , merged to trunk







[GitHub] [kafka] ableegoldman merged pull request #9904: KAFKA-12211: don't change perm for base/state dir when no persistent store

2021-01-20 Thread GitBox


ableegoldman merged pull request #9904:
URL: https://github.com/apache/kafka/pull/9904


   







[jira] [Assigned] (KAFKA-12185) Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores

2021-01-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-12185:
---

Assignee: Matthias J. Sax

> Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores
> ---
>
> Key: KAFKA-12185
> URL: https://issues.apache.org/jira/browse/KAFKA-12185
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> java.lang.AssertionError: Application did not reach a RUNNING state for 
> all streams instances. Non-running instances: 
> \{org.apache.kafka.streams.KafkaStreams@651720d3=NOT_RUNNING}
>   at org.junit.Assert.fail(Assert.java:89)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:892)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:270)
>  
> {{https://github.com/apache/kafka/pull/9835/checks?check_run_id=139314}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5

2021-01-20 Thread GitBox


chia7712 commented on pull request #9926:
URL: https://github.com/apache/kafka/pull/9926#issuecomment-763874639


   > be aware that Jenkins merges trunk before running the PR tests
   
   @ijuma It seems that Jenkins does not merge trunk before running QA.







[GitHub] [kafka] chia7712 commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5

2021-01-20 Thread GitBox


chia7712 commented on pull request #9926:
URL: https://github.com/apache/kafka/pull/9926#issuecomment-763872582


   ```
   
org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect
   ```
   
   fixed flaky







[jira] [Commented] (KAFKA-12185) Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores

2021-01-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268805#comment-17268805
 ] 

John Roesler commented on KAFKA-12185:
--

I just saw this fail locally and happened to notice this in the logs:

 
{code:java}

[2021-01-20 13:03:55,375] ERROR stream-client 
[app-StoreQueryIntegrationTestshouldQueryAllStalePartitionStores-ba1cebd9-bccc-4d54-af8a-2f6a51200612]
 Encountered the following exception during processing and the registered 
exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut 
down now.  (org.apache.kafka.streams.KafkaStreams:469)
org.apache.kafka.common.KafkaException: User rebalance callback throws an error
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:451)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:367)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:882)
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:839)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:679)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:547)
Caused by: java.lang.RuntimeException: Unexpected failure to close 1 task(s) 
[[0_1]]. First unexpected exception (for task 0_1) follows.
at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:311)
at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1484)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
... 11 more
Caused by: java.util.ConcurrentModificationException
at 
java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1704)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at 
org.apache.kafka.streams.processor.internals.Tasks.convertActiveToStandby(Tasks.java:133)
at 
org.apache.kafka.streams.processor.internals.TaskManager.handleCloseAndRecycle(TaskManager.java:394)
at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:291)
... 14 more {code}
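
The innermost frames (`HashMap$EntrySpliterator.forEachRemaining` under `Tasks.convertActiveToStandby`) are what one would expect if a `HashMap` is structurally modified while a stream over it is still running. The following is only a sketch of that general failure mode and one common way around it; `ConcurrentModificationSketch`, `unsafeMove`, and `safeMove` are hypothetical names and this is not the actual `Tasks` code.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: moves entries between two maps, loosely mimicking an
// "active -> standby" conversion. It is not the real Tasks class.
class ConcurrentModificationSketch {

    // Removes from the map while streaming over it. HashMap's spliterator is
    // fail-fast (best effort) and reports the structural modification.
    static boolean unsafeMove(Map<String, String> active, Map<String, String> standby) {
        try {
            active.entrySet().stream().forEach(entry -> {
                standby.put(entry.getKey(), entry.getValue());
                active.remove(entry.getKey()); // structural modification during traversal
            });
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Iterates over a snapshot of the keys, so the source map can be modified safely.
    static void safeMove(Map<String, String> active, Map<String, String> standby) {
        for (String taskId : new ArrayList<>(active.keySet())) {
            standby.put(taskId, active.remove(taskId));
        }
    }

    public static void main(String[] args) {
        Map<String, String> active = new HashMap<>();
        active.put("0_0", "task-0_0");
        active.put("0_1", "task-0_1");
        System.out.println("unsafe move hit CME: " + unsafeMove(active, new HashMap<>()));

        Map<String, String> active2 = new HashMap<>();
        active2.put("0_0", "task-0_0");
        active2.put("0_1", "task-0_1");
        Map<String, String> standby2 = new HashMap<>();
        safeMove(active2, standby2);
        System.out.println("safe move result: " + standby2.keySet());
    }
}
```

Note that no second thread is needed to trigger the exception; modifying the map from inside the traversal on the same thread is enough.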

> Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores
> ---
>
> Key: KAFKA-12185
> URL: https://issues.apache.org/jira/browse/KAFKA-12185
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> java.lang.AssertionError: Application did not reach a RUNNING state for 
> all streams instances. Non-running instances: 
> \{org.apache.kafka.streams.KafkaStreams@651720d3=NOT_RUNNING}
>   at org.junit.Assert.fail(Assert.java:89)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:892)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:270)
>  
> {{https://github.com/apache/kafka/pull/9835/checks?check_run_id=139314}}





[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration

2021-01-20 Thread Alexey Kashavkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kashavkin updated KAFKA-12228:
-
Description: 
I found that Kafka 2.7.0 supports PEM certificates, so I decided to try setting 
up the broker with a DigiCert SSL certificate. I used the new options and 
followed the example in 
[KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key].
 But I get this error:
{code:bash}
[2021-01-20 17:54:55,787] ERROR [KafkaServer id=0] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.config.ConfigException: Invalid value 
javax.net.ssl.SSLHandshakeException: no cipher suites in common for 
configuration A client SSLEngine created with the provided settings can't 
connect to a server SSLEngine created with those settings.
at 
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98)
at 
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
at kafka.network.Processor.<init>(SocketServer.scala:790)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:415)
at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288)
at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251)
at kafka.network.SocketServer.startup(SocketServer.scala:125)
at kafka.server.KafkaServer.startup(KafkaServer.scala:303)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
{code}
Java version used:
{code:bash}
openjdk version "1.8.0_272"
OpenJDK Runtime Environment (build 1.8.0_272-b10)
OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode)
{code}
The OS is CentOS 7.8.2003.

_openssl x509 -in certificate.pem -text :_
{code:java}
Certificate:
...
Signature Algorithm: ecdsa-with-SHA384
...
Subject Public Key Info:
Public Key Algorithm: id-ecPublicKey
Public-Key: (256 bit)
{code}
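
One possible diagnostic step, offered as an assumption rather than a confirmed cause: since the certificate uses an EC key with an ECDSA signature, it may help to check whether the JVM in use enables any ECDSA cipher suites at all. The sketch below uses only standard JSSE calls (`SSLContext.getDefault()`, `getDefaultSSLParameters()`, `getCipherSuites()`); the class name `CipherSuiteCheckSketch` and the name-based filter are hypothetical, and the filter is just a heuristic on suite names.

```java
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLContext;

// Diagnostic sketch: lists enabled cipher suites that advertise ECDSA authentication.
class CipherSuiteCheckSketch {

    // Keeps only suites whose name contains "_ECDSA_" (a name-based heuristic).
    static List<String> ecdsaSuites(String[] suites) {
        List<String> result = new ArrayList<>();
        for (String suite : suites) {
            if (suite.contains("_ECDSA_")) {
                result.add(suite);
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Cipher suites enabled by default for this JVM's default SSLContext.
        String[] enabled = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
        System.out.println("Enabled ECDSA suites: " + ecdsaSuites(enabled));
    }
}
```

An empty list here would point at the JVM or its security configuration; a non-empty list would suggest looking elsewhere, for example at how the key and certificate chain are loaded.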

  was:
I found that Kafka 2.7.0 supports PEM certificates and I decided to try setting 
up the broker with DigiCert SSL certificate. I used new options and I did 
everything like in example in 
[KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key].
 But I get the error:
{code:bash}
[2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)[2021-01-20 
17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. 
Prepare to shutdown 
(kafka.server.KafkaServer)org.apache.kafka.common.config.ConfigException: 
Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common 
for configuration A client SSLEngine created with the provided settings can't 
connect to a server SSLEngine created with those settings. at 
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98) 
at 
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
 at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
 at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
 at kafka.network.Processor.(SocketServer.scala:790) at 
kafka.network.SocketServer.newProcessor(SocketServer.scala:415) at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288)
 at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287) 
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254)
 at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
 at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at 
scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:920) at 

[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9912:
URL: https://github.com/apache/kafka/pull/9912#discussion_r561189636



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -227,37 +225,33 @@ class KafkaApisTest {
 
 authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName, 
AuthorizationResult.ALLOWED)
 
-val capturedResponse = expectNoThrottling()
-
 val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
 EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
   .andAnswer(() => {
 Map(configResource -> alterConfigHandler.apply())
   })
 
-EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
-  adminManager, controller)
-
 val configs = Map(
   configResource -> new AlterConfigsRequest.Config(
 Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
 val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
 
 val request = buildRequestWithEnvelope(alterConfigsRequest, 
fromPrivilegedListener = true)
+val capturedResponse = EasyMock.newCapture[AbstractResponse]()
+val capturedRequest = EasyMock.newCapture[RequestChannel.Request]()
 
-createKafkaApis(authorizer = Some(authorizer), enableForwarding = 
true).handle(request)
-
-val envelopeRequest = request.body[EnvelopeRequest]
-val response = readResponse(envelopeRequest, capturedResponse)
-  .asInstanceOf[EnvelopeResponse]
-
-assertEquals(Errors.NONE, response.error)

Review comment:
   There remains some awkwardness in the handling of envelope requests. 
Basically the flow is like this:
   
   1. KafkaApis.handle(envelope(alterConfigRequest))
   2. KafkaApis.handle(alterConfigRequest)
   3. RequestChannel.sendResponse(alterConfigResponse)
   4. Request.buildResponseSend() -> envelope(alterConfigResponse)
   
   Previously `KafkaApisTest` had to work by parsing the response send, so we 
had to unwrap the envelope. But now we get the direct call to `sendResponse` 
with the embedded `AbstractResponse` instance. So basically we do not hit step 
4 anymore.
   
   I think a nicer way to structure this, which we can consider separately, is to 
change `KafkaApis.handle` so that it takes a callback rather than assuming that 
responses are sent directly to the request channel. Then when we make the 
recursive call to `handle` after unwrapping the envelope, we can provide a 
callback which wraps the underlying response with the response envelope. 
Alternatively, we can have `KafkaApis.handle` return a `CompletableFuture`. The 
main point is that we allow for some custom behavior when the response is ready.
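
A rough sketch of the callback idea, to make the shape concrete. None of the types below are the real `KafkaApis`, `RequestChannel`, or envelope classes; `EnvelopeCallbackSketch`, `Request`, and the string-based "responses" are hypothetical stand-ins, and the sketch is in Java rather than Scala only to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of the envelope flow; not the real KafkaApis code.
class EnvelopeCallbackSketch {

    // A request is either a plain payload or an envelope wrapping another request.
    static final class Request {
        final String payload;
        final Request wrapped; // non-null only for envelope requests

        Request(String payload, Request wrapped) {
            this.payload = payload;
            this.wrapped = wrapped;
        }
    }

    // handle() passes the response to a callback instead of a request channel.
    static void handle(Request request, Consumer<String> onResponse) {
        if (request.wrapped != null) {
            // Recursive call after unwrapping: the callback wraps the inner
            // response in a response envelope before passing it on.
            handle(request.wrapped, inner -> onResponse.accept("envelope(" + inner + ")"));
        } else {
            onResponse.accept("response(" + request.payload + ")");
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        Request alterConfigs = new Request("alterConfigs", null);
        handle(new Request(null, alterConfigs), sent::add);
        System.out.println(sent);
    }
}
```

With this structure a test can pass its own callback and observe the enveloped response directly, without parsing a response send. A `CompletableFuture` return value would allow the same wrapping through composition on the returned future.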









[GitHub] [kafka] ijuma commented on pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics

2021-01-20 Thread GitBox


ijuma commented on pull request #9895:
URL: https://github.com/apache/kafka/pull/9895#issuecomment-763849858


   Jenkins is the best place to look:
   
   ![Screenshot from 2021-01-20 
10-36-13](https://user-images.githubusercontent.com/24747/105219511-63ad4600-5b0b-11eb-8f06-b2832646025d.png)
   







[GitHub] [kafka] ableegoldman commented on pull request #9935: HOTFIX: fix RocksDBMetricsTest

2021-01-20 Thread GitBox


ableegoldman commented on pull request #9935:
URL: https://github.com/apache/kafka/pull/9935#issuecomment-763842306


   Thanks for the fix @chia7712 







[GitHub] [kafka] ableegoldman commented on pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics

2021-01-20 Thread GitBox


ableegoldman commented on pull request #9895:
URL: https://github.com/apache/kafka/pull/9895#issuecomment-763841228


   @ijuma I'm confused, I don't see any failures for the test that this broke 
(RocksDBMetricsTest) in the build on the last commit. I saw some test failures, 
but all unrelated. Am I looking in the wrong place? 
   (I followed the link on the "Apply suggestions from code review" commit 
build --> https://github.com/apache/kafka/runs/1722608276 )







[GitHub] [kafka] tang7526 opened a new pull request #9939: MINOR: fix @link tag in javadoc

2021-01-20 Thread GitBox


tang7526 opened a new pull request #9939:
URL: https://github.com/apache/kafka/pull/9939


   fix @link tag in javadoc
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9912:
URL: https://github.com/apache/kafka/pull/9912#discussion_r561175037



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int,
 requestQueue.put(request)
   }
 
-  /** Send a response back to the socket server to be sent over the network */
-  def sendResponse(response: RequestChannel.Response): Unit = {
+  def closeConnection(
+request: RequestChannel.Request,
+errorCounts: java.util.Map[Errors, Integer]
+  ): Unit = {
+// This case is used when the request handler has encountered an error, but the client
+// does not expect a response (e.g. when produce request has acks set to 0)
+updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
+sendResponse(new RequestChannel.CloseConnectionResponse(request))
+  }
+
+  def sendResponse(
+request: RequestChannel.Request,
+response: AbstractResponse,
+onComplete: Option[Send => Unit]

Review comment:
   That's fair. I will check the scope of this change.









[jira] [Updated] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration

2021-01-20 Thread Alexey Kashavkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kashavkin updated KAFKA-12228:
-
Description: 
I found that Kafka 2.7.0 supports PEM certificates, so I decided to try setting 
up the broker with a DigiCert SSL certificate. I used the new options and 
followed the example in 
[KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key].
 The broker then fails to start with this error:
{code:bash}
[2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings.
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
    at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
    at kafka.network.Processor.<init>(SocketServer.scala:790)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:415)
    at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288)
    at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
    at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251)
    at kafka.network.SocketServer.startup(SocketServer.scala:125)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:303)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
{code}
Java version used:
{code:bash}
openjdk version "1.8.0_272"
OpenJDK Runtime Environment (build 1.8.0_272-b10)
OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode)
{code}
OS is CentOS 7.8.2003

_openssl x509 -in certificate.pem -text :_
{code:java}
Certificate:
...
Signature Algorithm: ecdsa-with-SHA384
...
Subject Public Key Info:
Public Key Algorithm: id-ecPublicKey
Public-Key: (256 bit)
{code}
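
A possible first diagnostic for the report above, sketched under assumptions: the certificate shown uses an EC key (id-ecPublicKey), and with TLS 1.2 a server presenting such a certificate generally needs an ECDSA-authenticated cipher suite. The snippet below lists which of the JVM's default-enabled suites have ECDSA in their name. The class and method names (`EcdsaSuiteCheck`, `ecdsaSuites`) are hypothetical, and an empty result would only be a hint, not a confirmed root cause of this issue.

```java
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLContext;

class EcdsaSuiteCheck {
    // Returns the suites whose (TLS 1.2-style) name indicates ECDSA authentication.
    static List<String> ecdsaSuites(String[] suites) {
        List<String> result = new ArrayList<>();
        for (String suite : suites) {
            if (suite.contains("_ECDSA_")) {
                result.add(suite);
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Cipher suites enabled by default for the JVM's default SSLContext.
        String[] enabled = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
        List<String> ecdsa = ecdsaSuites(enabled);
        System.out.println("Enabled suites: " + enabled.length + ", ECDSA suites: " + ecdsa.size());
        for (String suite : ecdsa) {
            System.out.println("  " + suite);
        }
    }
}
```

Running it with the same JVM that starts the broker keeps the result comparable to the environment in the report.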


[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9912:
URL: https://github.com/apache/kafka/pull/9912#discussion_r561174514



##
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
##
@@ -175,4 +175,19 @@ public String clientId() {
 public int correlationId() {
 return header.correlationId();
 }
+
+@Override
+public String toString() {
+return "RequestContext(" +
+"header=" + header +
+", connectionId='" + connectionId + '\'' +
+", clientAddress=" + clientAddress +
+", principal=" + principal +
+", listenerName=" + listenerName +
+", securityProtocol=" + securityProtocol +
+", clientInformation=" + clientInformation +
+", fromPrivilegedListener=" + fromPrivilegedListener +
+", principalSerde=" + principalSerde +

Review comment:
   Yeah, I considered it, but didn't see anything in here that looked 
sensitive. Most of this information is already included in the request logging. 
The client address is not, but that seems ok. Including the principal serde is 
annoying, but doesn't seem like a problem.









[jira] [Created] (KAFKA-12228) Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration

2021-01-20 Thread Alexey Kashavkin (Jira)
Alexey Kashavkin created KAFKA-12228:


 Summary: Invalid value javax.net.ssl.SSLHandshakeException: no 
cipher suites in common for configuration
 Key: KAFKA-12228
 URL: https://issues.apache.org/jira/browse/KAFKA-12228
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.7.0
Reporter: Alexey Kashavkin
 Attachments: kafka.log

I found that Kafka 2.7.0 supports PEM certificates, so I decided to try setting 
up the broker with a DigiCert SSL certificate. I used the new options and 
followed the example in 
[KIP-651|https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key].
 The broker then fails to start with this error:
{code:bash}
[2021-01-20 17:24:18,245] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: no cipher suites in common for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings.
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
    at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
    at kafka.network.Processor.<init>(SocketServer.scala:790)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:415)
    at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:288)
    at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:287)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:254)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
    at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251)
    at kafka.network.SocketServer.startup(SocketServer.scala:125)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:303)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
{code}
Java version used:
{code:bash}
openjdk version "1.8.0_272"
OpenJDK Runtime Environment (build 1.8.0_272-b10)
OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode)
{code}
OS is CentOS 7.8.2003



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #9934: MINOR: Drop enable.metadata.quorum config

2021-01-20 Thread GitBox


hachikuji commented on pull request #9934:
URL: https://github.com/apache/kafka/pull/9934#issuecomment-763832322


   @cmccabe Yeah, that's right. Let me see if I can improve this a little bit 
to make the intent clearer. We have some APIs which will be exposed by both the 
broker and the controller, so I think this should probably be a set rather than 
a flag.
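
To make the "set rather than a flag" idea concrete, here is a hypothetical sketch. `Role`, `Api`, and the API names in it are invented for illustration and are not the types used in the PR. Each API carries the set of process roles that expose it, so an API served by both the broker and the controller needs no special case.

```java
import java.util.EnumSet;
import java.util.Set;

class ApiScopeExample {
    // Hypothetical process roles that can expose an API.
    enum Role { BROKER, CONTROLLER }

    // Hypothetical API descriptors; the names are illustrative only.
    enum Api {
        BROKER_ONLY_API(EnumSet.of(Role.BROKER)),
        CONTROLLER_ONLY_API(EnumSet.of(Role.CONTROLLER)),
        SHARED_API(EnumSet.of(Role.BROKER, Role.CONTROLLER));

        final Set<Role> exposedBy;

        Api(Set<Role> exposedBy) {
            this.exposedBy = exposedBy;
        }
    }

    // Collects the APIs that a process with the given role would advertise.
    static Set<Api> apisFor(Role role) {
        Set<Api> result = EnumSet.noneOf(Api.class);
        for (Api api : Api.values()) {
            if (api.exposedBy.contains(role)) {
                result.add(api);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("broker: " + apisFor(Role.BROKER));
        System.out.println("controller: " + apisFor(Role.CONTROLLER));
    }
}
```

A boolean flag could only say "controller-only or not"; the set also covers the shared case mentioned in the comment.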







[GitHub] [kafka] cmccabe commented on pull request #9934: MINOR: Drop enable.metadata.quorum config

2021-01-20 Thread GitBox


cmccabe commented on pull request #9934:
URL: https://github.com/apache/kafka/pull/9934#issuecomment-763827474


   > @hachikuji Do we still need this PR since Colin already did one?
   
   That PR was for the KIP-500 branch, not for trunk.  It also didn't refactor 
the "disabled" APIs concept, which I think is useful.
   
   > Replace the notion of "disabled" APIs with "controller-only" APIs. We 
previously marked some APIs which were intended only for the KIP-500 as 
"disabled" so that they would not be unintentionally exposed
   
   That seems reasonable.  I guess the idea is that the broker will not include 
those APIs in its ApiVersionsResponse.  However, the controller always will 
(not that clients will talk directly to the controller anyway...)







[GitHub] [kafka] chia7712 commented on a change in pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest

2021-01-20 Thread GitBox


chia7712 commented on a change in pull request #9938:
URL: https://github.com/apache/kafka/pull/9938#discussion_r561156309



##
File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
##
@@ -32,17 +32,55 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import scala.jdk.CollectionConverters._
 
+object DescribeAuthorizedOperationsTest {
+  val Group1 = "group1"

Review comment:
   Would ```GROUP_1``` be better? It is a global variable in this test.

##
File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
##
@@ -86,54 +122,36 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
 closeSasl()
   }
 
-  val group1Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group1, PatternType.LITERAL),
-new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW))
-
-  val group2Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group2, PatternType.LITERAL),
-new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
-
-  val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group3, PatternType.LITERAL),
-new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW))
-
-  val clusterAllAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
-new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW))
-
-  val topic1Acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic1, PatternType.LITERAL),
-new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW))
-
-  val topic2All = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL),
-new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW))
-
-  def createConfig(): Properties = {
+  private def createConfig(): Properties = {
 val adminClientConfig = new Properties()
 adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
 adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2")
 val securityProps: util.Map[Object, Object] =
   TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
-securityProps.forEach { (key, value) => adminClientConfig.put(key.asInstanceOf[String], value) }
+securityProps.forEach((key, value) => adminClientConfig.put(key.asInstanceOf[String], value))

Review comment:
   How about ```adminClientConfig.putAll(securityProps)```?
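
For context, a minimal sketch of this suggestion using only `java.util` types. `PutAllExample`, `merge`, and the sample keys and values are made up for illustration. `Properties` extends `Hashtable<Object, Object>`, so a `Map<Object, Object>` can be copied in with one `putAll` call instead of a per-entry loop.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class PutAllExample {
    // Builds a config and copies every entry of securityProps into it in one call.
    static Properties merge(Map<Object, Object> securityProps) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092"); // sample value, not from the PR
        config.putAll(securityProps);
        return config;
    }

    public static void main(String[] args) {
        Map<Object, Object> securityProps = new HashMap<>();
        securityProps.put("security.protocol", "SASL_SSL");
        System.out.println(merge(securityProps).getProperty("security.protocol"));
    }
}
```

One difference worth keeping in mind: the loop in the diff casts each key to `String`, so it would fail on a non-String key, whereas `putAll` copies keys as they are.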

##
File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
##
@@ -32,17 +32,55 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import scala.jdk.CollectionConverters._
 
+object DescribeAuthorizedOperationsTest {
+  val Group1 = "group1"
+  val Group2 = "group2"
+  val Group3 = "group3"
+  val Topic1 = "topic1"
+  val Topic2 = "topic2"
+
+  val Group1Acl = new AclBinding(
+new ResourcePattern(ResourceType.GROUP, Group1, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW))
+
+  val Group2Acl = new AclBinding(
+new ResourcePattern(ResourceType.GROUP, Group2, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE, ALLOW))
+
+  val Group3Acl = new AclBinding(
+new ResourcePattern(ResourceType.GROUP, Group3, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE, ALLOW))
+
+  val ClusterAllAcl = new AclBinding(
+new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW))
+
+  val Topic1Acl = new AclBinding(
+new ResourcePattern(ResourceType.TOPIC, Topic1, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW))
+
+  val Topic2All = new AclBinding(
+new ResourcePattern(ResourceType.TOPIC, Topic2, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE, ALLOW))
+
+  private def accessControlEntry(
+userName: String,
+operation: AclOperation,
+permissionType: AclPermissionType
+  ): AccessControlEntry = {
+new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName).toString,
+  AclEntry.WildcardHost, operation, permissionType)
+  }
+}
+
 class 

[GitHub] [kafka] hachikuji commented on a change in pull request #9934: MINOR: Drop enable.metadata.quorum config

2021-01-20 Thread GitBox


hachikuji commented on a change in pull request #9934:
URL: https://github.com/apache/kafka/pull/9934#discussion_r561157384



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -638,6 +637,9 @@ object KafkaConfig {
   val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMaxMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC
+  val ProcessRolesDoc = "[ALPHA] The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " +

Review comment:
   I think I'm going to keep this as internal for now. Let's reconsider 
this after we have something that can actually start up.









[jira] [Updated] (KAFKA-12227) Add method "Producer#produce" to return CompletionStage instead of Future

2021-01-20 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-12227:
---
Summary:  Add method "Producer#produce" to return CompletionStage instead 
of Future  (was: add new method "Producer#produce" which can return 
CompletionStage instead of Future)

>  Add method "Producer#produce" to return CompletionStage instead of Future
> --
>
> Key: KAFKA-12227
> URL: https://issues.apache.org/jira/browse/KAFKA-12227
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> Producer and KafkaProducer return a java.util.concurrent.Future from their 
> send methods. This makes it challenging to write asynchronous non-blocking 
> code given Future's limited interface. Since Kafka now requires Java 8, we 
> now have the option of using CompletionStage and/or CompletableFuture that 
> were introduced to solve this issue. It's worth noting that the Kafka 
> AdminClient solved this issue by using org.apache.kafka.common.KafkaFuture as 
> Java 7 support was still required then.
>  
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXX%3A+Return+CompletableFuture+from+KafkaProducer.send
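
The limitation described above can be illustrated with a small adapter. This is a sketch only, not the API proposed in the KIP: `CallbackSender` is a hypothetical stand-in for a callback-style send (loosely modeled on a completion callback that receives metadata and an exception), and `sendAsync` is an invented helper name. It shows how such a callback can be bridged to a `CompletableFuture` so that callers can chain non-blocking stages instead of blocking on `Future#get`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class SendAdapter {
    // Hypothetical stand-in for a callback-based send; not a Kafka interface.
    interface CallbackSender<R, M> {
        void send(R record, BiConsumer<M, Exception> callback);
    }

    // Bridges the callback to a CompletableFuture.
    // Assumed contract: the callback gets either metadata or a non-null error.
    static <R, M> CompletableFuture<M> sendAsync(CallbackSender<R, M> sender, R record) {
        CompletableFuture<M> future = new CompletableFuture<>();
        sender.send(record, (metadata, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(metadata);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Fake sender that acknowledges immediately, using the record length as metadata.
        CallbackSender<String, Integer> fake = (record, cb) -> cb.accept(record.length(), null);
        sendAsync(fake, "hello")
            .thenApply(len -> "acked " + len + " chars")
            .thenAccept(System.out::println);
    }
}
```

Returning `CompletionStage` rather than `CompletableFuture` from such a helper would hide the `complete` methods from callers, which appears to be the distinction the updated summary is drawing.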





[jira] [Updated] (KAFKA-12227) add new method "Producer#produce" which can return CompletionStage instead of Future

2021-01-20 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-12227:
---
Summary: add new method "Producer#produce" which can return CompletionStage 
instead of Future  (was: Return CompletableFuture from KafkaProducer.send)

> add new method "Producer#produce" which can return CompletionStage instead of 
> Future
> 
>
> Key: KAFKA-12227
> URL: https://issues.apache.org/jira/browse/KAFKA-12227
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> Producer and KafkaProducer return a java.util.concurrent.Future from their 
> send methods. This makes it challenging to write asynchronous non-blocking 
> code given Future's limited interface. Since Kafka now requires Java 8, we 
> now have the option of using CompletionStage and/or CompletableFuture that 
> were introduced to solve this issue. It's worth noting that the Kafka 
> AdminClient solved this issue by using org.apache.kafka.common.KafkaFuture as 
> Java 7 support was still required then.
>  
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-XXX%3A+Return+CompletableFuture+from+KafkaProducer.send



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac opened a new pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest

2021-01-20 Thread GitBox


dajac opened a new pull request #9938:
URL: https://github.com/apache/kafka/pull/9938


   I was looking at `DescribeAuthorizedOperationsTest` in the context of 
KIP-700. I have made a few changes to improve the readability of the code/tests. 
The PR only moves code around.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Resolved] (KAFKA-12210) AdminClient should use DescribeCluster API when available

2021-01-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-12210.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

> AdminClient should use DescribeCluster API when available
> -
>
> Key: KAFKA-12210
> URL: https://issues.apache.org/jira/browse/KAFKA-12210
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.8.0
>
>





