This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 45383f7  KAFKA-10022:console-producer supports the setting of client.id (#8698)
45383f7 is described below

commit 45383f75b3b343ac2695f0722b794947ee37098c
Author: 阿洋 <xinzhuxiansh...@126.com>
AuthorDate: Mon May 25 08:10:43 2020 +0800

    KAFKA-10022:console-producer supports the setting of client.id (#8698)

    "console-producer" supports the setting of "client.id", which is a reasonable requirement, and the way "console consumer" and "console producer" handle "client.id" can be unified.
    "client.id" defaults to "console-producer"

    Co-authored-by: xinzhuxiansheng <xinzhuxiansh...@autohome.com.cn>

    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 core/src/main/scala/kafka/tools/ConsoleProducer.scala    |  5 +++--
 .../scala/unit/kafka/tools/ConsoleProducerTest.scala     | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 06bef80..04c6818 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -86,13 +86,14 @@ object ConsoleProducer {
 
     props ++= config.extraProducerProps
 
-    if(config.bootstrapServer != null)
+    if (config.bootstrapServer != null)
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     else
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
 
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-    props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+    if (props.getProperty(ProducerConfig.CLIENT_ID_CONFIG) == null)
+      props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
 
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index ef7b6c6..a636c32 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -59,6 +59,14 @@ class ConsoleProducerTest {
     "--topic",
     "t3",
   )
+  val clientIdOverride: Array[String] = Array(
+    "--broker-list",
+    "localhost:1001",
+    "--topic",
+    "t3",
+    "--producer-property",
+    "client.id=producer-1"
+  )
 
   @Test
   def testValidConfigsBrokerList(): Unit = {
@@ -102,4 +110,12 @@ class ConsoleProducerTest {
     assertEquals(util.Arrays.asList("localhost:1002"),
       producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
   }
+
+  @Test
+  def testClientIdOverride(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(clientIdOverride)
+    val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
+    assertEquals("producer-1",
+      producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
+  }
 }