Expanding an embedded cluster in an integration test
Hello, I need to test a case involving Kafka cluster expansion in an integration test. I see that EmbeddedKafkaCluster and IntegrationTestHarness are popular in the integration tests, but these test classes don't support test cluster expansion. What do you suggest using in this case? Does it make sense to create an issue & PR to add a cluster-expansion function to EmbeddedKafkaCluster or IntegrationTestHarness? Or maybe I am missing something and cluster-expansion functionality was deliberately not added to these classes? -- With best regards, Taras Ledkov
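For what it's worth, an expansion test usually boils down to asserting how partition replicas end up spread once the new broker joins and a reassignment runs. The sketch below models that expectation with a simplified round-robin assignment; note this is NOT Kafka's actual assignment algorithm (the broker uses a rack-aware, shifted round-robin), just an illustration of the kind of invariant such a test could check.

```python
# Simplified round-robin replica assignment, illustrating what a
# cluster-expansion test might assert. NOT Kafka's real assignment
# algorithm (that one is rack-aware and uses a shifted round-robin);
# purely a sketch of the invariant "new broker receives replicas".

def assign_replicas(partitions: int, replication: int, brokers: list[int]) -> dict[int, list[int]]:
    """Assign `replication` distinct brokers to each partition, round-robin."""
    assignment = {}
    for p in range(partitions):
        assignment[p] = [brokers[(p + r) % len(brokers)] for r in range(replication)]
    return assignment

# Before expansion: 3 brokers; after expansion: a 4th broker (id 3) joins.
before = assign_replicas(partitions=6, replication=2, brokers=[0, 1, 2])
after = assign_replicas(partitions=6, replication=2, brokers=[0, 1, 2, 3])

# After expansion and reassignment, the new broker should hold some replicas.
new_broker_load = sum(3 in replicas for replicas in after.values())
print(new_broker_load)  # prints 2: broker 3 now hosts replicas of 2 partitions
```

A real test against an embedded cluster would assert the analogous property via the Admin API (describeTopics) after triggering reassignment.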
Perf test on Kafka and MirrorMaker 2
Hi Kafka experts, I recently worked on a Kafka perf test. My Kafka active/standby clusters and MirrorMaker 2 are all built with SASL_SSL, and kafka-producer-perf-test.sh is set up with SASL_SSL too. I use the Kafka default settings (message.max.bytes, batch.size, ... no changes). The test result shows that the active Kafka cluster (5 brokers, CPU 16 GHz / memory 32 GB, located in Santa Clara) throughput is ~65 MB/s for 1 million records of 10 KB each; does that look normal? However, MirrorMaker 2 is slow at mirroring messages from the active cluster to the standby one (the standby cluster is located in another city, Wenatchee); it took 10 minutes for MirrorMaker 2 to finish mirroring all the records, which does not look normal. So my questions are: do we have docs on tuning Kafka performance with SASL_SSL enabled, and do we have docs on tuning Kafka MirrorMaker 2 with SASL_SSL enabled? Thanks.

worker@devks-ca-dev:~/kafka$ ./kafka_2.13-3.2.0/bin/kafka-producer-perf-test.sh --topic vks-perf-tst-10240-mirror --throughput -1 --num-records 100 --record-size 10240 --producer-props acks=all --producer.config ./perf-test.properties
31480 records sent, 6296.0 records/sec (61.48 MB/sec), 217.0 ms avg latency, 666.0 ms max latency.
34315 records sent, 6863.0 records/sec (67.02 MB/sec), 285.4 ms avg latency, 909.0 ms max latency.
33878 records sent, 6775.6 records/sec (66.17 MB/sec), 300.8 ms avg latency, 1238.0 ms max latency.
34549 records sent, 6909.8 records/sec (67.48 MB/sec), 294.6 ms avg latency, 1119.0 ms max latency.
34386 records sent, 6877.2 records/sec (67.16 MB/sec), 298.0 ms avg latency, 1084.0 ms max latency.
33708 records sent, 6741.6 records/sec (65.84 MB/sec), 310.8 ms avg latency, 1039.0 ms max latency.
35489 records sent, 7097.8 records/sec (69.31 MB/sec), 284.0 ms avg latency, 834.0 ms max latency.
22823 records sent, 4564.6 records/sec (44.58 MB/sec), 386.2 ms avg latency, 2061.0 ms max latency.
14733 records sent, 2946.6 records/sec (28.78 MB/sec), 788.3 ms avg latency, 4592.0 ms max latency.
36586 records sent, 7317.2 records/sec (71.46 MB/sec), 276.0 ms avg latency, 1095.0 ms max latency.
36567 records sent, 7313.4 records/sec (71.42 MB/sec), 277.3 ms avg latency, 1131.0 ms max latency.
35723 records sent, 7144.6 records/sec (69.77 MB/sec), 291.7 ms avg latency, 1103.0 ms max latency.
35567 records sent, 7113.4 records/sec (69.47 MB/sec), 286.6 ms avg latency, 863.0 ms max latency.
34606 records sent, 6921.2 records/sec (67.59 MB/sec), 296.0 ms avg latency, 1014.0 ms max latency.
31265 records sent, 6253.0 records/sec (61.06 MB/sec), 325.6 ms avg latency, 1254.0 ms max latency.
32985 records sent, 6585.1 records/sec (64.31 MB/sec), 312.3 ms avg latency, 1198.0 ms max latency.
33024 records sent, 6604.8 records/sec (64.50 MB/sec), 310.9 ms avg latency, 1106.0 ms max latency.
36564 records sent, 7312.8 records/sec (71.41 MB/sec), 274.9 ms avg latency, 854.0 ms max latency.
34587 records sent, 6917.4 records/sec (67.55 MB/sec), 289.8 ms avg latency, 1351.0 ms max latency.
35665 records sent, 7133.0 records/sec (69.66 MB/sec), 283.9 ms avg latency, 1124.0 ms max latency.
35486 records sent, 7097.2 records/sec (69.31 MB/sec), 293.5 ms avg latency, 1157.0 ms max latency.
34965 records sent, 6993.0 records/sec (68.29 MB/sec), 291.4 ms avg latency, 1152.0 ms max latency.
30519 records sent, 6103.8 records/sec (59.61 MB/sec), 326.1 ms avg latency, 1318.0 ms max latency.
34269 records sent, 6853.8 records/sec (66.93 MB/sec), 312.0 ms avg latency, 1346.0 ms max latency.
37176 records sent, 7435.2 records/sec (72.61 MB/sec), 270.6 ms avg latency, 986.0 ms max latency.
35495 records sent, 7099.0 records/sec (69.33 MB/sec), 287.2 ms avg latency, 1084.0 ms max latency.
36220 records sent, 7244.0 records/sec (70.74 MB/sec), 281.1 ms avg latency, 1057.0 ms max latency.
35634 records sent, 7126.8 records/sec (69.60 MB/sec), 286.0 ms avg latency, 1005.0 ms max latency.
35453 records sent, 7090.6 records/sec (69.24 MB/sec), 290.0 ms avg latency, 985.0 ms max latency.
100 records sent, 6710.913959 records/sec (65.54 MB/sec), 300.38 ms avg latency, 4592.00 ms max latency, 134 ms 50th, 966 ms 95th, 1112 ms 99th, 3323 ms 99.9th.
worker@devks-ca-dev:~/kafka$
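As a sanity check on those numbers: the tool's MB/sec figure is simply records/sec multiplied by the record size, with MB meaning 2^20 bytes. A quick back-of-the-envelope check, taking the 10240-byte record size from the --record-size flag above:

```python
# Sanity-check the perf-test summary line: MB/sec = records/sec * record size.
# The 10240-byte record size comes from the --record-size flag above;
# kafka-producer-perf-test reports MB as 2**20 bytes.

record_size = 10240            # bytes, from --record-size
records_per_sec = 6710.913959  # from the final summary line

mb_per_sec = records_per_sec * record_size / (1024 * 1024)
print(round(mb_per_sec, 2))  # → 65.54, matching the reported 65.54 MB/sec
```

So the summary is internally consistent; the ~65 MB/s figure is a plain restatement of the record rate, not a separate measurement.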
Re: Performance test latency
Dear Kafka Community,

Please don't support Russian terrorism. By supporting the Central Bank of Russia, the Ministry of Finance of the Russian Federation, or any Russian organization, you support the occupants in Ukraine.

--
With best regards,
Yurii Demchenko
email: yurii.demche...@gmail.com
Performance test latency
Hi everyone, I ran several performance tests (below) and the overall throughput looks good, but I don't understand why it shows huge latency (about 2 seconds). Could you please explain what that latency means? It can't be the latency between records or batches. What is it, then?

kafka-producer-perf-test.sh --producer-props bootstrap.servers=server1:9095 --producer.config kafka/client/consumer.properties --topic TopicC3part --throughput -1 --record-size 1000 --num-records 100 --print-metrics
53137 records sent, 10625.3 records/sec (10.13 MB/sec), 1501.9 ms avg latency, 2384.0 ms max latency.
87152 records sent, 17430.4 records/sec (16.62 MB/sec), 1908.7 ms avg latency, 2365.0 ms max latency.
75728 records sent, 15142.6 records/sec (14.44 MB/sec), 2120.1 ms avg latency, 2878.0 ms max latency.
78736 records sent, 15747.2 records/sec (15.02 MB/sec), 2064.6 ms avg latency, 2826.0 ms max latency.
94688 records sent, 18933.8 records/sec (18.06 MB/sec), 1717.0 ms avg latency, 2792.0 ms max latency.
84757 records sent, 16951.4 records/sec (16.17 MB/sec), 1887.1 ms avg latency, 2941.0 ms max latency.
70871 records sent, 14174.2 records/sec (13.52 MB/sec), 2334.6 ms avg latency, 2783.0 ms max latency.
67582 records sent, 13513.7 records/sec (12.89 MB/sec), 2365.0 ms avg latency, 2807.0 ms max latency.
68025 records sent, 13605.0 records/sec (12.97 MB/sec), 2465.0 ms avg latency, 3028.0 ms max latency.
72987 records sent, 14594.5 records/sec (13.92 MB/sec), 2282.5 ms avg latency, 2657.0 ms max latency.
73492 records sent, 14695.5 records/sec (14.01 MB/sec), 2234.3 ms avg latency, 2521.0 ms max latency.
73798 records sent, 14759.6 records/sec (14.08 MB/sec), 2213.1 ms avg latency, 2695.0 ms max latency.
74474 records sent, 14894.8 records/sec (14.20 MB/sec), 2213.8 ms avg latency, 2446.0 ms max latency.
100 records sent, 15003.976054 records/sec (14.31 MB/sec), 2098.53 ms avg latency, 3028.00 ms max latency, 2371 ms 50th, 2788 ms 95th, 2949 ms 99th, 3014 ms 99.9th.
Best regards, Evgeny This email message (and any attachments) is confidential and may be privileged or otherwise protected from disclosure by applicable law. If you are not the intended recipient or have received this in error please notify the system manager, postmas...@vtbcapital.ru and remove this message and any attachments from your system. Any unauthorized dissemination, copying or other use of this message and/or any attachments is strictly prohibited and may constitute a breach of civil or criminal law. JSC VTB Capital may monitor email traffic data and also the content of email.
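On the latency question above: the perf tool measures latency from send() to acknowledgment, and with --throughput -1 the producer offers records faster than the cluster can absorb them, so most of that time is spent queued in the client's record accumulator (in the real producer, buffer.memory backpressure caps the queue, which is plausibly why latency plateaus around 2-3 s here instead of growing without bound). A toy single-server queue (purely illustrative, not the producer's actual internals) shows how offering load above the service rate inflates per-record latency:

```python
# Toy single-server queue showing why send-to-ack latency balloons when
# records are offered faster than they can be serviced. Purely
# illustrative -- not the Kafka producer's actual accumulator logic.

def avg_latency(num_records: int, arrival_interval: float, service_time: float) -> float:
    """Average time each record spends from arrival to completion (seconds)."""
    ready_at = 0.0   # when the "server" (broker link) is next free
    total_wait = 0.0
    for i in range(num_records):
        arrival = i * arrival_interval
        start = max(arrival, ready_at)    # wait while the server is busy
        ready_at = start + service_time   # record finishes service
        total_wait += ready_at - arrival  # latency = queueing + service
    return total_wait / num_records

# Offered rate below capacity: latency stays near the bare service time.
print(avg_latency(10_000, arrival_interval=0.002, service_time=0.001))

# Offered rate above capacity (as with --throughput -1): the queue keeps
# growing, so average latency grows with how long the test has run.
print(avg_latency(10_000, arrival_interval=0.0005, service_time=0.001))
```

In other words, the ~2 s figure is not per-record network time; it is mostly time each record spent waiting in the client buffer behind earlier records.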
Reset Kafka Testcontainers after each JUnit test without destroying the container
Cross-posting from Stack Overflow <https://stackoverflow.com/questions/72420966/reset-kafka-testcontainers-without-clearing-and-recreating-the-testcontainer-aft>. I am using Kafka Testcontainers with JUnit 5. Can someone let me know how I can delete data from the Kafka Testcontainers broker after each test, so that I don't have to destroy and recreate the container every time?
- Testcontainers version: 1.6.2
- Docker Kafka image: confluentinc/cp-kafka:5.2.1
Re: [External] : Re: Unit Test failing for Kafka 2.8.0
Hi Mohammad,

Sorry, I have no idea why it failed in your environment. I can run `./gradlew :spotlessScalaCheck` successfully without error; that is, I can confirm the source code has no formatting errors. You might need to search for a fix specific to your development environment.

Thank you.
Luke
Re: [External] : Re: Unit Test failing for Kafka 2.8.0
Hi Luke,

Thanks a lot for your time and assistance. Somehow it wasn't attached; attaching the Kafka 3.0 unit test report again.

For my work I need to generate binaries from the Kafka source code, which I am doing locally, so there is no PR in the repo. I am just running the unit tests (I think those should run fine, as they ran for me earlier). If I run the complete test or integration suite it does not run at all, but I am not considering that for now.

Below is the error for Kafka 2.8.0 (the same for Kafka 3.0):
===
gradle test
Starting a Gradle Daemon (subsequent builds will be faster)
...
<===-----> 54% EXECUTING [1m 11s]
FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':spotlessScalaCheck'.
The following files had format violations:
streams\streams-scala\src\main\scala\org\apache\kafka\streams\scala\FunctionConversions.scala @@ -1,83 +1,83 @@ -/*\n - * Licensed to the Apache Software Foundation (ASF) under one or more\n - * contributor license agreements. See the NOTICE file distributed with\n - * this work for additional information regarding copyright ownership.\n - * The ASF licenses this file to You under the Apache License, Version 2.0\n - * (the "License"); you may not use this file except in compliance with\n - * the License. 
You may obtain a copy of the License at\n - *\n - *http://www.apache.org/licenses/LICENSE-2.0\n - *\n - * Unless required by applicable law or agreed to in writing, software\n - * distributed under the License is distributed on an "AS IS" BASIS,\n - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n - * See the License for the specific language governing permissions and\n - * limitations under the License.\n - */\n -package org.apache.kafka.streams.scala\n -\n -import org.apache.kafka.streams.KeyValue\n -import org.apache.kafka.streams.kstream._\n -import scala.jdk.CollectionConverters._\n -import java.lang.{Iterable => JIterable}\n -\n -@deprecated("This object is for internal use only", since = "2.1.0")\n -object FunctionConversions {\n -\n - implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {\n -def asForeachAction: ForeachAction[K, V] = (key, value) => p(key, value)\n - }\n -\n - implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {\n -def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)\n - }\n -\n - implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {\n -def asKeyValueMapper: KeyValueMapper[T, U, VR] = (key: T, value: U) => f(key, value)\n -def asValueJoiner: ValueJoiner[T, U, VR] = (value1: T, value2: U) => f(value1, value2)\n - }\n -\n - implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {\n -def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = (key: K, value: V) => {\n - val (kr, vr) = f(key, value)\n - KeyValue.pair(kr, vr)\n -}\n - }\n -\n - implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal {\n -def asValueMapper: ValueMapper[V, VR] = (value: V) => f(value)\n ... (118 more lines that didn't fit) Violations also present in 43 other files. Run 'gradlew.bat :spotlessApply' to fix these violations. 
* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0. Use '--warning-mode all' to show the individual deprecation warnings. See https://docs.gradle.org/6.8.3/userguide/command_line_interface.html#sec:command_line_warnings

BUILD FAILED in 1m 13s
Re: [External] : Re: Unit Test failing for Kafka 2.8.0
Hi Mohammad,

I can't see your report. But usually we won't run all tests in a local environment. If you submit a PR to the Kafka repo, there will be Jenkins builds/tests for you. Also, there are some flaky tests. You can check the Jenkins build results for v3.0 here: https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.0/

Thank you.
Luke
Re: [External] : Re: Unit Test failing for Kafka 2.8.0
Thanks a lot, Luke, for this piece of information.

Yesterday I downloaded Kafka 3.0 and it's failing even there. Do I need to take it from git, since I took it from https://kafka.apache.org/downloads? Attaching the Kafka 3.0 unit test report.

- Gradle 7.3.3
- java version "11.0.13" 2021-10-19 LTS
- Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370)
- Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode)
- Scala code runner version 2.13.0

Thanks
SH@D@B
Re: Unit Test failing for Kafka 2.8.0
Hi Mohammad,

This is a known issue because JFrog sunset the Bintray repo. We've fixed this issue in v3.0 (PR: https://github.com/apache/kafka/pull/10056). You should be able to build/run tests successfully on v3.0. If you can't change to v3.0, you can refer to the PR above to fix it, but we're not sure if it works well on v2.8 (because of different dependencies).

Thank you.
Luke
Unit Test failing for Kafka 2.8.0
Hi all,

I need some assistance, as the unit tests are failing for Kafka 2.8.0; details are below. I appreciate any guidance; please redirect me to the correct audience if I am sending this email to the wrong one.

I have downloaded the Kafka 2.8.0 source code (from https://kafka.apache.org/downloads) and built binaries (following https://github.com/apache/kafka/tree/2.8). Although the build is successful after changing the Gradle plugin URL in gradle/buildscript.gradle, the unit tests are failing. I have attached the unit test result.

My env details:
- Gradle 6.8.3
- java version "11.0.13" 2021-10-19 LTS
- Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370)
- Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode)
- Scala code runner version 2.13.0

A few important points:
1. Build and unit+integration tests were working fine around a month back.
2. Since the URL 'https://dl.bintray.com/content/netflixoss/external-gradle-plugins/' is down, I have changed it to the URL 'https://plugins.gradle.org/m2/' in the gradle/buildscript.gradle file to make the gradle command run.

Steps followed:
- gradle
- gradle jar
- gradle srcJar
- gradle aggregatedJavadoc
- gradle javadoc
- gradle javadocJar
- gradle scaladoc
- gradle scaladocJar
- gradle releaseTarGz
- gradle unitTest (runs, but a few tests fail; report attached)
- gradle integrationTest (throws an error)

NOTE: Tried on Kafka 3.0 with Gradle 7.3; the unit tests are still failing.

Thanks
SH@D@B
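For reference, the repository swap described in point 2 might look like the following in gradle/buildscript.gradle. This is a hedged sketch; the exact block name and surrounding layout in the Kafka 2.8 build scripts may differ, and only the URL change is the point here.

```groovy
// gradle/buildscript.gradle -- sketch of the repository swap described
// in point 2 above. The surrounding layout in Kafka 2.8's build scripts
// may differ.
repositories {
    // Old entry, dead since JFrog sunset Bintray:
    // maven { url 'https://dl.bintray.com/content/netflixoss/external-gradle-plugins/' }

    // Replacement that resolves the same plugins from the Gradle plugin portal:
    maven { url 'https://plugins.gradle.org/m2/' }
}
```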
Re: Kafka 2.7.0 processor API and streams-test-utils changes
Hi John, Guozhang,

No problem, I'll keep an eye out for the change! Thanks for letting me know; I will give my change a try with your example. Appreciate the quick reply!

Best,
Upesh

Upesh Desai | Senior Software Developer | ude...@itrsgroup.com | www.itrsgroup.com

From: John Roesler
Date: Saturday, February 6, 2021 at 8:22 PM
To: users@kafka.apache.org
Subject: Re: Kafka 2.7.0 processor API and streams-test-utils changes

Hello Upesh,

I'm sorry for the trouble. This was my feature, and my oversight. I will update the docs on Monday. The quick answer is that there is also a new api.MockProcessorContext, which is compatible with the new interface. That class also provides getStateStoreContext to use with the state stores:
https://kafka.apache.org/27/javadoc/org/apache/kafka/streams/processor/api/MockProcessorContext.html#getStateStoreContext--

Here is an example of how to use it:
https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java#L57-L58

I hope this helps, and I'll fix the docs ASAP.

Thanks,
John

On Fri, Feb 5, 2021, at 17:33, Guozhang Wang wrote:
> Hello Upesh,
>
> Thanks for the report! I think we overlooked updating the
> documentation with the new 2.7.0 release. Could you file a JIRA (or even
> better, provide a PR with the JIRA :) to update the docs?
>
> Guozhang
>
> On Thu, Feb 4, 2021 at 1:03 PM Upesh Desai wrote:
> > Hello,
> >
> > I recently upgraded our Kafka components to 2.7.0 and noticed the changes
> > to the processor API. Specifically, the additions of:
> >
> > - org.apache.kafka.streams.processor.api.Processor
> > - org.apache.kafka.streams.processor.api.ProcessorContext
> >
> > The old Topology.addProcessor() method has been deprecated, which is what
> > led me to find the new classes. 
After porting our code to the updated > > processor API, we noticed issues with the Processor unit tests, which had > > been written follow this documentation exactly: > > > > > > > > > > https://kafka.apache.org/27/documentation/streams/developer-guide/testing.html#unit-testing-processors > > > > > > > > However, it seems that the MockProcessorContext and possibly other test > > suite classes have not been updated for the new API changes, such as the > > following methods: > > > > > > > > store.init(context, store); > > > > context.register(store, null); > > > > > > > > Can someone point me in the right direction if this has indeed been > > changed/fixed or need to raise an issue to have this updated in the next > > release? > > > > > > > > Cheers! > > <https://www.itrsgroup.com/> > > Upesh Desai > > Senior Software Developer > > *ude...@itrsgroup.com* > > *www.itrsgroup.com* <https://www.itrsgroup.com/> > > Internet communications are not secure and therefore the ITRS Group does > > not accept legal responsibility for the contents of this message. Any view > > or opinions presented are solely those of the author and do not necessarily > > represent those of the ITRS Group unless otherwise specifically stated. > > [itrs.email.signature] > > > > > > *Disclaimer* > > > > The information contained in this communication from the sender is > > confidential. It is intended solely for use by the recipient and others > > authorized to receive it. If you are not the recipient, you are hereby > > notified that any disclosure, copying, distribution or taking action in > > relation of the contents of this information is strictly prohibited and may > > be unlawful. > > > > This email has been scanned for viruses and malware, and may have been > > automatically archived by *Mimecast Ltd*, an innovator in Software as a > > Service (SaaS) for business. Providing a *safer* and *more useful* place > > for your human generated data. 
Specializing in; Security, archiving and > > compliance. > > > > > -- > -- Guozhang >
Re: Kafka 2.7.0 processor API and streams-test-utils changes
Hello Upesh, I’m sorry for the trouble. This was my feature, and my oversight. I will update the docs on Monday. The quick answer is that there is also a new api.MockProcessorContext, which is compatible with the new interface. That class also provides getStateStoreContext to use with the state stores: https://kafka.apache.org/27/javadoc//org/apache/kafka/streams/processor/api/MockProcessorContext.html#getStateStoreContext-- Here is an example of how to use it: https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java#L57-L58 I hope this helps, and I’ll fix the docs ASAP. Thanks, John On Fri, Feb 5, 2021, at 17:33, Guozhang Wang wrote: > Hello Upesh, > > Thanks for the report! I think this is overlooked to update the > documentation with the new 2.7.0 release. Could you file a JIRA (or even > better, provide a PR with the JIRA :) to update the docs? > > > Guozhang > > On Thu, Feb 4, 2021 at 1:03 PM Upesh Desai wrote: > > > Hello, > > > > > > > > I recently upgraded our Kafka components to 2.7.0, and noticed the changes > > to the processor API. Specifically, the additions of: > > > > > > > >- org.apache.kafka.streams.processor.api.Processor > >- org.apache.kafka.streams.processor.api.ProcessorContext > > > > > > > > The old Topology.addProcessor() method has been deprecated, which is what > > led me to finding the new classes. 
After porting our code to the updated > > processor API, we noticed issues with the Processor unit tests, which had > > been written follow this documentation exactly: > > > > > > > > > > https://kafka.apache.org/27/documentation/streams/developer-guide/testing.html#unit-testing-processors > > > > > > > > However, it seems that the MockProcessorContext and possibly other test > > suite classes have not been updated for the new API changes, such as the > > following methods: > > > > > > > > store.init(context, store); > > > > context.register(store, null); > > > > > > > > Can someone point me in the right direction if this has indeed been > > changed/fixed or need to raise an issue to have this updated in the next > > release? > > > > > > > > Cheers! > > <https://www.itrsgroup.com/> > > Upesh Desai > > Senior Software Developer > > *ude...@itrsgroup.com* > > *www.itrsgroup.com* <https://www.itrsgroup.com/> > > Internet communications are not secure and therefore the ITRS Group does > > not accept legal responsibility for the contents of this message. Any view > > or opinions presented are solely those of the author and do not necessarily > > represent those of the ITRS Group unless otherwise specifically stated. > > [itrs.email.signature] > > > > > > *Disclaimer* > > > > The information contained in this communication from the sender is > > confidential. It is intended solely for use by the recipient and others > > authorized to receive it. If you are not the recipient, you are hereby > > notified that any disclosure, copying, distribution or taking action in > > relation of the contents of this information is strictly prohibited and may > > be unlawful. > > > > This email has been scanned for viruses and malware, and may have been > > automatically archived by *Mimecast Ltd*, an innovator in Software as a > > Service (SaaS) for business. Providing a *safer* and *more useful* place > > for your human generated data. 
Specializing in; Security, archiving and > > compliance. > > > > > -- > -- Guozhang >
Re: Kafka 2.7.0 processor API and streams-test-utils changes
Hello Upesh, Thanks for the report! I think this is overlooked to update the documentation with the new 2.7.0 release. Could you file a JIRA (or even better, provide a PR with the JIRA :) to update the docs? Guozhang On Thu, Feb 4, 2021 at 1:03 PM Upesh Desai wrote: > Hello, > > > > I recently upgraded our Kafka components to 2.7.0, and noticed the changes > to the processor API. Specifically, the additions of: > > > >- org.apache.kafka.streams.processor.api.Processor >- org.apache.kafka.streams.processor.api.ProcessorContext > > > > The old Topology.addProcessor() method has been deprecated, which is what > led me to finding the new classes. After porting our code to the updated > processor API, we noticed issues with the Processor unit tests, which had > been written follow this documentation exactly: > > > > > https://kafka.apache.org/27/documentation/streams/developer-guide/testing.html#unit-testing-processors > > > > However, it seems that the MockProcessorContext and possibly other test > suite classes have not been updated for the new API changes, such as the > following methods: > > > > store.init(context, store); > > context.register(store, null); > > > > Can someone point me in the right direction if this has indeed been > changed/fixed or need to raise an issue to have this updated in the next > release? > > > > Cheers! > <https://www.itrsgroup.com/> > Upesh Desai > Senior Software Developer > *ude...@itrsgroup.com* > *www.itrsgroup.com* <https://www.itrsgroup.com/> > Internet communications are not secure and therefore the ITRS Group does > not accept legal responsibility for the contents of this message. Any view > or opinions presented are solely those of the author and do not necessarily > represent those of the ITRS Group unless otherwise specifically stated. > [itrs.email.signature] > > > *Disclaimer* > > The information contained in this communication from the sender is > confidential. 
It is intended solely for use by the recipient and others > authorized to receive it. If you are not the recipient, you are hereby > notified that any disclosure, copying, distribution or taking action in > relation of the contents of this information is strictly prohibited and may > be unlawful. > > This email has been scanned for viruses and malware, and may have been > automatically archived by *Mimecast Ltd*, an innovator in Software as a > Service (SaaS) for business. Providing a *safer* and *more useful* place > for your human generated data. Specializing in; Security, archiving and > compliance. > -- -- Guozhang
Kafka 2.7.0 processor API and streams-test-utils changes
Hello, I recently upgraded our Kafka components to 2.7.0, and noticed the changes to the processor API. Specifically, the additions of:

* org.apache.kafka.streams.processor.api.Processor
* org.apache.kafka.streams.processor.api.ProcessorContext

The old Topology.addProcessor() method has been deprecated, which is what led me to finding the new classes. After porting our code to the updated processor API, we noticed issues with the Processor unit tests, which had been written following this documentation exactly:

https://kafka.apache.org/27/documentation/streams/developer-guide/testing.html#unit-testing-processors

However, it seems that the MockProcessorContext and possibly other test suite classes have not been updated for the new API changes, such as the following methods:

store.init(context, store);
context.register(store, null);

Can someone point me in the right direction if this has indeed been changed/fixed, or do I need to raise an issue to have this updated in the next release?

Cheers!
Upesh Desai
Senior Software Developer
ude...@itrsgroup.com
www.itrsgroup.com
How do I use the Java kafka-client to test the difference between acks=0, 1, and all?
How do I use the Java kafka-client to test the difference between acks=0, acks=1, and acks=all?
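The three settings trade durability for latency: acks=0 sends without waiting for any broker acknowledgement, acks=1 waits for the partition leader only, and acks=all waits for all in-sync replicas. A minimal sketch of how such a comparison could be driven from the Java client (the broker address is a placeholder and no cluster is contacted by the helper itself; the string keys are the standard producer config names):

```java
import java.util.Properties;

public class AcksConfigSketch {
    // Builds producer properties for a given acks mode: "0", "1", or "all".
    // acks=0: fire-and-forget, fastest, records may be lost silently.
    // acks=1: leader-only ack, lost if the leader dies before replication.
    // acks=all: leader waits for all in-sync replicas, strongest durability.
    public static Properties producerProps(String bootstrap, String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("acks", acks);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        for (String acks : new String[] {"0", "1", "all"}) {
            Properties p = producerProps("localhost:9092", acks);
            // Against a real cluster you would construct a KafkaProducer with p,
            // send the same N records, and time the run for each acks setting.
            System.out.println("acks=" + p.getProperty("acks"));
        }
    }
}
```

Running the same timed send loop once per setting against a real broker makes the latency/durability trade-off directly measurable.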
Writing Integration test with Embedded Kafka
I am trying to write an integration test using Embedded Kafka but I keep getting a NullPointerException. My test case is very simple. It has the following steps: 1. Read a JSON file & write messages to an inputTopic. 2. Perform a 'readStream' operation. 3. Do a 'select' on the stream. This throws a NullPointerException. What am I doing wrong? Code is given below:

"My Test which runs with Embedded Kafka" should "Generate correct Result" in {
  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
    kafkaPort = 9066,
    zooKeeperPort = 2066,
    Map("log.dir" -> "./src/test/resources/")
  )
  withRunningKafka {
    createCustomTopic(inputTopic)
    val source = Source.fromFile("src/test/resources/test1.json")
    source.getLines.toList.filterNot(_.isEmpty).foreach(
      line => publishStringMessageToKafka(inputTopic, line)
    )
    source.close()
    implicit val deserializer: StringDeserializer = new StringDeserializer
    createCustomTopic(outputTopic)
    import spark2.implicits._
    val schema = spark.read.json("my.json").schema
    val myStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9066")
      .option("subscribe", inputTopic)
      .load()
    // Schema looks good
    myStream.printSchema()
    // Following line throws NullPointerException! Why?
    val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))
    // There's more code... but let's not worry about that for now.
  }
}
RE: Producer-Perf-test
Hi Himanshu, Pasting screenshot for better view.

My Observations:
* When I increased partitions, and keep replica same, then Throughput increases. Compare Row 1 and 2
* Increase number of replica and partitions same: Throughput decreased than before. Row 2 and 3
* Increase number of partitions and keep replica same as before: TP increased. Row 3 and 4
* Increase number of Replica and keep partitions same: TP decreased. Row 4 and 5.

Question: Does it make any impact on throughput when number of partitions and replicas are equal?

From: Sunil CHAUDHARI
Sent: Tuesday, March 3, 2020 2:43 PM
To: users@kafka.apache.org
Subject: Producer-Perf-test

Hi, I have done performance testing of Kafka cluster using kafka-producer-perf-test.sh. I created diff type of topics and did perf testing. Example: MB1P1R = MB is my topic name with 1 Partition and 1 replica. I have 3 nodes cluster.

My Observations:
* When I increased partitions, and keep replica same, then Throughput increases. Compare Row 1 and 2
* Increase number of replica and partitions same: Throughput decreased than before. Row 2 and 3
* Increase number of partitions and keep replica same as before: TP increased. Row 3 and 4
* Increase number of Replica and keep partitions same: TP decreased. Row 4 and 5.

Question: Does it make any impact on throughput when number of partitions and replicas are equal?

Sr NO | Topic  | Partitions | Replicas | Network.threads | IO-Threads | Records Sent | Batch-size-avg | Records/sec | Size/sec     | Latency Avg (ms) | Latency Max (ms)
1     | MB1P1R | 1          | 1        | 3               | 8          | 65495        | 16220          | 68653.03983 | 13.78 MB/sec | 163.67           | 245
2     | MB2P1R | 2          | 1        | 3               | 8          | 65495        | 15865.719      | 58269.57295 | 11.70 MB/sec | 173.81           | 470
3     | MB2P2R | 2          | 2        | 3               | 8          | 65495        | 16202.813      | 46286.21908 | 9.29 MB/sec  | 417.33           | 704
4     | MB3P2R | 3          | 2        | 3               | 8          | 65495        | 16184          | 56412.57537 | 11.32 MB/sec | 229.82           | 550
5     | MB3P3R | 3          | 3        | 3               | 8          | 65495        |                | 46417.43444 | 9.32 MB/sec  | 411.44           | 705

Regards,
Sunil.
CONFIDENTIAL NOTE: The information contained in this email is intended only for the use of the individual or entity named above and may contain information that is privileged, confidential and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. If you have received this message in error, please immediately notify the sender and delete the mail. Thank you.
Re: Producer-Perf-test
It's okay. You could probably take the screenshot and share it. On Tue, 3 Mar 2020, 20:13 sunil chaudhari, wrote: > Hi Himanshu, > Sorry but I pasted from excel. Dont know how it got messed up? > > Will resend it. > > On Tue, 3 Mar 2020 at 6:48 PM, Himanshu Shukla < > himanshushukla...@gmail.com> > wrote: > > > could you please share the result in some proper way? Each field is line > by > > line. > > > > On Tue, Mar 3, 2020 at 2:43 PM Sunil CHAUDHARI > > wrote: > > > > > Hi, > > > I have done performance testing of Kafka cluster using > > > kafka-producer-perf-test.sh > > > I created diff type of topics and did perf testing. Example: MB1P1R= MB > > is > > > my topic name with 1 Partition and 1 replica. > > > I have 3 nodes cluster. > > > > > > My Observations: > > > > > > * When I increased partitions, and keep replica same, then > Throughput > > > increases. Compare Row 1 and 2 > > > * Increase number of replica and partitions same: Throughput > > > decreased than before. Row 2 and 3 > > > * Increase number of partitions and keep replica same as before: TP > > > increased. Row 3 and 4 > > > * Increase number of Replica and keep partitions same: TP > decreased. > > > Row 4 and 5. > > > > > > Question: Does it make any impact on throughput when number of > partitions > > > and replicas are equal? 
> > > > > > Sr NO > > > > > > Topic > > > > > > Partitions > > > > > > Replicas > > > > > > Network.threads > > > > > > IO-Threads > > > > > > Records Sent > > > > > > Batch-size-avg > > > > > > Throughput > > > > > > Latency (ms) > > > > > > Records/sec > > > > > > Size/sec > > > > > > Avg > > > > > > Max > > > > > > 1 > > > > > > MB1P1R > > > > > > 1 > > > > > > 1 > > > > > > 3 > > > > > > 8 > > > > > > 65495 > > > > > > 16220 > > > > > > 68653.03983 > > > > > > 13.78 MB/sec > > > > > > 163.67 > > > > > > 245 > > > > > > 2 > > > > > > MB2P1R > > > > > > 2 > > > > > > 1 > > > > > > 3 > > > > > > 8 > > > > > > 65495 > > > > > > 15865.719 > > > > > > 58269.57295 > > > > > > 11.70 MB/sec > > > > > > 173.81 > > > > > > 470 > > > > > > 3 > > > > > > MB2P2R > > > > > > 2 > > > > > > 2 > > > > > > 3 > > > > > > 8 > > > > > > 65495 > > > > > > 16202.813 > > > > > > 46286.21908 > > > > > > 9.29 MB/sec > > > > > > 417.33 > > > > > > 704 > > > > > > 4 > > > > > > MB3P2R > > > > > > 3 > > > > > > 2 > > > > > > 3 > > > > > > 8 > > > > > > 65495 > > > > > > 16184 > > > > > > 56412.57537 > > > > > > 11.32 MB/sec > > > > > > 229.82 > > > > > > 550 > > > > > > 5 > > > > > > MB3P3R > > > > > > 3 > > > > > > 3 > > > > > > 3 > > > > > > 8 > > > > > > 65495 > > > > > > > > > > > > 46417.43444 > > > > > > 9.32 MB/sec > > > > > > 411.44 > > > > > > 705 > > > > > > > > > > > > Regards, > > > Sunil. > > > CONFIDENTIAL NOTE: > > > The information contained in this email is intended only for the use of > > > the individual or entity named above and may contain information that > is > > > privileged, confidential and exempt from disclosure under applicable > law. > > > If the reader of this message is not the intended recipient, you are > > hereby > > > notified that any dissemination, distribution or copying of this > > > communication is strictly prohibited. If you have received this message > > in > > > error, please immediately notify the sender and delete the mail. Thank > > you. 
> > > > > > > > > -- > > Regards, > > Himanshu Shukla > > >
Re: Producer-Perf-test
Hi Himanshu, Sorry but I pasted from excel. Dont know how it got messed up? Will resend it. On Tue, 3 Mar 2020 at 6:48 PM, Himanshu Shukla wrote: > could you please share the result in some proper way? Each field is line by > line. > > On Tue, Mar 3, 2020 at 2:43 PM Sunil CHAUDHARI > wrote: > > > Hi, > > I have done performance testing of Kafka cluster using > > kafka-producer-perf-test.sh > > I created diff type of topics and did perf testing. Example: MB1P1R= MB > is > > my topic name with 1 Partition and 1 replica. > > I have 3 nodes cluster. > > > > My Observations: > > > > * When I increased partitions, and keep replica same, then Throughput > > increases. Compare Row 1 and 2 > > * Increase number of replica and partitions same: Throughput > > decreased than before. Row 2 and 3 > > * Increase number of partitions and keep replica same as before: TP > > increased. Row 3 and 4 > > * Increase number of Replica and keep partitions same: TP decreased. > > Row 4 and 5. > > > > Question: Does it make any impact on throughput when number of partitions > > and replicas are equal? 
> > > > Sr NO > > > > Topic > > > > Partitions > > > > Replicas > > > > Network.threads > > > > IO-Threads > > > > Records Sent > > > > Batch-size-avg > > > > Throughput > > > > Latency (ms) > > > > Records/sec > > > > Size/sec > > > > Avg > > > > Max > > > > 1 > > > > MB1P1R > > > > 1 > > > > 1 > > > > 3 > > > > 8 > > > > 65495 > > > > 16220 > > > > 68653.03983 > > > > 13.78 MB/sec > > > > 163.67 > > > > 245 > > > > 2 > > > > MB2P1R > > > > 2 > > > > 1 > > > > 3 > > > > 8 > > > > 65495 > > > > 15865.719 > > > > 58269.57295 > > > > 11.70 MB/sec > > > > 173.81 > > > > 470 > > > > 3 > > > > MB2P2R > > > > 2 > > > > 2 > > > > 3 > > > > 8 > > > > 65495 > > > > 16202.813 > > > > 46286.21908 > > > > 9.29 MB/sec > > > > 417.33 > > > > 704 > > > > 4 > > > > MB3P2R > > > > 3 > > > > 2 > > > > 3 > > > > 8 > > > > 65495 > > > > 16184 > > > > 56412.57537 > > > > 11.32 MB/sec > > > > 229.82 > > > > 550 > > > > 5 > > > > MB3P3R > > > > 3 > > > > 3 > > > > 3 > > > > 8 > > > > 65495 > > > > > > > > 46417.43444 > > > > 9.32 MB/sec > > > > 411.44 > > > > 705 > > > > > > > > Regards, > > Sunil. > > CONFIDENTIAL NOTE: > > The information contained in this email is intended only for the use of > > the individual or entity named above and may contain information that is > > privileged, confidential and exempt from disclosure under applicable law. > > If the reader of this message is not the intended recipient, you are > hereby > > notified that any dissemination, distribution or copying of this > > communication is strictly prohibited. If you have received this message > in > > error, please immediately notify the sender and delete the mail. Thank > you. > > > > > -- > Regards, > Himanshu Shukla >
Re: Producer-Perf-test
could you please share the result in some proper way? Each field is line by line. On Tue, Mar 3, 2020 at 2:43 PM Sunil CHAUDHARI wrote: > Hi, > I have done performance testing of Kafka cluster using > kafka-producer-perf-test.sh > I created diff type of topics and did perf testing. Example: MB1P1R= MB is > my topic name with 1 Partition and 1 replica. > I have 3 nodes cluster. > > My Observations: > > * When I increased partitions, and keep replica same, then Throughput > increases. Compare Row 1 and 2 > * Increase number of replica and partitions same: Throughput > decreased than before. Row 2 and 3 > * Increase number of partitions and keep replica same as before: TP > increased. Row 3 and 4 > * Increase number of Replica and keep partitions same: TP decreased. > Row 4 and 5. > > Question: Does it make any impact on throughput when number of partitions > and replicas are equal? > > Sr NO > > Topic > > Partitions > > Replicas > > Network.threads > > IO-Threads > > Records Sent > > Batch-size-avg > > Throughput > > Latency (ms) > > Records/sec > > Size/sec > > Avg > > Max > > 1 > > MB1P1R > > 1 > > 1 > > 3 > > 8 > > 65495 > > 16220 > > 68653.03983 > > 13.78 MB/sec > > 163.67 > > 245 > > 2 > > MB2P1R > > 2 > > 1 > > 3 > > 8 > > 65495 > > 15865.719 > > 58269.57295 > > 11.70 MB/sec > > 173.81 > > 470 > > 3 > > MB2P2R > > 2 > > 2 > > 3 > > 8 > > 65495 > > 16202.813 > > 46286.21908 > > 9.29 MB/sec > > 417.33 > > 704 > > 4 > > MB3P2R > > 3 > > 2 > > 3 > > 8 > > 65495 > > 16184 > > 56412.57537 > > 11.32 MB/sec > > 229.82 > > 550 > > 5 > > MB3P3R > > 3 > > 3 > > 3 > > 8 > > 65495 > > > > 46417.43444 > > 9.32 MB/sec > > 411.44 > > 705 > > > > Regards, > Sunil. > CONFIDENTIAL NOTE: > The information contained in this email is intended only for the use of > the individual or entity named above and may contain information that is > privileged, confidential and exempt from disclosure under applicable law. 
> If the reader of this message is not the intended recipient, you are hereby > notified that any dissemination, distribution or copying of this > communication is strictly prohibited. If you have received this message in > error, please immediately notify the sender and delete the mail. Thank you. > -- Regards, Himanshu Shukla
Producer-Perf-test
Hi, I have done performance testing of Kafka cluster using kafka-producer-perf-test.sh. I created diff type of topics and did perf testing. Example: MB1P1R = MB is my topic name with 1 Partition and 1 replica. I have 3 nodes cluster.

My Observations:
* When I increased partitions, and keep replica same, then Throughput increases. Compare Row 1 and 2
* Increase number of replica and partitions same: Throughput decreased than before. Row 2 and 3
* Increase number of partitions and keep replica same as before: TP increased. Row 3 and 4
* Increase number of Replica and keep partitions same: TP decreased. Row 4 and 5.

Question: Does it make any impact on throughput when number of partitions and replicas are equal?

Sr NO | Topic  | Partitions | Replicas | Network.threads | IO-Threads | Records Sent | Batch-size-avg | Records/sec | Size/sec     | Latency Avg (ms) | Latency Max (ms)
1     | MB1P1R | 1          | 1        | 3               | 8          | 65495        | 16220          | 68653.03983 | 13.78 MB/sec | 163.67           | 245
2     | MB2P1R | 2          | 1        | 3               | 8          | 65495        | 15865.719      | 58269.57295 | 11.70 MB/sec | 173.81           | 470
3     | MB2P2R | 2          | 2        | 3               | 8          | 65495        | 16202.813      | 46286.21908 | 9.29 MB/sec  | 417.33           | 704
4     | MB3P2R | 3          | 2        | 3               | 8          | 65495        | 16184          | 56412.57537 | 11.32 MB/sec | 229.82           | 550
5     | MB3P3R | 3          | 3        | 3               | 8          | 65495        |                | 46417.43444 | 9.32 MB/sec  | 411.44           | 705

Regards,
Sunil.
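For cross-checking rows like these, the Size/sec column is just the Records/sec rate times the average record size. A small helper showing the arithmetic (the 210.5-byte record size below is a back-of-the-envelope assumption chosen so the example lines up with row 1, not a value from the test):

```java
public class ThroughputMath {
    // Converts a records/sec rate and an average record size in bytes into
    // the MB/sec figure kafka-producer-perf-test.sh reports (MiB, 1024^2).
    public static double mbPerSec(double recordsPerSec, double avgRecordBytes) {
        return recordsPerSec * avgRecordBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // Hypothetical: ~68653 records/sec at an assumed ~210.5 bytes/record
        // works out to roughly the 13.78 MB/sec seen in row 1 above.
        System.out.printf("%.2f MB/sec%n", mbPerSec(68653.04, 210.5));
    }
}
```

Checking each row against this identity is a quick way to spot transcription errors in hand-copied results.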
consumer-perf-test metrics
Hi, I have run the consumer performance test on my Kafka cluster. Can you please help me understand the parameters below? Basically I don't know the units of those parameters, and I can't just assume them blindly. The output gives only 2 columns, the "Metric Name" and its "Value":

Metric Name : Value (unit I assume?)
consumer-coordinator-metrics:commit-latency-avg:{client-id=consumer-1} : 4.000 (ms)
consumer-coordinator-metrics:commit-latency-max:{client-id=consumer-1} : 4.000 (ms)
consumer-coordinator-metrics:commit-rate:{client-id=consumer-1} : 0.033 (?)
consumer-coordinator-metrics:commit-total:{client-id=consumer-1} : 1.000 (?)
consumer-fetch-manager-metrics:fetch-rate:{client-id=consumer-1} : 0.493 (?)
consumer-fetch-manager-metrics:fetch-size-avg:{client-id=consumer-1, topic=mbtest1} : 958250.400 (?)
consumer-fetch-manager-metrics:fetch-size-avg:{client-id=consumer-1} : 958250.400 (?)
consumer-fetch-manager-metrics:fetch-size-max:{client-id=consumer-1, topic=mbtest1} : 71.000 (?)
consumer-fetch-manager-metrics:fetch-size-max:{client-id=consumer-1} : 71.000 (?)
consumer-fetch-manager-metrics:fetch-total:{client-id=consumer-1} : 15.000 (?)
consumer-fetch-manager-metrics:records-per-request-avg:{client-id=consumer-1, topic=mbtest1} : 4366.333
consumer-fetch-manager-metrics:records-per-request-avg:{client-id=consumer-1} : 4366.333
consumer-fetch-manager-metrics:records-lead-avg:{client-id=consumer-1, topic=mbtest1, partition=0} : 32679.907
consumer-fetch-manager-metrics:records-lead-min:{client-id=consumer-1, topic=mbtest1, partition=0} : 500.000
consumer-fetch-manager-metrics:records-lead-min:{client-id=consumer-1} : 500.000
consumer-fetch-manager-metrics:records-lead:{client-id=consumer-1, topic=mbtest1, partition=0} : 65495.000

What is meant by records-lead? Can I consider the metrics below as the throughput of my consumer?
consumer-fetch-manager-metrics:records-consumed-rate:{client-id=consumer-1, topic=mbtest1} : 2159.769
consumer-fetch-manager-metrics:records-consumed-rate:{client-id=consumer-1} : 2159.698

What is the difference between "records-per-request-avg" and "records-consumed-rate"?

Regards,
Sunil.
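On records-lead: as I understand the consumer fetch metrics, a partition's lead is the consumer's current position minus the log start offset (i.e. how far the consumer is from the oldest retained record, so a small lead warns that retention may delete records before they are read), whereas the more familiar lag is the log end offset minus the position. A sketch of the arithmetic, with made-up offsets:

```java
public class LeadLagSketch {
    // lead: distance of the consumer's position from the log start offset.
    // A lead near zero means retention is about to overtake the consumer.
    public static long lead(long consumerPosition, long logStartOffset) {
        return consumerPosition - logStartOffset;
    }

    // lag: distance of the consumer's position from the log end offset,
    // i.e. how many records behind the latest data the consumer is.
    public static long lag(long logEndOffset, long consumerPosition) {
        return logEndOffset - consumerPosition;
    }

    public static void main(String[] args) {
        // Hypothetical partition: log starts at 0, ends at 65495,
        // and the consumer has caught up to the end.
        long start = 0, end = 65495, position = 65495;
        System.out.println("lead=" + lead(position, start) + " lag=" + lag(end, position));
    }
}
```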
Consumer-perf-test shows only headers, NO values
Hi, When I run consumer-perf-test from the Kafka bin directory to measure performance, the output file shows only the headers; it doesn't print their values. Any thoughts on this? When I run producer-perf-test it prints detailed information such as: number of records read, number of records sent, records per second, average latency, etc. But in the case of the consumer perf test, it does not show the throughput, i.e. how many records were read per second. Regards, Sunil Chaudhari
Re: kafka-consumer-perf-test error when printing metrics
I forgot to mention I'm using Kafka 2.4.0. Regards, João Paulo Leonidas Fernandes Dias da Silva aka JP, Lead Consultant - Software Developer, jsi...@thoughtworks.com, +1 513 628 7609 On Thu, Feb 20, 2020 at 3:39 PM Jp Silva wrote: > Hi, > > I'm using kafka-consumer-perf-test but I'm getting an error if I add the > --print-metrics option. > > Here's a snippet of my output including the error: > > consumer-fetch-manager-metrics:fetch-size-max:{client-id=consumer-perf-consumer-99250-1} > : 0.000 > consumer-fetch-manager-metrics:fetch-throttle-time-avg:{client-id=consumer-perf-consumer-99250-1} >: 0.000 > consumer-fetch-manager-metrics:fetch-throttle-time-max:{client-id=consumer-perf-consumer-99250-1} >: 0.000 > consumer-fetch-manager-metrics:fetch-total:{client-id=consumer-perf-consumer-99250-1} >: 72.000 > Exception in thread "main" java.util.IllegalFormatConversionException: f > != java.lang.Integer > at > java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4302) > at > java.util.Formatter$FormatSpecifier.printFloat(Formatter.java:2806) > at java.util.Formatter$FormatSpecifier.print(Formatter.java:2753) > at java.util.Formatter.format(Formatter.java:2520) > at java.util.Formatter.format(Formatter.java:2455) > at java.lang.String.format(String.java:2940) > at > scala.collection.immutable.StringLike.format(StringLike.scala:354) > at > scala.collection.immutable.StringLike.format$(StringLike.scala:353) > at scala.collection.immutable.StringOps.format(StringOps.scala:33) > at > kafka.utils.ToolsUtils$.$anonfun$printMetrics$3(ToolsUtils.scala:60) > at > kafka.utils.ToolsUtils$.$anonfun$printMetrics$3$adapted(ToolsUtils.scala:58) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at >
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at kafka.utils.ToolsUtils$.printMetrics(ToolsUtils.scala:58) > at > kafka.tools.ConsumerPerformance$.main(ConsumerPerformance.scala:82) > at kafka.tools.ConsumerPerformance.main(ConsumerPerformance.scala) > > Does anyone have the same issue? Am I doing something wrong? > > Regards, > > JP Silva > >
kafka-consumer-perf-test error when printing metrics
Hi, I'm using kafka-consumer-perf-test but I'm getting an error if I add the --print-metrics option. Here's a snippet of my output including the error: consumer-fetch-manager-metrics:fetch-size-max:{client-id=consumer-perf-consumer-99250-1} : 0.000 consumer-fetch-manager-metrics:fetch-throttle-time-avg:{client-id=consumer-perf-consumer-99250-1} : 0.000 consumer-fetch-manager-metrics:fetch-throttle-time-max:{client-id=consumer-perf-consumer-99250-1} : 0.000 consumer-fetch-manager-metrics:fetch-total:{client-id=consumer-perf-consumer-99250-1} : 72.000 Exception in thread "main" java.util.IllegalFormatConversionException: f != java.lang.Integer at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4302) at java.util.Formatter$FormatSpecifier.printFloat(Formatter.java:2806) at java.util.Formatter$FormatSpecifier.print(Formatter.java:2753) at java.util.Formatter.format(Formatter.java:2520) at java.util.Formatter.format(Formatter.java:2455) at java.lang.String.format(String.java:2940) at scala.collection.immutable.StringLike.format(StringLike.scala:354) at scala.collection.immutable.StringLike.format$(StringLike.scala:353) at scala.collection.immutable.StringOps.format(StringOps.scala:33) at kafka.utils.ToolsUtils$.$anonfun$printMetrics$3(ToolsUtils.scala:60) at kafka.utils.ToolsUtils$.$anonfun$printMetrics$3$adapted(ToolsUtils.scala:58) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.utils.ToolsUtils$.printMetrics(ToolsUtils.scala:58) at kafka.tools.ConsumerPerformance$.main(ConsumerPerformance.scala:82) at kafka.tools.ConsumerPerformance.main(ConsumerPerformance.scala) Does anyone have the same issue? Am I doing something wrong? Regards, JP Silva
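The `f != java.lang.Integer` message indicates that an integer-valued metric is being passed to a `%f` format conversion inside the tool's metric printing, which suggests a bug in the tool rather than a usage error (so this looks like something to report or to check for a fix in a newer release). The underlying Java behavior is easy to reproduce:

```java
public class FormatRepro {
    // %f accepts only floating-point arguments (Float, Double, BigDecimal);
    // an autoboxed Integer triggers IllegalFormatConversionException,
    // matching the stack trace from kafka.utils.ToolsUtils.printMetrics.
    public static String formatMetric(Object value) {
        return String.format("%20.3f", value);
    }

    public static void main(String[] args) {
        System.out.println(formatMetric(72.0)); // a Double formats fine
        try {
            formatMetric(72); // autoboxed Integer -> throws
        } catch (java.util.IllegalFormatConversionException e) {
            // Reproduces the message shape: "f != java.lang.Integer"
            System.out.println(e.getConversion() + " != " + e.getArgumentClass().getName());
        }
    }
}
```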
Consistency of Replicated Kafka Cluster and ways to test
Hello,

We are planning to use Mirror Maker 1 to replicate a Kafka cluster. We are trying to find a way to verify the consistency of the remote/destination cluster. Is there any way we can compare the topics over a certain period and check the consistency of the cluster? Can you propose a test for lost messages?

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect source-cluster-zookeeper:2181 --topic test-topic

Am I right that this is the only way we can check the mirroring?

Best Regards,
--
Kamen Tarlov
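One practical approach beyond checking mirror lag with consumer offsets is to consume the same topic from both clusters over a bounded window and compare digests of what each returned. A minimal, Kafka-free sketch of the comparison step — the record lists stand in for what you would consume from the source and destination, and the helper name is hypothetical:

```java
import java.security.MessageDigest;
import java.util.List;

public class MirrorConsistencyCheck {
    // Order-sensitive digest of one partition's record values; run the same fold
    // on the source and destination and compare the two hex strings.
    static String partitionDigest(List<byte[]> records) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            for (byte[] r : records) md.update(r);
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (java.security.NoSuchAlgorithmException e) {
            throw new RuntimeException(e); // SHA-256 is always available
        }
    }

    public static void main(String[] args) {
        List<byte[]> source = List.of("m1".getBytes(), "m2".getBytes());
        List<byte[]> dest   = List.of("m1".getBytes(), "m2".getBytes());
        System.out.println(partitionDigest(source).equals(partitionDigest(dest))); // true
    }
}
```

Note that MirrorMaker 1 does not guarantee the same partitioning or offsets on the destination, so in practice an order-insensitive aggregate (e.g. combining per-record hashes across the whole topic) is usually needed rather than a per-partition comparison.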
Re: Transaction error in Kafka producer perf test
I understand that we could have used a batch size of 0 here, if I am interpreting the documentation correctly. We essentially want no batching, and setting any value less than 256 in this particular case would achieve that, since the message size is 256 bytes for any message produced.

Sincerely,
Anindya Haldar
Oracle Responsys
Re: Transaction error in Kafka producer perf test
We are consciously choosing the batch size in the experiment to be 1, because that is what our producer use case will need to use.

Regarding the request timeout that you have mentioned, how does one set that? The command line is already passing two timeout values: 'max.block.ms' and 'transaction-duration-ms'. Both are set to 1 hour, and the failure we see happens within a few minutes. Is there a third timeout value that is expiring and causing this issue?

Sincerely,
Anindya Haldar
Oracle Responsys
Re: Transaction error in Kafka producer perf test
Hi,

A perf test is based on a set of tuning parameters, e.g. batch.size, acks, partitions, network latency etc. Your transactions are failing because your batch has expired (or at least, that's what it shows in the log). You have to tune your request timeout and batch.size correctly to improve on these. I suggest you try and get this right first with a non-txn producer setup. Then attempt with txn.

Perhaps you want to recheck the docs and understand what goal you want to target (e.g. speed, consistency, balanced etc.).

Regards,
Re: Transaction error in Kafka producer perf test
Anyone with a pointer on this? Do transactions work reliably with the Kafka perf test tools? If yes, then is there a way to make it work in this scenario?

Sincerely,
Anindya Haldar
Oracle Responsys
Transaction error in Kafka producer perf test
We are evaluating Kafka for some of our use cases. As part of that effort I am trying to run an experiment with a cluster we have set up, using the producer perf test tool supplied with the binaries.

Here's the cluster info:

Runs in Kubernetes, with 4 CPUs, 32 GB RAM, 100 GB log space allocation for each node.
3 ZooKeeper nodes
5 Kafka nodes

Here is the topic description:

$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic testtopic3
Topic: testtopic3  PartitionCount: 5  ReplicationFactor: 3  Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=360,flush.messages=1,unclean.leader.election.enable=false
    Topic: testtopic3  Partition: 0  Leader: 1  Replicas: 1,4,2  Isr: 1,4,2
    Topic: testtopic3  Partition: 1  Leader: 4  Replicas: 4,2,3  Isr: 4,2,3
    Topic: testtopic3  Partition: 2  Leader: 2  Replicas: 2,3,5  Isr: 2,3,5
    Topic: testtopic3  Partition: 3  Leader: 3  Replicas: 3,5,1  Isr: 3,5,1
    Topic: testtopic3  Partition: 4  Leader: 5  Replicas: 5,1,4  Isr: 5,1,4

And here is the producer test run command line and the result:

$ bin/kafka-producer-perf-test.sh --topic testtopic3 --num-records 100 --throughput -1 --record-size 256 --producer-props bootstrap.servers=kafka-a-0.ri:9092,kafka-b-0.ri:9092,kafka-c-0.ri:9092,kafka-d-0.ri:9092,kafka-e-0.ri:9092 acks=all batch.size=1 max.block.ms=360 enable.idempotence=true max.in.flight.requests.per.connection=1 retries=3 --transaction-duration-ms 360

4100 records sent, 819.7 records/sec (0.20 MB/sec), 2572.0 ms avg latency, 4892.0 ms max latency.
4261 records sent, 852.0 records/sec (0.21 MB/sec), 7397.2 ms avg latency, 9873.0 ms max latency.
4216 records sent, 843.0 records/sec (0.21 MB/sec), 12383.7 ms avg latency, 14849.0 ms max latency.
4400 records sent, 879.8 records/sec (0.21 MB/sec), 17332.0 ms avg latency, 19784.0 ms max latency.
4354 records sent, 870.8 records/sec (0.21 MB/sec), 22349.4 ms avg latency, 24763.0 ms max latency.
4477 records sent, 895.4 records/sec (0.22 MB/sec), 27241.1 ms avg latency, 29728.0 ms max latency.
4366 records sent, 873.2 records/sec (0.21 MB/sec), 32218.3 ms avg latency, 34703.0 ms max latency.
4408 records sent, 881.6 records/sec (0.22 MB/sec), 37190.6 ms avg latency, 39672.0 ms max latency.
4159 records sent, 831.5 records/sec (0.20 MB/sec), 42135.0 ms avg latency, 44640.0 ms max latency.
4260 records sent, 852.0 records/sec (0.21 MB/sec), 47098.0 ms avg latency, 49624.0 ms max latency.
4360 records sent, 872.0 records/sec (0.21 MB/sec), 52137.1 ms avg latency, 54574.0 ms max latency.
4514 records sent, 902.8 records/sec (0.22 MB/sec), 57038.1 ms avg latency, 59554.0 ms max latency.
4273 records sent, 854.3 records/sec (0.21 MB/sec), 62001.8 ms avg latency, 64524.0 ms max latency.
4348 records sent, 869.6 records/sec (0.21 MB/sec), 67037.8 ms avg latency, 69494.0 ms max latency.
4039 records sent, 807.5 records/sec (0.20 MB/sec), 72009.8 ms avg latency, 74481.0 ms max latency.
4327 records sent, 865.2 records/sec (0.21 MB/sec), 76993.8 ms avg latency, 79457.0 ms max latency.
4307 records sent, 861.4 records/sec (0.21 MB/sec), 82011.9 ms avg latency, 84449.0 ms max latency.
4506 records sent, 901.0 records/sec (0.22 MB/sec), 86922.6 ms avg latency, 89434.0 ms max latency.
4343 records sent, 868.6 records/sec (0.21 MB/sec), 91918.8 ms avg latency, 94394.0 ms max latency.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Exception in thread "main" org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:357)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:341)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
    at org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:143)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
[2019-10-25 21:15:05,183] ERROR [Producer clientId=producer-1, transactionalId=performance-producer-default-transactional-id] Aborting producer batches due to fatal error (org.apache.kafka.c
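On the "third timeout" question raised in this thread: besides max.block.ms and --transaction-duration-ms, the producer has its own per-request and per-transaction timeouts that can expire first. A hedged sketch of the relevant producer properties (the key names are standard producer configs; the values are illustrative, not a recommendation):

```java
import java.util.Properties;

public class TxnProducerConfig {
    // Builds transactional-producer properties; the timeout keys below can
    // expire independently of max.block.ms.
    static Properties build(String bootstrap) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrap);
        p.setProperty("acks", "all");
        p.setProperty("enable.idempotence", "true");
        p.setProperty("transactional.id", "perf-test-txn");   // hypothetical id
        p.setProperty("max.block.ms", "3600000");             // how long send()/init may block
        p.setProperty("request.timeout.ms", "30000");         // per-request broker timeout
        p.setProperty("delivery.timeout.ms", "120000");       // total time a record may wait, incl. retries
        p.setProperty("transaction.timeout.ms", "900000");    // broker aborts transactions older than this
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092").getProperty("delivery.timeout.ms")); // 120000
    }
}
```

In particular, transaction.timeout.ms is the value after which the broker itself expires an open transaction and fences the producer, which matches the ProducerFencedException message above.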
Different Results from Kafka's OoB Producer Test tool
Hello,

We are trying to test our GCP 3-node cluster based on some testing done previously here: https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing

We have seen some inconsistencies in results. With 2.3.0 we are running with the following producer params (single-threaded):

enable.idempotence=true
batch.size=1
acks=1

Our payload size is 100K and 1000 messages in total. We are always getting record expiration. So, we are slightly confused about what this perf test tool is actually doing. With the above configuration, there shouldn't be any record that will expire. The network has latency, but that is nowhere near 30s (the default request timeout). For example, this is what we get after our perf test:

1000 records sent, 2.772994 records/sec (0.26 MB/sec), 113187.14 ms avg latency, 123025.00 ms max latency, 120032 ms 50th, 121516 ms 95th, 122262 ms 99th, 123025 ms 99.9th.

The rest of our producer configs are default OoB. Could anyone explain if this is reasonable/expected, or something we've misunderstood?

Thanks,
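A quick sanity check on those numbers (an illustration, not an explanation of the tool's internals): at ~2.77 records/sec the whole run takes about six minutes, and with batch.size far smaller than the 100 KB payload every record travels alone, so records queued behind earlier sends wait a long time. Note also that the idempotent producer normally requires acks=all rather than acks=1.

```java
public class PerfSanityCheck {
    // Total wall-clock time implied by the tool's reported average rate.
    static double totalSeconds(long records, double recordsPerSec) {
        return records / recordsPerSec;
    }

    public static void main(String[] args) {
        // 1000 records at the reported 2.772994 records/sec:
        System.out.printf("total run time: %.0f s%n", totalSeconds(1000, 2.772994));
        // The reported max latency (123025 ms) exceeds the 2.x default
        // delivery.timeout.ms of 120000 ms -- the window after which queued
        // records are expired -- which is consistent with the expirations seen.
    }
}
```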
How i can run standalone Network Server Test.
I am new to using Kafka and wondering if any of the experts out there can let me know how I can run a standalone network unit test for the Kafka server. I see the class SocketServerTest in the unit tests for networking, but I am not able to figure out the commands to use it, or whether I need to write a test program on top. I would like to test my network performance in isolation. I'd appreciate any help with this.
Re: Jenkins Integration Test Failed (https://github.com/apache/kafka/pull/6771)
Huh! Never realised our ASF has bots working behind the scenes. Thanks Matthias :)

On Wed, 22 May 2019 at 12:27, Matthias J. Sax wrote:
> Just leave a comment
>
> Retest this please.
>
> on GitHub and it will trigger a retest. No special permission required.
>
> -Matthias
>
> On 5/22/19 1:10 PM, M. Manna wrote:
> > Hello,
> > One of the integration tests failed due to a timeout issue.
> > https://github.com/apache/kafka/pull/6771
> > Could someone with access please re-trigger the build? This didn't fail for me when I checked previously.
> > Thanks,
Re: Jenkins Integration Test Failed (https://github.com/apache/kafka/pull/6771)
Just leave a comment

Retest this please.

on GitHub and it will trigger a retest. No special permission required.

-Matthias

On 5/22/19 1:10 PM, M. Manna wrote:
> Hello,
> One of the integration tests failed due to a timeout issue.
> https://github.com/apache/kafka/pull/6771
> Could someone with access please re-trigger the build? This didn't fail for me when I checked previously.
> Thanks,
Jenkins Integration Test Failed (https://github.com/apache/kafka/pull/6771)
Hello, One of the integration tests failed due to a timeout issue. https://github.com/apache/kafka/pull/6771 Could someone with access please re-trigger the build ? This didn't fail for me when I checked previously. Thanks,
Re: Performance Testing Using Consumer-Perf-Test
1) Are all 10 publishers producing to the same topic? What level of ACKs do you have set? How many partitions are in your topic? Are all 10 consumers in the same consumer group, or are they supposed to be independent consumers that each get the full set of messages published?

2) It depends what you are measuring (latency, throughput, or something else). If you publish first, then your consumer has to consume either from the beginning or in real time, and will only get messages published AFTER it successfully subscribes. With 10 consumers you could generate a lot of rebalancing if you don't start and balance them ahead of time.

-hans

On Wed, May 15, 2019 at 8:45 AM M. Manna wrote:
> Hello,
> I am trying to do some performance testing using kafka-consumer-perf-test. Could someone please help me understand whether my setup is correct?
> 1) I would like to run a benchmark test to have 10 publishers publishing 100 messages (4MB) each and 10 subscribers.
> 2) For the above, do I need to run the producer first and then the consumer? Or is it okay just to run consumer-perf-test?
> Thanks,
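The partition-count question above matters because a consumer group can keep at most as many consumers busy as there are partitions. A small illustration of the arithmetic (a round-robin-style spread, not the exact assignor Kafka uses unless configured):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {
    // Spreads P partitions over N group members; members beyond P sit idle.
    static Map<Integer, List<Integer>> assign(int partitions, int consumers) {
        Map<Integer, List<Integer>> out = new HashMap<>();
        for (int c = 0; c < consumers; c++) out.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) out.get(p % consumers).add(p);
        return out;
    }

    public static void main(String[] args) {
        // 10 consumers in one group on a 6-partition topic: 4 consumers get nothing.
        Map<Integer, List<Integer>> a = assign(6, 10);
        long idle = a.values().stream().filter(List::isEmpty).count();
        System.out.println("idle consumers: " + idle); // 4
    }
}
```

So for 10 subscribers that should each be busy, the topic needs at least 10 partitions (or the subscribers should be independent consumers rather than one group).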
Performance Testing Using Consumer-Perf-Test
Hello,

I am trying to do some performance testing using kafka-consumer-perf-test. Could someone please help me understand whether my setup is correct?

1) I would like to run a benchmark test to have 10 publishers publishing 100 messages (4MB) each and 10 subscribers.

2) For the above, do I need to run the producer first and then the consumer? Or is it okay just to run consumer-perf-test?

Thanks,
Re: kafka test issue
In Kafka 2.1, the `Serializer` and `Deserializer` interfaces were changed: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87298242

It seems that updating to 2.2.0 for the test driver does not update your dependency on the `kafka-clients` package. Try adding this dependency explicitly to make sure it's using the 2.2.0 clients jar.

-Matthias

On 4/13/19 12:49 PM, Chad Preisler wrote:
> It seems like the Kafka test driver is not compatible with the latest kafka-streams API. When I use the following two dependencies together:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-streams-test-utils</artifactId>
>   <version>2.2.0</version>
>   <scope>test</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-streams</artifactId>
>   <version>2.2.0</version>
>   <scope>compile</scope>
> </dependency>
>
> I get the following error:
>
> java.lang.NoSuchMethodError: org.apache.kafka.common.serialization.Serializer.serialize(Ljava/lang/String;Lorg/apache/kafka/common/header/Headers;Ljava/lang/Object;)[B
>
> in my unit test file for this function call:
>
> testDriver.pipeInput(statusFactory.create(TopicNames.ENVELOPE_STATUS, "1000",
>     new Status("FIN", "1000", "d200", EnvelopeState.DRAFT, new Date().getTime(), "draft", null, null)));
>
> Anyone run into this problem? Changing to version 2.0.1 of Kafka Streams makes everything work.
kafka test issue
It seems like the Kafka test driver is not compatible with the latest kafka-streams API. When I use the following two dependencies together:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <version>2.2.0</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>2.2.0</version>
  <scope>compile</scope>
</dependency>

I get the following error:

java.lang.NoSuchMethodError: org.apache.kafka.common.serialization.Serializer.serialize(Ljava/lang/String;Lorg/apache/kafka/common/header/Headers;Ljava/lang/Object;)[B

in my unit test file for this function call:

testDriver.pipeInput(statusFactory.create(TopicNames.ENVELOPE_STATUS, "1000",
    new Status("FIN", "1000", "d200", EnvelopeState.DRAFT, new Date().getTime(), "draft", null, null)));

Anyone run into this problem? Changing to version 2.0.1 of Kafka Streams makes everything work.
Kakfa embedded cluster rebalance scenario test
Hi,

I would like to add an integration test for a Kafka rebalance scenario and make sure retries are working as expected if enough replicas are not available in my Kafka Streams application. Do you have any info on how I can achieve this? I was checking the TestUtils class in org.apache.kafka.test, but could not figure out a way to trigger a rebalance or a NotLeaderForPartitionException scenario using an embedded Kafka cluster.

Thanks,
Pradeep
Re: Errors observed in performance test using kafka-producer-perf-test.sh|
I have another very disturbing observation. The errors go away if I start 2 kafka-producer-perf-test.sh with the same configs on different hosts. If I cancel 1 kafka-producer-perf-test.sh then after some time the below errors start reappearing. org.apache.kafka.common.errors.TimeoutException: The request timed out. org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s) for benchmark-6-3r-2isr-none-0: 182806 ms has passed since last append --Unilocal On Wed, May 30, 2018 at 1:19 AM Localhost shell < universal.localh...@gmail.com> wrote: > Hello All, > > I am trying to perform a benchmark test in our kafka env. I have played > with few configurations such as request.timeout.ms and max.block.ms and > throughout but not able to avoid the error: > org.apache.kafka.common.errors.TimeoutException: The request timed out. > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received. > org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s) > for benchmark-6-3r-2isr-none-0: 182806 ms has passed since last append > > Produce Perf Test command: > nohup sh ~/kafka/kafka_2.11-1.0.0/bin/kafka-producer-perf-test.sh --topic > benchmark-6p-3r-2isr-none --num-records 1000 --record-size 100 > --throughput -1 --print-metrics --producer-props acks=all > bootstrap.servers=node1:9092,node2:9092,node3:9092 request.timeout.ms=18 > max.block.ms=18 buffer.memory=1 > > ~/kafka/load_test/results/6p-3r-10M-100B-t-1-ackall-rto3m-block2m-bm100m-2 > 2>&1 > > Cluster: 3 nodes, topic: 6 partitions, RF=3 and minISR=2 > I am monitoring the kafka metrics using a tsdb and grafana. I know that > disk IO perf is bad [disk await(1.5 secs), IO queue size and disk > utilization metrics are high(60-75%)] but I don't see any issue in kafka > logs that can relate slow disk io to the above perf errors. 
> I have even run the test with throughput=1000 (all the above params the same)
> but still get timeout exceptions.
>
> Need suggestions to understand the issue and fix the above errors.
>
> --Unilocal
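There is no answer in the thread itself, but it may help to lay out the producer overrides in question. The values in the command line above look truncated (`request.timeout.ms=18`, `max.block.ms=18`, `buffer.memory=1`), while the result file name encodes `rto3m`, `block2m`, and `bm100m`; the sketch below uses illustrative values inferred from that naming, not confirmed settings.

```java
import java.util.Properties;

public class PerfTestProducerProps {
    /** Producer overrides for the benchmark. The values are illustrative guesses
        inferred from the result file name (rto3m / block2m / bm100m); the actual
        values in the thread are truncated. */
    public static Properties benchmarkProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");                  // wait for full ISR acknowledgement
        props.put("request.timeout.ms", "180000"); // "rto3m": 3 minutes
        props.put("max.block.ms", "120000");       // "block2m": 2 minutes
        props.put("buffer.memory", "104857600");   // "bm100m": 100 MB
        return props;
    }

    public static void main(String[] args) {
        benchmarkProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that with `acks=all` and minISR=2, a slow disk on any in-sync replica delays replication, which surfaces at the producer as request timeouts and batch expiry; raising these timeouts masks, rather than fixes, the disk bottleneck reported above.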
Errors observed in performance test using kafka-producer-perf-test.sh
Hello All,

I am trying to perform a benchmark test in our kafka env. I have played with a few configurations such as request.timeout.ms, max.block.ms, and throughput, but I am not able to avoid these errors:

org.apache.kafka.common.errors.TimeoutException: The request timed out.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s) for benchmark-6-3r-2isr-none-0: 182806 ms has passed since last append

Produce Perf Test command:

nohup sh ~/kafka/kafka_2.11-1.0.0/bin/kafka-producer-perf-test.sh --topic benchmark-6p-3r-2isr-none --num-records 1000 --record-size 100 --throughput -1 --print-metrics --producer-props acks=all bootstrap.servers=node1:9092,node2:9092,node3:9092 request.timeout.ms=18 max.block.ms=18 buffer.memory=1 > ~/kafka/load_test/results/6p-3r-10M-100B-t-1-ackall-rto3m-block2m-bm100m-2 2>&1

Cluster: 3 nodes, topic: 6 partitions, RF=3 and minISR=2

I am monitoring the kafka metrics using a tsdb and grafana. I know that disk IO perf is bad [disk await (1.5 secs), IO queue size, and disk utilization metrics are high (60-75%)], but I don't see anything in the kafka logs that relates the slow disk IO to the above perf errors.

I have even run the test with throughput=1000 (all the above params the same) but still get timeout exceptions.

Need suggestions to understand the issue and fix the above errors.

--Unilocal
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Sweet! I think this pretty much wraps up all the discussion points. I'll update the KIP with all the relevant aspects we discussed and call for a vote. I'll also comment on the TopologyTestDriver ticket noting this modular test strategy. Thanks, everyone. -John On Fri, Mar 9, 2018 at 10:57 AM, Guozhang Wang wrote: > Hey John, > > Re: Mock Processor Context: > > That's a good point, I'm convinced that we should keep them as two classes. > > > Re: test-utils module: > > I think I agree with your proposed changes, in fact in order to not scatter > the test classes in two places maybe it's better to move all of them to the > new module. One caveat is that it will make streams' project hierarchy > inconsistent with other projects where the unit test classes are maintained > inside the main artifact package, but I think it is a good cost to pay, > plus once we start publishing test-util artifacts for other projects like > client and connect, we may face the same issue and need to do this > refactoring as well. > > > > Guozhang > > > > > On Fri, Mar 9, 2018 at 9:54 AM, John Roesler wrote: > > > Hi Guozhang and Bill, > > > > I'll summarize what I'm currently thinking in light of all the > discussion: > > > > Mock Processor Context: > > === > > > > Here's how I see the use cases for the two mocks differing: > > > > 1. o.a.k.test.MPC: Crafted for testing Streams use cases. Implements > > AbstractProcessorContext, actually forward to child processor nodes, > allow > > restoring a state store. Most importantly, the freedom to do stuff > > convenient for our tests without impacting anyone. > > > > 2. (test-utils) MPC: Crafted for testing community Processors (and > > friends). Very flat and simple implementation (so people can read it in > one > > sitting); i.e., doesn't drag in other data models like RecordContext. 
> Test > > one processor in isolation, so generally don't bother with complex logic > > like scheduling punctuators, forwarding results, or restoring state > stores. > > Most importantly, an API that can be stable. > > > > So, I really am leaning toward keeping both implementations. I like > Bill's > > suggestion of renaming the unit testing class to > > InternalMockProcessorContext, since having classes with the same name in > > different packages is confusing. I look forward to the day when Java 9 > > takes off and we can actually hide internal classes from the public > > interface. > > > > test-utils module: > > = > > > > This is actually out of scope for this KIP if we keep both MPC > > implementations, but it has been a major feature of this discussion, so > we > > may as well see it through. > > > > I've waffled a bit on this point, but right now I would propose we > > restructure the streams directory thusly: > > > > streams/ (artifact name := "streams", the actual streams code lives here) > > - test-utils/ (this is the current test-utils artifact, depends on > > "streams") > > - tests/ (new module, depends on "streams" and "test-utils", *NO > published > > artifact*) > > > > This gets us out of the circular dependency without having to engage in > any > > Gradle shenanigans while preserving "test-utils" as a separate artifact. > > This is good because: 1) the test-utils don't need to be in production > > code, so it's nice to have a separate artifact, 2) test-utils is already > > public in 1.1, and it's a bummer to introduce users' code when we can so > > easily avoid it. > > > > Note, though, that if we agree to keep both MPC implementations, then > this > > really is just important for rewriting our tests to use > TopologyTestDriver, > > and in fact only the tests that need it should move to "streams/tests/". > > > > What say you? > > > > -John > > > > On Fri, Mar 9, 2018 at 9:01 AM, Guozhang Wang > wrote: > > > > > Hmm.. 
it seems to be a general issue then, since we were planning to > also > > > replace the KStreamTestDriver and ProcessorTopologyTestDriver with the > > new > > > TopologyTestDriver soon, so if the argument that testing dependency > could > > > still cause circular dependencies holds it means we cannot do that as > > well. > > > > > > My understanding on gradle dependencies has been that test dependencies > > are > > > not required to comp
Unit Test with Compacted Topics having non consecutive offsets
Hi, I need to write unit tests with Compacted Topics in local cluster. Has anyone done something like that? Any tips/guidance will be much appreciated. Thanks Sirisha
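The archive carries no reply, but one common approach (a sketch, not a Kafka-provided helper) is to override topic-level configs so the log cleaner runs almost immediately on the test cluster; after cleaning, the topic naturally exhibits the non-consecutive offsets the question asks about. The class name and values below are hypothetical; the config keys are standard Kafka topic configs.

```java
import java.util.HashMap;
import java.util.Map;

public class CompactedTopicTestConfig {
    /** Topic-level configs that make log compaction kick in quickly in a test
        cluster. Keys are standard Kafka topic configs; values are illustrative. */
    public static Map<String, String> compactedTopicConfigs() {
        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "compact");
        // Roll segments quickly so the active segment (never compacted) stays small.
        configs.put("segment.ms", "100");
        // Compact even when only a tiny fraction of the log is dirty.
        configs.put("min.cleanable.dirty.ratio", "0.01");
        // No grace period before a record becomes eligible for compaction.
        configs.put("min.compaction.lag.ms", "0");
        return configs;
    }
}
```

These configs can then be passed when creating the topic on whatever embedded cluster the test uses; writing several records with duplicate keys, flushing, and waiting for the cleaner produces a log whose surviving offsets have gaps.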
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
John, Sorry for the delayed response. Thanks for the KIP, I'm +1 on it, and I don't have any further comments on the KIP itself aside from the comments that others have raised. Regarding the existing MockProcessorContext and its removal in favor of the one added from this KIP, I'm actually in favor of keeping both. IMHO it's reasonable to have both because the testing requirements are different. Most users are trying to verify their logic works as expected within a Kafka Streams application and aren't concerned (or shouldn't be at least, again IMHO) with testing Kafka Streams itself, that is the responsibility of the Kafka Streams developers and contributors. However, for the developers and contributors of Kafka Streams, the need to test the internals of how Streams works is the primary concern and could at times require different logic or available methods from a given mock object. I have a couple of thoughts on mitigation of having two MockProcessorContext objects 1. Leave the current MockProcessorContext in the o.a.k.test package but rename it to InternalMockProcessorContext and add some documentation as to why it's there. 2. Create a new package under o.a.k.test, called internals and move the existing MockProcessorContext there, but that would require a change to the visibility of the MockProcessorContext#allStateStores() method to public. Just wanted to throw in my 2 cents. Thanks, Bill On Thu, Mar 8, 2018 at 11:51 PM, John Roesler wrote: > I think what you're suggesting is to: > 1. compile the main streams code, but not the tests > 2. compile test-utils (and compile and run the test-utils tests) > 3. compile and run the streams tests > > This works in theory, since the test-utils depends on the main streams > code, but not the streams tests. and the streams tests depend on test-utils > while the main streams code does not. > > But after poking around a bit and reading up on it, I think this is not > possible, or at least not mainstream. 
> > The issue is that dependencies are formed between projects, in this case > streams and streams:test-utils. The upstream project must be built before > the dependent one, regardless of whether the dependency is for compiling > the main code or the test code. This means we do have a circular dependency > on our hands if we want the tests in streams to use the test-utils, since > they'd both have to be built before the other. > > Gradle seems to be quite scriptable, so there may be some way to achieve > this, but increasing the complexity of the build also introduces a project > maintenance concern. > > The MockProcessorContext itself is pretty simple, so I'm tempted to argue > that we should just have one for internal unit tests and another for > test-utils, however this situation also afflicts KAFKA-6474 > <https://issues.apache.org/jira/browse/KAFKA-6474>, and the > TopologyTestDriver is not so trivial. > > I think the best thing at this point is to go ahead and fold the test-utils > into the streams project. We can put it into a separate "testutils" package > to make it easy to identify which code is for test support and which code > is Kafka Streams. The biggest bummer about this suggestion is that we > *just* introduced the test-utils artifact, so folks would have to add that > artifact in 1.1 to write their tests and then have to drop it again in 1.2. > > The other major solution is to create a new gradle project for the streams > unit tests, which depends on streams and test-utils and move all the > streams unit tests there. I'm pretty sure we can configure gradle just to > include this project for running tests and not actually package any > artifacts. This structure basically expresses your observation that the > test code is essentially a separate module from the main streams code. > > Of course, I'm open to alternatives, especially if someone with more > experience in Gradle is aware of a solution. 
> > Thanks, > -John > > > On Thu, Mar 8, 2018 at 3:39 PM, Matthias J. Sax > wrote: > > > Isn't MockProcessorContext in o.a.k.test part of the unit-test package > > but not the main package? > > > > This should resolve the dependency issue. > > > > -Matthias > > > > On 3/8/18 3:32 PM, John Roesler wrote: > > > Actually, replacing the MockProcessorContext in o.a.k.test could be a > bit > > > tricky, since it would make the "streams" module depend on > > > "streams:test-utils", but "streams:test-utils" already depends on > > "streams". > > > > > > At first glance, it seems like the options are: > > > 1. leave the two separate implementa
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
I think what you're suggesting is to:
1. compile the main streams code, but not the tests
2. compile test-utils (and compile and run the test-utils tests)
3. compile and run the streams tests

This works in theory, since test-utils depends on the main streams code but not the streams tests, and the streams tests depend on test-utils while the main streams code does not. But after poking around a bit and reading up on it, I think this is not possible, or at least not mainstream.

The issue is that dependencies are formed between projects, in this case streams and streams:test-utils. The upstream project must be built before the dependent one, regardless of whether the dependency is for compiling the main code or the test code. This means we do have a circular dependency on our hands if we want the tests in streams to use the test-utils, since they'd both have to be built before the other.

Gradle seems to be quite scriptable, so there may be some way to achieve this, but increasing the complexity of the build also introduces a project maintenance concern.

The MockProcessorContext itself is pretty simple, so I'm tempted to argue that we should just have one for internal unit tests and another for test-utils; however, this situation also afflicts KAFKA-6474 <https://issues.apache.org/jira/browse/KAFKA-6474>, and the TopologyTestDriver is not so trivial.

I think the best thing at this point is to go ahead and fold the test-utils into the streams project. We can put it into a separate "testutils" package to make it easy to identify which code is for test support and which code is Kafka Streams. The biggest bummer about this suggestion is that we *just* introduced the test-utils artifact, so folks would have to add that artifact in 1.1 to write their tests and then have to drop it again in 1.2.

The other major solution is to create a new gradle project for the streams unit tests, which depends on streams and test-utils, and move all the streams unit tests there.
I'm pretty sure we can configure gradle just to include this project for running tests and not actually package any artifacts. This structure basically expresses your observation that the test code is essentially a separate module from the main streams code. Of course, I'm open to alternatives, especially if someone with more experience in Gradle is aware of a solution. Thanks, -John On Thu, Mar 8, 2018 at 3:39 PM, Matthias J. Sax wrote: > Isn't MockProcessorContext in o.a.k.test part of the unit-test package > but not the main package? > > This should resolve the dependency issue. > > -Matthias > > On 3/8/18 3:32 PM, John Roesler wrote: > > Actually, replacing the MockProcessorContext in o.a.k.test could be a bit > > tricky, since it would make the "streams" module depend on > > "streams:test-utils", but "streams:test-utils" already depends on > "streams". > > > > At first glance, it seems like the options are: > > 1. leave the two separate implementations in place. This shouldn't be > > underestimated, especially since our internal tests may need different > > things from a mocked P.C. than our API users. > > 2. move the public testing artifacts into the regular streams module > > 3. move the unit tests for Streams into a third module that depends on > both > > streams and test-utils. Yuck! > > > > Thanks, > > -John > > > > On Thu, Mar 8, 2018 at 3:16 PM, John Roesler wrote: > > > >> Thanks for the review, Guozhang, > >> > >> In response: > >> 1. I missed that! I'll look into it and update the KIP. > >> > >> 2. I was planning to use the real implementation, since folks might > >> register some metrics in the processors and want to verify the values > that > >> get recorded. If the concern is about initializing all the stuff that's > in > >> the Metrics object, I can instantiate it lazily or even make it > optional by > >> taking a nullable constructor parameter. > >> > >> 3. Agreed. I think that's the real sharp edge here. 
I actually think it > >> would be neat to auto-trigger those scheduled punctuators, but it seems > >> like that moves this component out of "mock" territory and into "driver" > >> territory. Since we already have the TopologyTestDriver, I'd prefer to > >> focus on keeping the mock lean. I agree it should be in the javadoc as > well > >> as the web documentation. > >> > >> Thanks, > >> -John > >> > >> On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang > wrote: > >> > >>> Hello John, > >>> > >>> Thanks for the KIP.
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
MockProcessorContext is only used in unit tests, and hence we should be able to declare it as a test dependency of `streams` in gradle build file, which is OK. Guozhang On Thu, Mar 8, 2018 at 3:32 PM, John Roesler wrote: > Actually, replacing the MockProcessorContext in o.a.k.test could be a bit > tricky, since it would make the "streams" module depend on > "streams:test-utils", but "streams:test-utils" already depends on > "streams". > > At first glance, it seems like the options are: > 1. leave the two separate implementations in place. This shouldn't be > underestimated, especially since our internal tests may need different > things from a mocked P.C. than our API users. > 2. move the public testing artifacts into the regular streams module > 3. move the unit tests for Streams into a third module that depends on both > streams and test-utils. Yuck! > > Thanks, > -John > > On Thu, Mar 8, 2018 at 3:16 PM, John Roesler wrote: > > > Thanks for the review, Guozhang, > > > > In response: > > 1. I missed that! I'll look into it and update the KIP. > > > > 2. I was planning to use the real implementation, since folks might > > register some metrics in the processors and want to verify the values > that > > get recorded. If the concern is about initializing all the stuff that's > in > > the Metrics object, I can instantiate it lazily or even make it optional > by > > taking a nullable constructor parameter. > > > > 3. Agreed. I think that's the real sharp edge here. I actually think it > > would be neat to auto-trigger those scheduled punctuators, but it seems > > like that moves this component out of "mock" territory and into "driver" > > territory. Since we already have the TopologyTestDriver, I'd prefer to > > focus on keeping the mock lean. I agree it should be in the javadoc as > well > > as the web documentation. > > > > Thanks, > > -John > > > > On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang > wrote: > > > >> Hello John, > >> > >> Thanks for the KIP. 
I made a pass over the wiki page and here are some > >> comments: > >> > >> 1. Meta-comment: there is an internal class MockProcessorContext under > the > >> o.a.k.test package, which should be replaced as part of this KIP. > >> > >> 2. In @Override StreamsMetrics metrics(), will you return a fully > created > >> StreamsMetricsImpl object or are you planning to use the > >> MockStreamsMetrics? Note that for the latter case you probably need to > >> look > >> into https://issues.apache.org/jira/browse/KAFKA-5676 as well. > >> > >> 3. Not related to the KIP changes themselves: about > >> "context.scheduledPunctuators": we need to well document that in the > >> MockProcessorContext the scheduled punctuator will never by > >> auto-triggered, > >> and hence it is only for testing people's code that some punctuators are > >> indeed registered, and if people want full auto punctuation testing they > >> have to go with TopologyTestDriver. > >> > >> > >> > >> Guozhang > >> > >> > >> On Wed, Mar 7, 2018 at 8:04 PM, John Roesler wrote: > >> > >> > On Wed, Mar 7, 2018 at 8:03 PM, John Roesler > wrote: > >> > > >> > > Thanks Ted, > >> > > > >> > > Sure thing; I updated the example code in the KIP with a little > >> snippet. > >> > > > >> > > -John > >> > > > >> > > On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu wrote: > >> > > > >> > >> Looks good. > >> > >> > >> > >> See if you can add punctuator into the sample code. > >> > >> > >> > >> On Wed, Mar 7, 2018 at 7:10 PM, John Roesler > >> wrote: > >> > >> > >> > >> > Dear Kafka community, > >> > >> > > >> > >> > I am proposing KIP-267 to augment the public Streams test utils > >> API. > >> > >> > The goal is to simplify testing of Kafka Streams applications. 
> >> > >> > > >> > >> > Please find details in the > >> > >> > wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >> > >> > 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+ > Test+Utils > >> > >> > > >> > >> > An initial WIP PR can be found here:https://github.com/ > >> > >> > apache/kafka/pull/4662 > >> > >> > > >> > >> > I also included the user-list (please hit "reply-all" to include > >> both > >> > >> > lists in this KIP discussion). > >> > >> > > >> > >> > Thanks, > >> > >> > > >> > >> > -John > >> > >> > > >> > >> > >> > > > >> > > > >> > > >> > >> > >> > >> -- > >> -- Guozhang > >> > > > > > -- -- Guozhang
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Thanks, Matthias, 1. I can move it into the o.a.k.streams.processor package; that makes sense. 2. I'm expecting most users to use in-memory state stores, so they won't need a state directory. In the "real" code path, the stateDir is extracted from the config by org.apache.kafka.streams.processor.internals.StateDirectory. The logic is non-trivial and invoking it directly will result in the state directory actually being created. Given my assumption that you don't need it most of the time, creating directories seems too heavy to me. 3. I'm on the fence about that. It's not too much trouble to implement it, even if it is deprecated from day 1, so I think I'd rather put it in and let us remove it later when we actually remove the deprecated method. In contrast, we actually would have to jump through some hoops to support schedule(interval). On Thu, Mar 8, 2018 at 3:36 PM, Matthias J. Sax wrote: > Thanks for the KIP John. > > Couple of minor questions: > > - What about putting the mock into sub-package `processor` so it's in > the same package name as the interface it implements? > > - What is the purpose of the constructor taking the `File stateDir` > argument? The state directory should be encoded in the `Properties > config` parameter already. > > - We have KIP-251 in place (not voted yet though) that plans to > deprecate `forward(K key, V value, int childIndex)` and `forward(K key, > V value, String childName)` -- should we also throw > UnsupportedOperationException similar to `schedule(long)` if KIP-251 is > accepted? > > > -Matthias > > On 3/8/18 3:16 PM, John Roesler wrote: > > Thanks for the review, Guozhang, > > > > In response: > > 1. I missed that! I'll look into it and update the KIP. > > > > 2. I was planning to use the real implementation, since folks might > > register some metrics in the processors and want to verify the values > that > > get recorded. 
If the concern is about initializing all the stuff that's > in > > the Metrics object, I can instantiate it lazily or even make it optional > by > > taking a nullable constructor parameter. > > > > 3. Agreed. I think that's the real sharp edge here. I actually think it > > would be neat to auto-trigger those scheduled punctuators, but it seems > > like that moves this component out of "mock" territory and into "driver" > > territory. Since we already have the TopologyTestDriver, I'd prefer to > > focus on keeping the mock lean. I agree it should be in the javadoc as > well > > as the web documentation. > > > > Thanks, > > -John > > > > On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang > wrote: > > > >> Hello John, > >> > >> Thanks for the KIP. I made a pass over the wiki page and here are some > >> comments: > >> > >> 1. Meta-comment: there is an internal class MockProcessorContext under > the > >> o.a.k.test package, which should be replaced as part of this KIP. > >> > >> 2. In @Override StreamsMetrics metrics(), will you return a fully > created > >> StreamsMetricsImpl object or are you planning to use the > >> MockStreamsMetrics? Note that for the latter case you probably need to > look > >> into https://issues.apache.org/jira/browse/KAFKA-5676 as well. > >> > >> 3. Not related to the KIP changes themselves: about > >> "context.scheduledPunctuators": we need to well document that in the > >> MockProcessorContext the scheduled punctuator will never by > auto-triggered, > >> and hence it is only for testing people's code that some punctuators are > >> indeed registered, and if people want full auto punctuation testing they > >> have to go with TopologyTestDriver. > >> > >> > >> > >> Guozhang > >> > >> > >> On Wed, Mar 7, 2018 at 8:04 PM, John Roesler wrote: > >> > >>> On Wed, Mar 7, 2018 at 8:03 PM, John Roesler > wrote: > >>> > >>>> Thanks Ted, > >>>> > >>>> Sure thing; I updated the example code in the KIP with a little > >> snippet. 
> >>>> > >>>> -John > >>>> > >>>> On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu wrote: > >>>> > >>>>> Looks good. > >>>>> > >>>>> See if you can add punctuator into the sample code. > >>>>> > >>>>> On Wed, Mar 7, 2018 at 7:10 PM, John Roesler > >> wrote: > >>>>> > >>>>>> Dear Kafka community, > >>>>>> > >>>>>> I am proposing KIP-267 to augment the public Streams test utils API. > >>>>>> The goal is to simplify testing of Kafka Streams applications. > >>>>>> > >>>>>> Please find details in the > >>>>>> wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>>> 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils > >>>>>> > >>>>>> An initial WIP PR can be found here:https://github.com/ > >>>>>> apache/kafka/pull/4662 > >>>>>> > >>>>>> I also included the user-list (please hit "reply-all" to include > >> both > >>>>>> lists in this KIP discussion). > >>>>>> > >>>>>> Thanks, > >>>>>> > >>>>>> -John > >>>>>> > >>>>> > >>>> > >>>> > >>> > >> > >> > >> > >> -- > >> -- Guozhang > >> > > > >
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Isn't MockProcessorContext in o.a.k.test part of the unit-test package but not the main package? This should resolve the dependency issue. -Matthias On 3/8/18 3:32 PM, John Roesler wrote: > Actually, replacing the MockProcessorContext in o.a.k.test could be a bit > tricky, since it would make the "streams" module depend on > "streams:test-utils", but "streams:test-utils" already depends on "streams". > > At first glance, it seems like the options are: > 1. leave the two separate implementations in place. This shouldn't be > underestimated, especially since our internal tests may need different > things from a mocked P.C. than our API users. > 2. move the public testing artifacts into the regular streams module > 3. move the unit tests for Streams into a third module that depends on both > streams and test-utils. Yuck! > > Thanks, > -John > > On Thu, Mar 8, 2018 at 3:16 PM, John Roesler wrote: > >> Thanks for the review, Guozhang, >> >> In response: >> 1. I missed that! I'll look into it and update the KIP. >> >> 2. I was planning to use the real implementation, since folks might >> register some metrics in the processors and want to verify the values that >> get recorded. If the concern is about initializing all the stuff that's in >> the Metrics object, I can instantiate it lazily or even make it optional by >> taking a nullable constructor parameter. >> >> 3. Agreed. I think that's the real sharp edge here. I actually think it >> would be neat to auto-trigger those scheduled punctuators, but it seems >> like that moves this component out of "mock" territory and into "driver" >> territory. Since we already have the TopologyTestDriver, I'd prefer to >> focus on keeping the mock lean. I agree it should be in the javadoc as well >> as the web documentation. >> >> Thanks, >> -John >> >> On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang wrote: >> >>> Hello John, >>> >>> Thanks for the KIP. I made a pass over the wiki page and here are some >>> comments: >>> >>> 1. 
Meta-comment: there is an internal class MockProcessorContext under the >>> o.a.k.test package, which should be replaced as part of this KIP. >>> >>> 2. In @Override StreamsMetrics metrics(), will you return a fully created >>> StreamsMetricsImpl object or are you planning to use the >>> MockStreamsMetrics? Note that for the latter case you probably need to >>> look >>> into https://issues.apache.org/jira/browse/KAFKA-5676 as well. >>> >>> 3. Not related to the KIP changes themselves: about >>> "context.scheduledPunctuators": we need to well document that in the >>> MockProcessorContext the scheduled punctuator will never by >>> auto-triggered, >>> and hence it is only for testing people's code that some punctuators are >>> indeed registered, and if people want full auto punctuation testing they >>> have to go with TopologyTestDriver. >>> >>> >>> >>> Guozhang >>> >>> >>> On Wed, Mar 7, 2018 at 8:04 PM, John Roesler wrote: >>> >>>> On Wed, Mar 7, 2018 at 8:03 PM, John Roesler wrote: >>>> >>>>> Thanks Ted, >>>>> >>>>> Sure thing; I updated the example code in the KIP with a little >>> snippet. >>>>> >>>>> -John >>>>> >>>>> On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu wrote: >>>>> >>>>>> Looks good. >>>>>> >>>>>> See if you can add punctuator into the sample code. >>>>>> >>>>>> On Wed, Mar 7, 2018 at 7:10 PM, John Roesler >>> wrote: >>>>>> >>>>>>> Dear Kafka community, >>>>>>> >>>>>>> I am proposing KIP-267 to augment the public Streams test utils >>> API. >>>>>>> The goal is to simplify testing of Kafka Streams applications. >>>>>>> >>>>>>> Please find details in the >>>>>>> wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>>>>> 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils >>>>>>> >>>>>>> An initial WIP PR can be found here:https://github.com/ >>>>>>> apache/kafka/pull/4662 >>>>>>> >>>>>>> I also included the user-list (please hit "reply-all" to include >>> both >>>>>>> lists in this KIP discussion). 
>>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> -John >>>>>>> >>>>>> >>>>> >>>>> >>>> >>> >>> >>> -- >>> -- Guozhang >>> >> >> >
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Thanks for the KIP John.

Couple of minor questions:

- What about putting the mock into sub-package `processor` so it's in the same package name as the interface it implements?

- What is the purpose of the constructor taking the `File stateDir` argument? The state directory should be encoded in the `Properties config` parameter already.

- We have KIP-251 in place (not voted yet though) that plans to deprecate `forward(K key, V value, int childIndex)` and `forward(K key, V value, String childName)` -- should we also throw UnsupportedOperationException similar to `schedule(long)` if KIP-251 is accepted?

-Matthias

On 3/8/18 3:16 PM, John Roesler wrote: > Thanks for the review, Guozhang, > > In response: > 1. I missed that! I'll look into it and update the KIP. > > 2. I was planning to use the real implementation, since folks might > register some metrics in the processors and want to verify the values that > get recorded. If the concern is about initializing all the stuff that's in > the Metrics object, I can instantiate it lazily or even make it optional by > taking a nullable constructor parameter. > > 3. Agreed. I think that's the real sharp edge here. I actually think it > would be neat to auto-trigger those scheduled punctuators, but it seems > like that moves this component out of "mock" territory and into "driver" > territory. Since we already have the TopologyTestDriver, I'd prefer to > focus on keeping the mock lean. I agree it should be in the javadoc as well > as the web documentation. > > Thanks, > -John > > On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang wrote: > >> Hello John, >> >> Thanks for the KIP. I made a pass over the wiki page and here are some >> comments: >> >> 1. Meta-comment: there is an internal class MockProcessorContext under the >> o.a.k.test package, which should be replaced as part of this KIP. >> >> 2. 
In @Override StreamsMetrics metrics(), will you return a fully created >> StreamsMetricsImpl object or are you planning to use the >> MockStreamsMetrics? Note that for the latter case you probably need to look >> into https://issues.apache.org/jira/browse/KAFKA-5676 as well. >> >> 3. Not related to the KIP changes themselves: about >> "context.scheduledPunctuators": we need to well document that in the >> MockProcessorContext the scheduled punctuator will never be auto-triggered, >> and hence it is only for testing people's code that some punctuators are >> indeed registered, and if people want full auto punctuation testing they >> have to go with TopologyTestDriver. >> >> >> >> Guozhang >> >> >> On Wed, Mar 7, 2018 at 8:04 PM, John Roesler wrote: >> >>> On Wed, Mar 7, 2018 at 8:03 PM, John Roesler wrote: >>> >>>> Thanks Ted, >>>> >>>> Sure thing; I updated the example code in the KIP with a little >> snippet. >>>> >>>> -John >>>> >>>> On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu wrote: >>>> >>>>> Looks good. >>>>> >>>>> See if you can add punctuator into the sample code. >>>>> >>>>> On Wed, Mar 7, 2018 at 7:10 PM, John Roesler >> wrote: >>>>> >>>>>> Dear Kafka community, >>>>>> >>>>>> I am proposing KIP-267 to augment the public Streams test utils API. >>>>>> The goal is to simplify testing of Kafka Streams applications. >>>>>> >>>>>> Please find details in the >>>>>> wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>>>> 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils >>>>>> >>>>>> An initial WIP PR can be found here:https://github.com/ >>>>>> apache/kafka/pull/4662 >>>>>> >>>>>> I also included the user-list (please hit "reply-all" to include >> both >>>>>> lists in this KIP discussion). >>>>>> >>>>>> Thanks, >>>>>> >>>>>> -John >>>>>> >>>>> >>>> >>>> >>> >> >> >> >> -- >> -- Guozhang >> >
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Actually, replacing the MockProcessorContext in o.a.k.test could be a bit tricky, since it would make the "streams" module depend on "streams:test-utils", but "streams:test-utils" already depends on "streams". At first glance, it seems like the options are:

1. Leave the two separate implementations in place. This option shouldn't be underestimated, especially since our internal tests may need different things from a mocked P.C. than our API users.
2. Move the public testing artifacts into the regular streams module.
3. Move the unit tests for Streams into a third module that depends on both streams and test-utils. Yuck!

Thanks,
-John
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Thanks for the review, Guozhang,

In response:

1. I missed that! I'll look into it and update the KIP.

2. I was planning to use the real implementation, since folks might register some metrics in the processors and want to verify the values that get recorded. If the concern is about initializing all the stuff that's in the Metrics object, I can instantiate it lazily or even make it optional by taking a nullable constructor parameter.

3. Agreed. I think that's the real sharp edge here. I actually think it would be neat to auto-trigger those scheduled punctuators, but it seems like that moves this component out of "mock" territory and into "driver" territory. Since we already have the TopologyTestDriver, I'd prefer to focus on keeping the mock lean. I agree it should be in the javadoc as well as the web documentation.

Thanks,
-John

On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang wrote:

> Hello John,
>
> Thanks for the KIP. I made a pass over the wiki page and here are some comments:
>
> 1. Meta-comment: there is an internal class MockProcessorContext under the o.a.k.test package, which should be replaced as part of this KIP.
>
> 2. In @Override StreamsMetrics metrics(), will you return a fully created StreamsMetricsImpl object or are you planning to use the MockStreamsMetrics? Note that for the latter case you probably need to look into https://issues.apache.org/jira/browse/KAFKA-5676 as well.
>
> 3. Not related to the KIP changes themselves: about "context.scheduledPunctuators": we need to document well that in the MockProcessorContext the scheduled punctuator will never be auto-triggered, and hence it is only for testing people's code that some punctuators are indeed registered; if people want full auto punctuation testing they have to go with TopologyTestDriver.
> Guozhang
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Thanks Ted,

Sure thing; I updated the example code in the KIP with a little snippet.

-John
Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Looks good.

See if you can add punctuator into the sample code.
[DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils
Dear Kafka community,

I am proposing KIP-267 to augment the public Streams test utils API. The goal is to simplify testing of Kafka Streams applications.

Please find details in the wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils

An initial WIP PR can be found here: https://github.com/apache/kafka/pull/4662

I also included the user-list (please hit "reply-all" to include both lists in this KIP discussion).

Thanks,

-John
Re: error when attempting a unit test of spring kafka producer
Also using these dependencies:

- Gradle: org.springframework.kafka:spring-kafka-test:1.1.7.RELEASE
- Gradle: org.springframework.kafka:spring-kafka:1.3.2.RELEASE
Re: error when attempting a unit test of spring kafka producer
From my build.gradle:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.10.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

jar {
    baseName = 'calculator'
    version = '0.1.0'
}

repositories {
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    compile("org.springframework.kafka:spring-kafka:1.3.2.RELEASE")
    testCompile("org.springframework.kafka:spring-kafka-test")
}

And this is from my project structure. I wonder if that is part of the problem, having .10 and .11?

- Gradle: org.apache.kafka:kafka-clients:0.11.0.0
- Gradle: org.apache.kafka:kafka-clients:test:0.11.0.0
- Gradle: org.apache.kafka:kafka_2.11:0.10.1.1
- Gradle: org.apache.kafka:kafka_2.11:test:0.10.1.1
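One way to line up the mismatched artifacts above (a sketch, assuming the project stays on the spring-kafka 1.3.x line; check the spring-kafka compatibility matrix for the versions your broker actually needs) is to pin the test artifact to the same release as the main one, so both resolve consistent transitive kafka-clients/kafka_2.11 versions:

```groovy
dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    compile("org.springframework.kafka:spring-kafka:1.3.2.RELEASE")
    // Pin spring-kafka-test explicitly to the same release as spring-kafka,
    // instead of letting Boot's 1.5.x dependency management pick an older one.
    testCompile("org.springframework.kafka:spring-kafka-test:1.3.2.RELEASE")
}
```

With both artifacts on the same release, the 0.10.x kafka_2.11 test dependency should no longer be mixed with the 0.11.x kafka-clients jar.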
Re: error when attempting a unit test of spring kafka producer
LoginType was in the 0.10.x release.

This seems to indicate a Kafka version mismatch.

Can you check the dependencies of your test?

Thanks
error when attempting a unit test of spring kafka producer
I have been trying to figure out how to unit test a kafka producer. It should take in a simple integer and perform some addition. I followed what I could find on spring kafka unit testing but keep running into this error:

19:53:12.788 [main] ERROR kafka.server.KafkaServer - [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/LoginType
at kafka.network.Processor.<init>(SocketServer.scala:406)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:141)
at kafka.network.SocketServer$$anonfun$startup$1$$anonfun$apply$1.apply$mcVI$sp(SocketServer.scala:94)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:93)
at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at kafka.network.SocketServer.startup(SocketServer.scala:89)
at kafka.server.KafkaServer.startup(KafkaServer.scala:219)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:120)
at kafka.utils.TestUtils.createServer(TestUtils.scala)
at org.springframework.kafka.test.rule.KafkaEmbedded.before(KafkaEmbedded.java:154)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.LoginType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 23 common frames omitted

Has anyone come across this situation? Any ideas on the direction a solution would take? I can provide more information, code, etc. Whatever extra is needed. Don't want to bog down the email with too much.

Thanks
Ian
Re: Group consumer cannot consume messages if kafka service on specific node in test cluster is down
Appreciate your efforts.
Re: Group consumer cannot consume messages if kafka service on specific node in test cluster is down
I just want to say that I have solved the situation by deleting zookeeper's and kafka's data directories and setting offsets.topic.replication.factor=3 in the kafka server.properties file. After that, the __consumer_offsets topic is replicated and everything works as expected. I hope this will help someone. Regards.

On 01/30/2018 03:02 PM, Zoran wrote:

Sorry, I have attached the wrong server.properties file. Now the right one is in the attachment. Regards.

On 01/30/2018 02:59 PM, Zoran wrote:

Hi, I have three servers: blade1 (192.168.112.31), blade2 (192.168.112.32) and blade3 (192.168.112.33). On each of the servers kafka_2.11-1.0.0 is installed. On blade3 (192.168.112.33:2181) zookeeper is installed as well.

I have created a topic repl3part5 with the following line:

bin/kafka-topics.sh --zookeeper 192.168.112.33:2181 --create --replication-factor 3 --partitions 5 --topic repl3part5

When I describe the topic, it looks like this:

[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5 --zookeeper 192.168.112.33:2181
Topic:repl3part5  PartitionCount:5  ReplicationFactor:3  Configs:
  Topic: repl3part5  Partition: 0  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
  Topic: repl3part5  Partition: 1  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
  Topic: repl3part5  Partition: 2  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
  Topic: repl3part5  Partition: 3  Leader: 2  Replicas: 2,1,3  Isr: 2,1,3
  Topic: repl3part5  Partition: 4  Leader: 3  Replicas: 3,2,1  Isr: 3,2,1

I have a producer for this topic:

bin/kafka-console-producer.sh --broker-list 192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic repl3part5

and a single consumer:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic repl3part5 --consumer-property group.id=zoran_1

Every message that is sent by the producer gets collected by the consumer. So far - so good. Now I would like to test failover of the kafka servers.

If I put down the blade3 kafka service, I get consumer warnings but all produced messages are still consumed:

[2018-01-30 14:30:01,203] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,299] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,475] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Now I have started up the kafka service on blade3 and I have put down the kafka service on the blade2 server. The consumer showed one warning, but all produced messages are still consumed:

[2018-01-30 14:31:38,164] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Now I have started up the kafka service on blade2 and I have put down the kafka service on the blade1 server. The consumer now shows warnings about node 1/2147483646, but also: Asynchronous auto-commit of offsets ... failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: null.

[2018-01-30 14:33:16,393] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,469] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,557] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,986] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,991] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,493] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,495] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,002] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connecti
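Editor's note for readers hitting the same wall: the server.properties shipped in the distribution's config/ directory (at least in the 1.0-era quickstart config) sets the internal-topic replication factors to 1, which is exactly what makes the group coordinator disappear when the one broker hosting an __consumer_offsets partition goes down. A hedged sketch of the relevant settings — the offsets topic is auto-created on first use, so these must be in place before the cluster serves its first consumer group, or the topic must be recreated as Zoran did:

```properties
# server.properties on every broker -- raise the internal-topic
# replication factors before the cluster handles its first consumer group.
offsets.topic.replication.factor=3
# Related transaction-log settings (these also default to 1 in the
# shipped quickstart config):
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
```

Whether the offsets topic was created undersized can be checked with bin/kafka-topics.sh --describe --topic __consumer_offsets --zookeeper 192.168.112.33:2181 (addresses as in this thread).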
Group consumer cannot consume messages if kafka service on specific node in test cluster is down
This is not my question, but I saw it on Stack Overflow yesterday and have been wondering about it: https://stackoverflow.com/questions/48523972/group-consumer-cannot-consume-messages-if-kafka-service-on-specific-node-in-test. Anyone else seen behavior like this?
Re: Group consumer cannot consume messages if kafka service on specific node in test cluster is down
Sorry, I have attached the wrong server.properties file. Now the right one is in the attachment. Regards.

On 01/30/2018 02:59 PM, Zoran wrote:

Hi, I have three servers: blade1 (192.168.112.31), blade2 (192.168.112.32) and blade3 (192.168.112.33). On each of the servers kafka_2.11-1.0.0 is installed. On blade3 (192.168.112.33:2181) zookeeper is installed as well.

I have created a topic repl3part5 with the following line:

bin/kafka-topics.sh --zookeeper 192.168.112.33:2181 --create --replication-factor 3 --partitions 5 --topic repl3part5

When I describe the topic, it looks like this:

[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5 --zookeeper 192.168.112.33:2181
Topic:repl3part5  PartitionCount:5  ReplicationFactor:3  Configs:
  Topic: repl3part5  Partition: 0  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
  Topic: repl3part5  Partition: 1  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
  Topic: repl3part5  Partition: 2  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
  Topic: repl3part5  Partition: 3  Leader: 2  Replicas: 2,1,3  Isr: 2,1,3
  Topic: repl3part5  Partition: 4  Leader: 3  Replicas: 3,2,1  Isr: 3,2,1

I have a producer for this topic:

bin/kafka-console-producer.sh --broker-list 192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic repl3part5

and a single consumer:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic repl3part5 --consumer-property group.id=zoran_1

Every message that is sent by the producer gets collected by the consumer. So far - so good. Now I would like to test fail over of the kafka servers.

If I put down the blade3 kafka service, I get consumer warnings but all produced messages are still consumed:

[2018-01-30 14:30:01,203] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,299] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,475] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Now I have started up the kafka service on blade3 and I have put down the kafka service on the blade2 server. The consumer showed one warning but all produced messages are still consumed:

[2018-01-30 14:31:38,164] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Now I have started up the kafka service on blade2 and I have put down the kafka service on the blade1 server. The consumer now shows warnings about node 1/2147483646, but also: Asynchronous auto-commit of offsets ... failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: null.

[2018-01-30 14:33:16,393] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,469] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,557] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,986] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,991] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,493] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,495] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,002] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,003] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Asynchronous auto-commit of offsets {repl3part5-4=OffsetAndMetadata{offset=18, metadata=''}, repl3part5-3=OffsetAndMetadata{offset=20, metadata='
Group consumer cannot consume messages if kafka service on specific node in test cluster is down
Hi, I have three servers: blade1 (192.168.112.31), blade2 (192.168.112.32) and blade3 (192.168.112.33). On each of the servers kafka_2.11-1.0.0 is installed. On blade3 (192.168.112.33:2181) zookeeper is installed as well.

I have created a topic repl3part5 with the following line:

bin/kafka-topics.sh --zookeeper 192.168.112.33:2181 --create --replication-factor 3 --partitions 5 --topic repl3part5

When I describe the topic, it looks like this:

[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5 --zookeeper 192.168.112.33:2181
Topic:repl3part5  PartitionCount:5  ReplicationFactor:3  Configs:
  Topic: repl3part5  Partition: 0  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
  Topic: repl3part5  Partition: 1  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
  Topic: repl3part5  Partition: 2  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
  Topic: repl3part5  Partition: 3  Leader: 2  Replicas: 2,1,3  Isr: 2,1,3
  Topic: repl3part5  Partition: 4  Leader: 3  Replicas: 3,2,1  Isr: 3,2,1

I have a producer for this topic:

bin/kafka-console-producer.sh --broker-list 192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic repl3part5

and a single consumer:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic repl3part5 --consumer-property group.id=zoran_1

Every message that is sent by the producer gets collected by the consumer. So far - so good. Now I would like to test fail over of the kafka servers.

If I put down the blade3 kafka service, I get consumer warnings but all produced messages are still consumed:

[2018-01-30 14:30:01,203] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,299] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,475] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Now I have started up the kafka service on blade3 and I have put down the kafka service on the blade2 server. The consumer showed one warning but all produced messages are still consumed:

[2018-01-30 14:31:38,164] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Now I have started up the kafka service on blade2 and I have put down the kafka service on the blade1 server. The consumer now shows warnings about node 1/2147483646, but also: Asynchronous auto-commit of offsets ... failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: null.

[2018-01-30 14:33:16,393] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,469] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,557] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,986] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,991] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,493] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,495] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,002] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,003] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Asynchronous auto-commit of offsets {repl3part5-4=OffsetAndMetadata{offset=18, metadata=''}, repl3part5-3=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-2=OffsetAndMetadata{offset=19, metadata=''}, repl3part5-1=OffsetAndMetadata{offset=20, metadata=''}, repl3part5
Re: [VOTE] KIP-247: Add public test utils for Kafka Streams
One minor change to the KIP. The class TopologyTestDriver will be in package `org.apache.kafka.streams` (instead of `o.a.k.streams.test`). +1 (binding). I am closing this vote as accepted with 3 binding votes (Damian, Guozhang, Matthias) and 2 non-binding votes (Bill, James). Thanks for the discussion and voting! -Matthias On 1/18/18 4:17 PM, Matthias J. Sax wrote: > I added the new method to the KIP and also updated the PR. > > -Matthias > > On 1/18/18 10:48 AM, Guozhang Wang wrote: >> @Matthias >> >> This comes to me while reviewing another using the test driver: could we >> add a `Map allStateStores()` to the >> `TopologyTestDriver` besides all the get-store-by-name functions? This is >> because some of the internal state stores may be implicitly created but >> users may still want to check its state. >> >> >> Guozhang >> >> >> On Thu, Jan 18, 2018 at 8:40 AM, James Cheng wrote: >> >>> +1 (non-binding) >>> >>> -James >>> >>> Sent from my iPhone >>> >>>> On Jan 17, 2018, at 6:09 PM, Matthias J. Sax >>> wrote: >>>> >>>> Hi, >>>> >>>> I would like to start the vote for KIP-247: >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>> 247%3A+Add+public+test+utils+for+Kafka+Streams >>>> >>>> >>>> -Matthias >>>> >>> >> >> >> > signature.asc Description: OpenPGP digital signature
Re: [VOTE] KIP-247: Add public test utils for Kafka Streams
I added the new method to the KIP and also updated the PR. -Matthias On 1/18/18 10:48 AM, Guozhang Wang wrote: > @Matthias > > This comes to me while reviewing another using the test driver: could we > add a `Map allStateStores()` to the > `TopologyTestDriver` besides all the get-store-by-name functions? This is > because some of the internal state stores may be implicitly created but > users may still want to check its state. > > > Guozhang > > > On Thu, Jan 18, 2018 at 8:40 AM, James Cheng wrote: > >> +1 (non-binding) >> >> -James >> >> Sent from my iPhone >> >>> On Jan 17, 2018, at 6:09 PM, Matthias J. Sax >> wrote: >>> >>> Hi, >>> >>> I would like to start the vote for KIP-247: >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> 247%3A+Add+public+test+utils+for+Kafka+Streams >>> >>> >>> -Matthias >>> >> > > > signature.asc Description: OpenPGP digital signature
Re: [VOTE] KIP-247: Add public test utils for Kafka Streams
+1 (non-binding) -James Sent from my iPhone > On Jan 17, 2018, at 6:09 PM, Matthias J. Sax wrote: > > Hi, > > I would like to start the vote for KIP-247: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams > > > -Matthias >
Re: [VOTE] KIP-247: Add public test utils for Kafka Streams
+1 On Thu, 18 Jan 2018 at 15:14 Bill Bejeck wrote: > Thanks for the KIP. > > +1 > > -Bill > > On Wed, Jan 17, 2018 at 9:09 PM, Matthias J. Sax > wrote: > > > Hi, > > > > I would like to start the vote for KIP-247: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > 247%3A+Add+public+test+utils+for+Kafka+Streams > > > > > > -Matthias > > > > >
Re: [VOTE] KIP-247: Add public test utils for Kafka Streams
Thanks for the KIP. +1 -Bill On Wed, Jan 17, 2018 at 9:09 PM, Matthias J. Sax wrote: > Hi, > > I would like to start the vote for KIP-247: > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 247%3A+Add+public+test+utils+for+Kafka+Streams > > > -Matthias > >
Re: [VOTE] KIP-247: Add public test utils for Kafka Streams
+1 Original message From: Guozhang Wang Date: 1/17/18 10:38 PM (GMT-08:00) To: users@kafka.apache.org Subject: Re: [VOTE] KIP-247: Add public test utils for Kafka Streams +1 (binding). On Wed, Jan 17, 2018 at 6:09 PM, Matthias J. Sax wrote: > Hi, > > I would like to start the vote for KIP-247: > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 247%3A+Add+public+test+utils+for+Kafka+Streams > > > -Matthias > > -- -- Guozhang
Re: [VOTE] KIP-247: Add public test utils for Kafka Streams
+1 (binding). On Wed, Jan 17, 2018 at 6:09 PM, Matthias J. Sax wrote: > Hi, > > I would like to start the vote for KIP-247: > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 247%3A+Add+public+test+utils+for+Kafka+Streams > > > -Matthias > > -- -- Guozhang
[VOTE] KIP-247: Add public test utils for Kafka Streams
Hi, I would like to start the vote for KIP-247: https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams -Matthias signature.asc Description: OpenPGP digital signature
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Hi, I updated the KIP and added an `OutputVerifier` class. Looking forward to your feedback. I will start the Vote thread in parallel. -Matthias On 1/17/18 11:40 AM, Matthias J. Sax wrote: > Saïd, > > sorry for the late reply. Avro records are basically supported. However, > there are no plans atm to integrate the test driver with a schema > registry. Note that Apache Kafka does not provide a schema registry by > itself. If we integrate one schema registry, we need to integrate all of them -- that > is something Apache Kafka should not do from my point of view. > > Thus, if you want to use AvroSerdes that use a schema registry, those > should be mocked accordingly. However, there is nothing this KIP > can help with in this regard IMHO. > > I left a comment on the ticket to point it out. :) > > > -Matthias > > On 1/17/18 1:29 AM, Saïd Bouras wrote: >> Matthias, >> >> What about testing topologies that use avro schemas? Have you read my >> previous response? >> >> Thanks. >> >> On Wed, Jan 17, 2018 at 3:34 AM Matthias J. Sax >> wrote: >> >>> Colin, >>> >>> the TopologyTestDriver does not connect to any broker and simulates >>> processing of single-partitioned input topics purely in-memory (the >>> driver is basically a mock for a StreamThread). This is sufficient to >>> test basic business logic. For more complex topologies that are actually >>> divided into sub-topologies and connected via topics, the driver detects >>> this case and does an in-memory forward. >>> >>> >>> -Matthias >>> >>> On 1/16/18 10:08 AM, Colin McCabe wrote: >>>> Thanks, Matthias, this looks great. >>>> >>>> It seems like these APIs could either be used against mock objects, or >>> against real brokers running in the same process. Is there a way for the >>> user to select which they want when using the API? Sorry if it's in the >>> KIP and I missed it. >>>> >>>> cheers, >>>> Colin >>>> >>>> >>>> On Thu, Jan 11, 2018, at 18:06, Matthias J. 
Sax wrote: >>>>> Dear Kafka community, >>>>> >>>>> I want to propose KIP-247 to add public test utils to the Streams API. >>>>> The goal is to simplify testing of Kafka Streams applications. >>>>> >>>>> Please find details in the wiki: >>>>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams >>>>> >>>>> This is an initial KIP, and we hope to add more utility functions later. >>>>> Thus, this KIP is not comprehensive but a first step. Of course, we can >>>>> enrich this initial KIP if we think it falls too short. But we should >>>>> not aim to be comprehensive to keep the scope manageable. >>>>> >>>>> In fact, I think we should add some more helpers to simplify result >>>>> verification. I will update the KIP with this asap. Just wanted to start >>>>> the discussion early on. >>>>> >>>>> An initial WIP PR can be found here: >>>>> https://github.com/apache/kafka/pull/4402 >>>>> >>>>> I also included the user-list (please hit "reply-all" to include both >>>>> lists in this KIP discussion). >>>>> >>>>> Thanks a lot. >>>>> >>>>> >>>>> -Matthias >>>>> >>>>> >>>>> Email had 1 attachment: >>>>> + signature.asc >>>>> 1k (application/pgp-signature) >>> >>> >> > signature.asc Description: OpenPGP digital signature
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Saïd, sorry for the late reply. Avro records are basically supported. However, there are no plans atm to integrate the test driver with a schema registry. Note that Apache Kafka does not provide a schema registry by itself. If we integrate one schema registry, we need to integrate all of them -- that is something Apache Kafka should not do from my point of view. Thus, if you want to use AvroSerdes that use a schema registry, those should be mocked accordingly. However, there is nothing this KIP can help with in this regard IMHO. I left a comment on the ticket to point it out. :) -Matthias On 1/17/18 1:29 AM, Saïd Bouras wrote: > Matthias, > > What about testing topologies that use avro schemas? Have you read my > previous response? > > Thanks. > > On Wed, Jan 17, 2018 at 3:34 AM Matthias J. Sax > wrote: > >> Colin, >> >> the TopologyTestDriver does not connect to any broker and simulates >> processing of single-partitioned input topics purely in-memory (the >> driver is basically a mock for a StreamThread). This is sufficient to >> test basic business logic. For more complex topologies that are actually >> divided into sub-topologies and connected via topics, the driver detects >> this case and does an in-memory forward. >> >> >> -Matthias >> >> On 1/16/18 10:08 AM, Colin McCabe wrote: >>> Thanks, Matthias, this looks great. >>> >>> It seems like these APIs could either be used against mock objects, or >> against real brokers running in the same process. Is there a way for the >> user to select which they want when using the API? Sorry if it's in the >> KIP and I missed it. >>> >>> cheers, >>> Colin >>> >>> >>> On Thu, Jan 11, 2018, at 18:06, Matthias J. Sax wrote: >>>> Dear Kafka community, >>>> >>>> I want to propose KIP-247 to add public test utils to the Streams API. >>>> The goal is to simplify testing of Kafka Streams applications. 
>>>> >>>> Please find details in the wiki: >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams >>>> >>>> This is an initial KIP, and we hope to add more utility functions later. >>>> Thus, this KIP is not comprehensive but a first step. Of course, we can >>>> enrich this initial KIP if we think it falls too short. But we should >>>> not aim to be comprehensive to keep the scope manageable. >>>> >>>> In fact, I think we should add some more helpers to simplify result >>>> verification. I will update the KIP with this asap. Just wanted to start >>>> the discussion early on. >>>> >>>> An initial WIP PR can be found here: >>>> https://github.com/apache/kafka/pull/4402 >>>> >>>> I also included the user-list (please hit "reply-all" to include both >>>> lists in this KIP discussion). >>>> >>>> Thanks a lot. >>>> >>>> >>>> -Matthias >>>> >>>> >>>> Email had 1 attachment: >>>> + signature.asc >>>> 1k (application/pgp-signature) >> >> > signature.asc Description: OpenPGP digital signature
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Matthias, What about testing topologies that use avro schemas? Have you read my previous response? Thanks. On Wed, Jan 17, 2018 at 3:34 AM Matthias J. Sax wrote: > Colin, > > the TopologyTestDriver does not connect to any broker and simulates > processing of single-partitioned input topics purely in-memory (the > driver is basically a mock for a StreamThread). This is sufficient to > test basic business logic. For more complex topologies that are actually > divided into sub-topologies and connected via topics, the driver detects > this case and does an in-memory forward. > > > -Matthias > > On 1/16/18 10:08 AM, Colin McCabe wrote: > > Thanks, Matthias, this looks great. > > > > It seems like these APIs could either be used against mock objects, or > against real brokers running in the same process. Is there a way for the > user to select which they want when using the API? Sorry if it's in the > KIP and I missed it. > > > > cheers, > > Colin > > > > > > On Thu, Jan 11, 2018, at 18:06, Matthias J. Sax wrote: > >> Dear Kafka community, > >> > >> I want to propose KIP-247 to add public test utils to the Streams API. > >> The goal is to simplify testing of Kafka Streams applications. 
> >> > >> An initial WIP PR can be found here: > >> https://github.com/apache/kafka/pull/4402 > >> > >> I also included the user-list (please hit "reply-all" to include both > >> lists in this KIP discussion). > >> > >> Thanks a lot. > >> > >> > >> -Matthias > >> > >> > >> Email had 1 attachment: > >> + signature.asc > >> 1k (application/pgp-signature) > > -- *Saïd Bouras*
[HELP] Guidelines/tools to test kafka performance with application layer involved
Hello, We are using kafka for pub sub and want to test the performance of the entire system. Is there any tool readily available in the kafka world which can simulate multiple publishers and subscribers to measure latency and throughput, considering a custom application layer? Any guidelines around this would be helpful. Thanks, Pritam Kadam
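Editor's note: Kafka itself ships bin/kafka-producer-perf-test.sh and bin/kafka-consumer-perf-test.sh, plus the kafka.tools.EndToEndLatency tool (run via kafka-run-class.sh), but none of them exercise a custom application layer. For a first-order number before reaching for a heavier load framework, a small in-process harness works. Below is a minimal, hypothetical sketch in plain Java with no Kafka dependency: the in-memory queue stands in for the pub/sub hop and would be replaced by your real produce/consume calls (names like PubSubHarness are illustrative, not an existing tool):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PubSubHarness {

    // Runs `publishers` producer threads, each sending `perPublisher`
    // timestamped messages through the queue; one subscriber thread
    // records end-to-end latency. Returns the number of messages received.
    static long run(int publishers, int perPublisher) throws Exception {
        final long expected = (long) publishers * perPublisher;
        BlockingQueue<Long> broker = new LinkedBlockingQueue<>();
        AtomicLong received = new AtomicLong();
        AtomicLong latencyNanos = new AtomicLong();
        ExecutorService pool = Executors.newCachedThreadPool();

        // Subscriber: drain messages until every expected message arrived.
        Future<?> subscriber = pool.submit(() -> {
            try {
                while (received.get() < expected) {
                    Long sentAt = broker.poll(1, TimeUnit.SECONDS);
                    if (sentAt == null) continue;   // nothing yet; keep waiting
                    latencyNanos.addAndGet(System.nanoTime() - sentAt);
                    received.incrementAndGet();
                }
            } catch (InterruptedException ignored) { }
        });

        long start = System.nanoTime();
        for (int p = 0; p < publishers; p++) {
            pool.submit(() -> {
                for (int i = 0; i < perPublisher; i++) {
                    broker.add(System.nanoTime());  // payload = send timestamp
                }
            });
        }
        subscriber.get();                           // wait for the full drain
        pool.shutdown();

        long elapsedMs = Math.max(1, (System.nanoTime() - start) / 1_000_000);
        System.out.println("messages=" + received.get()
                + " throughput=" + (received.get() * 1000 / elapsedMs) + " msg/s"
                + " avgLatencyMicros=" + latencyNanos.get() / received.get() / 1_000);
        return received.get();
    }

    public static void main(String[] args) throws Exception {
        run(4, 10_000);
    }
}
```

Because the queue is replaced by your real pub/sub calls, the same structure measures whatever your application layer adds on top of Kafka; put the send timestamp in the message payload so the subscriber can compute latency across processes.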
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Colin, the TopologyTestDriver does not connect to any broker and simulates processing of single-partitioned input topics purely in-memory (the driver is basically a mock for a StreamThread). This is sufficient to test basic business logic. For more complex topologies that are actually divided into sub-topologies and connected via topics, the driver detects this case and does an in-memory forward. -Matthias On 1/16/18 10:08 AM, Colin McCabe wrote: > Thanks, Matthias, this looks great. > > It seems like these APIs could either be used against mock objects, or > against real brokers running in the same process. Is there a way for the > user to select which they want when using the API? Sorry if it's in the KIP > and I missed it. > > cheers, > Colin > > > On Thu, Jan 11, 2018, at 18:06, Matthias J. Sax wrote: >> Dear Kafka community, >> >> I want to propose KIP-247 to add public test utils to the Streams API. >> The goal is to simplify testing of Kafka Streams applications. >> >> Please find details in the wiki: >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams >> >> This is an initial KIP, and we hope to add more utility functions later. >> Thus, this KIP is not comprehensive but a first step. Of course, we can >> enrich this initial KIP if we think it falls too short. But we should >> not aim to be comprehensive to keep the scope manageable. >> >> In fact, I think we should add some more helpers to simplify result >> verification. I will update the KIP with this asap. Just wanted to start >> the discussion early on. >> >> An initial WIP PR can be found here: >> https://github.com/apache/kafka/pull/4402 >> >> I also included the user-list (please hit "reply-all" to include both >> lists in this KIP discussion). >> >> Thanks a lot. >> >> >> -Matthias >> >> >> Email had 1 attachment: >> + signature.asc >> 1k (application/pgp-signature) signature.asc Description: OpenPGP digital signature
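Editor's note: to make the in-memory behavior described above concrete, here is a sketch of a test against the driver under the API proposed in KIP-247. Class names and signatures are as discussed in this thread and the WIP PR, so treat them as provisional; it assumes the new test-utils artifact on the classpath:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class UppercaseTopologyTest {
    public static void main(String[] args) {
        // Topology under test: read "input", upper-case each value, write "output".
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
               .mapValues(v -> v.toUpperCase())
               .to("output");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "driver-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // No broker involved: the driver processes each record synchronously in memory.
        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
                "input", new StringSerializer(), new StringSerializer());
        driver.pipeInput(factory.create("key", "hello"));
        OutputVerifier.compareKeyValue(
                driver.readOutput("output", new StringDeserializer(), new StringDeserializer()),
                "key", "HELLO");
        driver.close();
    }
}
```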
Fwd: Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Forgot dev-list... Forwarded Message Subject: Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams Date: Tue, 16 Jan 2018 13:56:38 -0800 From: Matthias J. Sax Organization: Confluent Inc To: users@kafka.apache.org Thanks a lot for the comments. @Guozhang: I updated the KIP accordingly. With regard to potential client test-utils, I agree, but not sure how to resolve it. I guess, we just need to move the class for this case later on. (One reason to annotate all classes with @Evolving) @Bill: The new artifact will be included without the "classifier:test" tag, because it's a regular dependency (the published artifact is not a test artifact). For existing code, we don't remove any existing internal test class in 1.1.0 so the code should still work -- but as internal test classes are internal, we don't provide any guarantee about compatibility in the first place. About `ConsumerRecordFactory`: I think all overloads are useful -- if you remove the overload taking a topicName as input, you cannot overwrite the defaultTopicName and thus need to create a factory for each input topic. On the other hand, if you remove the overloads without taking a topicName, you force people to define a defaultTopicName, and thus they need to create a factory for each topic, too. The goal is to allow the usage of a single factory even if there are multiple topics. The defaultName is useful if you want to create a lot of records for a single topic, but not a good fit if you create just a few records for each topic. (At least, that is my thinking.) The `null` approach might work, too, but I think this results in ugly boilerplate code and thus I personally prefer to add the overloads. Let me know if you have a strong opinion for the `null` approach with reduced number of overloads. Hope to add the helpers for result verification this week... 
@Jeff: the available methods include global stores -- I added a comment to the KIP -Matthias On 1/16/18 11:28 AM, Jeff Klukas wrote: > From what I can tell, global state stores are managed separately from other > state stores and are accessed via different methods. > > Do the proposed methods on TopologyTestDriver (such as getStateStore) cover > global stores? If not, can we add an interface for accessing and testing > global stores in the scope of this KIP? > > On Thu, Jan 11, 2018 at 9:06 PM, Matthias J. Sax > wrote: > >> Dear Kafka community, >> >> I want to propose KIP-247 to add public test utils to the Streams API. >> The goal is to simplify testing of Kafka Streams applications. >> >> Please find details in the wiki: >> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> 247%3A+Add+public+test+utils+for+Kafka+Streams >> >> This is an initial KIP, and we hope to add more utility functions later. >> Thus, this KIP is not comprehensive but a first step. Of course, we can >> enrich this initial KIP if we think it falls too short. But we should >> not aim to be comprehensive to keep the scope manageable. >> >> In fact, I think we should add some more helpers to simplify result >> verification. I will update the KIP with this asap. Just wanted to start >> the discussion early on. >> >> An initial WIP PR can be found here: >> https://github.com/apache/kafka/pull/4402 >> >> I also included the user-list (please hit "reply-all" to include both >> lists in this KIP discussion). >> >> Thanks a lot. >> >> >> -Matthias >> >> >> > signature.asc Description: OpenPGP digital signature
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Thanks a lot for the comments. @Guozhang: I updated the KIP accordingly. With regard to potential client test-utils, I agree, but not sure how to resolve it. I guess, we just need to move the class for this case later on. (One reason to annotate all classes with @Evolving) @Bill: The new artifact will be included without the "classifier:test" tag, because it's a regular dependency (the published artifact is not a test artifact). For existing code, we don't remove any existing internal test class in 1.1.0 so the code should still work -- but as internal test classes are internal, we don't provide any guarantee about compatibility in the first place. About `ConsumerRecordFactory`: I think all overloads are useful -- if you remove the overload taking a topicName as input, you cannot overwrite the defaultTopicName and thus need to create a factory for each input topic. On the other hand, if you remove the overloads without taking a topicName, you force people to define a defaultTopicName, and thus they need to create a factory for each topic, too. The goal is to allow the usage of a single factory even if there are multiple topics. The defaultName is useful if you want to create a lot of records for a single topic, but not a good fit if you create just a few records for each topic. (At least, that is my thinking.) The `null` approach might work, too, but I think this results in ugly boilerplate code and thus I personally prefer to add the overloads. Let me know if you have a strong opinion for the `null` approach with reduced number of overloads. Hope to add the helpers for result verification this week... @Jeff: the available methods include global stores -- I added a comment to the KIP -Matthias On 1/16/18 11:28 AM, Jeff Klukas wrote: > From what I can tell, global state stores are managed separately from other > state stores and are accessed via different methods. > > Do the proposed methods on TopologyTestDriver (such as getStateStore) cover > global stores? 
If not, can we add an interface for accessing and testing > global stores in the scope of this KIP? > > On Thu, Jan 11, 2018 at 9:06 PM, Matthias J. Sax > wrote: > >> Dear Kafka community, >> >> I want to propose KIP-247 to add public test utils to the Streams API. >> The goal is to simplify testing of Kafka Streams applications. >> >> Please find details in the wiki: >> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> 247%3A+Add+public+test+utils+for+Kafka+Streams >> >> This is an initial KIP, and we hope to add more utility functions later. >> Thus, this KIP is not comprehensive but a first step. Of course, we can >> enrich this initial KIP if we think it falls too short. But we should >> not aim to be comprehensive to keep the scope manageable. >> >> In fact, I think we should add some more helpers to simplify result >> verification. I will update the KIP with this asap. Just wanted to start >> the discussion early on. >> >> An initial WIP PR can be found here: >> https://github.com/apache/kafka/pull/4402 >> >> I also included the user-list (please hit "reply-all" to include both >> lists in this KIP discussion). >> >> Thanks a lot. >> >> >> -Matthias >> >> >> >
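[Editor's note] The overload trade-off Matthias describes can be illustrated with a stand-alone sketch. This is a hypothetical, heavily simplified stand-in for the KIP's `ConsumerRecordFactory` (not the real kafka-streams-test-utils API): one constructor fixes a default topic, while a per-call overload can override it, so a single factory serves multiple input topics.

```java
// Hypothetical, simplified illustration of the overload design discussed
// above -- NOT the real ConsumerRecordFactory from kafka-streams-test-utils.
import java.util.Objects;

public class RecordFactorySketch {

    static final class Factory {
        private final String defaultTopic; // may be null if no default is set

        Factory() { this(null); }
        Factory(String defaultTopic) { this.defaultTopic = defaultTopic; }

        // Overload without a topic: falls back to the default topic.
        String create(String key, String value) {
            Objects.requireNonNull(defaultTopic, "no default topic configured");
            return create(defaultTopic, key, value);
        }

        // Overload with an explicit topic: overrides the default.
        String create(String topic, String key, String value) {
            return topic + ":" + key + "=" + value; // stands in for a ConsumerRecord
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory("input-topic");
        // One factory, many topics -- the goal stated in the discussion.
        System.out.println(factory.create("k1", "v1"));                // uses default topic
        System.out.println(factory.create("other-topic", "k2", "v2")); // overrides it
    }
}
```

With the `null` alternative discussed in the thread, the second overload would disappear and callers would pass `null` for the topic argument, which is the boilerplate Matthias prefers to avoid.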
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
From what I can tell, global state stores are managed separately from other state stores and are accessed via different methods. Do the proposed methods on TopologyTestDriver (such as getStateStore) cover global stores? If not, can we add an interface for accessing and testing global stores in the scope of this KIP? On Thu, Jan 11, 2018 at 9:06 PM, Matthias J. Sax wrote: > Dear Kafka community, > > I want to propose KIP-247 to add public test utils to the Streams API. > The goal is to simplify testing of Kafka Streams applications. > > Please find details in the wiki: > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 247%3A+Add+public+test+utils+for+Kafka+Streams > > This is an initial KIP, and we hope to add more utility functions later. > Thus, this KIP is not comprehensive but a first step. Of course, we can > enrich this initial KIP if we think it falls too short. But we should > not aim to be comprehensive to keep the scope manageable. > > In fact, I think we should add some more helpers to simplify result > verification. I will update the KIP with this asap. Just wanted to start > the discussion early on. > > An initial WIP PR can be found here: > https://github.com/apache/kafka/pull/4402 > > I also included the user-list (please hit "reply-all" to include both > lists in this KIP discussion). > > Thanks a lot. > > > -Matthias > > >
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Thanks for the KIP! One meta question: Will users that are currently using the existing testing code with the "classifier:test" approach: 1) have access to the new testing utilities without updating the build.gradle file, and 2) be able to continue using the current testing code with the classifier approach? A question on the KIP itself. Since we have factory methods for creating `ConsumerRecord` objects where we can either override or use the default topic, do we still need all the overloads on `ConsumerRecordFactory`? Maybe we could just have constructors specifying the default topic and users could provide "null" if not wanting to specify a topic when creating the `ConsumerRecordFactory`. Even though this is an initial KIP, I agree that adding some helper methods for result verification would be a good idea, but I don't think it should hold up the KIP process if we don't get to it on this pass. Otherwise, it's a +1 from me. Thanks, Bill On Tue, Jan 16, 2018 at 1:01 PM, Guozhang Wang wrote: > Thanks Matthias, I made a pass over the wiki and left some comments; I see > you have addressed most of them. Here are a few more: > > 1. "TopologyTestDriver#process()": how about rename it to "pipeInput" or > "sendInput"? > > 2. For "ConsumerRecordFactory" constructor where "startTimestampMs" is not > specified, what would be the default value? > > > This is not a comment but just reminder: > > 3. ConsumerRecordFactory: I have to admit that putting this class in > o.a.k.streams.test may not be ideal, and if we are going to have an > o.a.k.clients.test in the future testing artifact this may better be moved > there. > > > Guozhang > > > On Mon, Jan 15, 2018 at 2:59 AM, Saïd Bouras > wrote: > > > Hi Matthias, > > > > I read the KIP and it will be very helpful thanks to the changes, I don't > > see though a part that handle topologies that use avro schemas, is it in > > the scope of the KIP ?
> > > > I open an issue two month ago in the schema-registry repo : > > https://github.com/confluentinc/schema-registry/issues/651 that explain > > that when testing topologies using schema registry, the schema registry > > client mock is not thread safe and thus in the different processors nodes > > when deserializing it will not work... > > > > In my unit tests I wrapped the mock schema registry client into a > singleton > > but this solution is not enough satisfying. > > > > Thanks in advance, regards :-) > > > > > > On Fri, Jan 12, 2018 at 3:06 AM Matthias J. Sax > > wrote: > > > > > Dear Kafka community, > > > > > > I want to propose KIP-247 to add public test utils to the Streams API. > > > The goal is to simplify testing of Kafka Streams applications. > > > > > > Please find details in the wiki: > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > 247%3A+Add+public+test+utils+for+Kafka+Streams > > > > > > This is an initial KIP, and we hope to add more utility functions > later. > > > Thus, this KIP is not comprehensive but a first step. Of course, we can > > > enrich this initial KIP if we think it falls too short. But we should > > > not aim to be comprehensive to keep the scope manageable. > > > > > > In fact, I think we should add some more helpers to simplify result > > > verification. I will update the KIP with this asap. Just wanted to > start > > > the discussion early on. > > > > > > An initial WIP PR can be found here: > > > https://github.com/apache/kafka/pull/4402 > > > > > > I also included the user-list (please hit "reply-all" to include both > > > lists in this KIP discussion). > > > > > > Thanks a lot. > > > > > > > > > -Matthias > > > > > > > > > > > > > -- > > > > Saïd BOURAS > > > > Consultant Big Data > > Mobile: 0662988731 > > Zenika Paris > > 10 rue de Milan 75009 Paris > > Standard : +33(0)1 45 26 19 15 <+33(0)145261915> - Fax : +33(0)1 72 70 > 45 > > 10 > > <+33(0)172704510> > > > > > > -- > -- Guozhang >
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Thanks, Matthias, this looks great. It seems like these APIs could either be used against mock objects, or against real brokers running in the same process. Is there a way for the user to select which they want when using the API? Sorry if it's in the KIP and I missed it. cheers, Colin On Thu, Jan 11, 2018, at 18:06, Matthias J. Sax wrote: > Dear Kafka community, > > I want to propose KIP-247 to add public test utils to the Streams API. > The goal is to simplify testing of Kafka Streams applications. > > Please find details in the wiki: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams > > This is an initial KIP, and we hope to add more utility functions later. > Thus, this KIP is not comprehensive but a first step. Of course, we can > enrich this initial KIP if we think it falls too short. But we should > not aim to be comprehensive to keep the scope manageable. > > In fact, I think we should add some more helpers to simplify result > verification. I will update the KIP with this asap. Just wanted to start > the discussion early on. > > An initial WIP PR can be found here: > https://github.com/apache/kafka/pull/4402 > > I also included the user-list (please hit "reply-all" to include both > lists in this KIP discussion). > > Thanks a lot. > > > -Matthias > > >
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Thanks Matthias, I made a pass over the wiki and left some comments; I see you have addressed most of them. Here are a few more: 1. "TopologyTestDriver#process()": how about renaming it to "pipeInput" or "sendInput"? 2. For "ConsumerRecordFactory" constructor where "startTimestampMs" is not specified, what would be the default value? This is not a comment but just a reminder: 3. ConsumerRecordFactory: I have to admit that putting this class in o.a.k.streams.test may not be ideal, and if we are going to have an o.a.k.clients.test in the future testing artifact this may better be moved there. Guozhang On Mon, Jan 15, 2018 at 2:59 AM, Saïd Bouras wrote: > Hi Matthias, > > I read the KIP and it will be very helpful thanks to the changes, I don't > see though a part that handle topologies that use avro schemas, is it in > the scope of the KIP ? > > I open an issue two month ago in the schema-registry repo : > https://github.com/confluentinc/schema-registry/issues/651 that explain > that when testing topologies using schema registry, the schema registry > client mock is not thread safe and thus in the different processors nodes > when deserializing it will not work... > > In my unit tests I wrapped the mock schema registry client into a singleton > but this solution is not enough satisfying. > > Thanks in advance, regards :-) > > > On Fri, Jan 12, 2018 at 3:06 AM Matthias J. Sax > wrote: > > > Dear Kafka community, > > > > I want to propose KIP-247 to add public test utils to the Streams API. > > The goal is to simplify testing of Kafka Streams applications. > > > > Please find details in the wiki: > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 247%3A+Add+public+test+utils+for+Kafka+Streams > > > > This is an initial KIP, and we hope to add more utility functions later. > > Thus, this KIP is not comprehensive but a first step. Of course, we can > > enrich this initial KIP if we think it falls too short.
But we should > > not aim to be comprehensive to keep the scope manageable. > > > > In fact, I think we should add some more helpers to simplify result > > verification. I will update the KIP with this asap. Just wanted to start > > the discussion early on. > > > > An initial WIP PR can be found here: > > https://github.com/apache/kafka/pull/4402 > > > > I also included the user-list (please hit "reply-all" to include both > > lists in this KIP discussion). > > > > Thanks a lot. > > > > > > -Matthias > > > > > > > > -- > > Saïd BOURAS > > Consultant Big Data > Mobile: 0662988731 > Zenika Paris > 10 rue de Milan 75009 Paris > Standard : +33(0)1 45 26 19 15 <+33(0)145261915> - Fax : +33(0)1 72 70 45 > 10 > <+33(0)172704510> > -- -- Guozhang
Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Hi Matthias, I read the KIP and it will be very helpful thanks to the changes. I don't see, though, a part that handles topologies that use Avro schemas; is it in the scope of the KIP? I opened an issue two months ago in the schema-registry repo: https://github.com/confluentinc/schema-registry/issues/651 which explains that when testing topologies using the schema registry, the mock schema registry client is not thread safe, and thus deserialization in the different processor nodes will not work... In my unit tests I wrapped the mock schema registry client in a singleton, but this solution is not satisfying enough. Thanks in advance, regards :-) On Fri, Jan 12, 2018 at 3:06 AM Matthias J. Sax wrote: > Dear Kafka community, > > I want to propose KIP-247 to add public test utils to the Streams API. > The goal is to simplify testing of Kafka Streams applications. > > Please find details in the wiki: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams > > This is an initial KIP, and we hope to add more utility functions later. > Thus, this KIP is not comprehensive but a first step. Of course, we can > enrich this initial KIP if we think it falls too short. But we should > not aim to be comprehensive to keep the scope manageable. > > In fact, I think we should add some more helpers to simplify result > verification. I will update the KIP with this asap. Just wanted to start > the discussion early on. > > An initial WIP PR can be found here: > https://github.com/apache/kafka/pull/4402 > > I also included the user-list (please hit "reply-all" to include both > lists in this KIP discussion). > > Thanks a lot. > > > -Matthias > > > -- Saïd BOURAS Consultant Big Data Mobile: 0662988731 Zenika Paris 10 rue de Milan 75009 Paris Standard : +33(0)1 45 26 19 15 <+33(0)145261915> - Fax : +33(0)1 72 70 45 10 <+33(0)172704510>
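[Editor's note] The singleton workaround Saïd describes can be sketched in plain Java. `MockClient` below is a hypothetical stand-in for the mock schema registry client (the real client has a different API); the point is the pattern: one shared instance with access funneled through synchronized methods, so concurrent processor threads don't race.

```java
// Sketch of wrapping a non-thread-safe mock in a synchronized singleton,
// as described in the message above. MockClient is a hypothetical stand-in
// for the mock schema registry client, not the real class.
import java.util.HashMap;
import java.util.Map;

public class SingletonMockSketch {

    static final class MockClient {               // NOT thread safe on its own
        private final Map<String, Integer> ids = new HashMap<>();
        private int next = 1;
        int register(String subject) {
            return ids.computeIfAbsent(subject, s -> next++);
        }
    }

    // Single shared instance; every call is synchronized so concurrent
    // (de)serializers in different processor nodes see consistent state.
    static final class SharedMock {
        private static final MockClient INSTANCE = new MockClient();
        static synchronized int register(String subject) {
            return INSTANCE.register(subject);
        }
    }

    public static void main(String[] args) throws Exception {
        Runnable task = () -> SharedMock.register("topic-value");
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start(); t1.join(); t2.join();
        // The same subject registered from two threads yields a single id.
        System.out.println(SharedMock.register("topic-value")); // prints 1
    }
}
```

As the message notes, this works but is unsatisfying, since the singleton leaks shared state across tests; a per-test client that is itself thread safe would be cleaner.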
[DISCUSS] KIP-247: Add public test utils for Kafka Streams
Dear Kafka community, I want to propose KIP-247 to add public test utils to the Streams API. The goal is to simplify testing of Kafka Streams applications. Please find details in the wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams This is an initial KIP, and we hope to add more utility functions later. Thus, this KIP is not comprehensive but a first step. Of course, we can enrich this initial KIP if we think it falls too short. But we should not aim to be comprehensive to keep the scope manageable. In fact, I think we should add some more helpers to simplify result verification. I will update the KIP with this asap. Just wanted to start the discussion early on. An initial WIP PR can be found here: https://github.com/apache/kafka/pull/4402 I also included the user-list (please hit "reply-all" to include both lists in this KIP discussion). Thanks a lot. -Matthias
Re: performance test using real data - comparing throughput & latency
Revin, We instrument our data pipeline and Kafka applications both with wall-time measurements within our consumers and stream processors, and with a separate application that measures end-to-end latency in our data processing pipelines. We report these metrics to a metric aggregator, which in our case is Datadog. Using these metrics, we perform isolated performance experiments on production-volume data. I would be very happy to hear about your results as well. Best, Håkon On Sat, Sep 16, 2017 at 02:13, Matt Andruff wrote: > Look,. I'm a huge fan of sending identical data and using plane old 'wall > time' and averaging a couple runs to make sure you remove any whoops. > > You can use fancy tools for reporting but in the real world wall time still > is the most critical factor. And let's face it it's also simple to > measure. > > I personally would love to hear about your results. If you'd be willing to > share. > > On Fri, Sep 15, 2017, 15:01 Revin Chalil wrote: > > > Any thoughts on the below will be appreciated. Thanks. > > > > > > On 9/13/17, 5:00 PM, "Revin Chalil" wrote: > > > > We are testing kafka’s performance with the real prod data and plan > to > > test things like the below. We would have producers publishing and > > consumers processing production data on a separate non-prod kafka > cluster. > > > > > > * Impact of number of Partitions per Topic on throughput and > > latency on Producer & Consumer > > * Impact of scaling-up Brokers on throughput and latency > > * adding more brokers Vs adding more Disk on existing Brokers. How > > does the network interface usage differ?
> > * cost of Replication on Throughput and Latency > > * impact of Broker vm.swappiness = 60 Vs vm.swappiness = 1 > > * partitions on a Broker pointing to single Disk Vs multiple > Disks > > * EXT4 Vs XFS Filesystem on broker > > * behavior when Broker “num.io<http://num.io/>.threads” is > > increased from 8 to higher value > > * behavior when Broker “num.network.threads” is increased from 3 > > to higher value > > * behavior when the data segment size is increased from 1 GB > > (current setting) > > * producer “acks = 1” Vs “acks = all” (current setting) impact on > > throughput and latency > > * producer sending with Compression enabled (snappy?) Vs sending > > without Compression > > * setting producer batch-size (memory based) Vs record-count > > (current setting) per batch sent to Kafka > > * impact of message size throughput > > * Consumers fetching records from page-cache Vs fetching records > > from Disk > > > > > > Ideally, the metrics we would like to compare for each test are > > (please let know if there are anything else to be compared) > > > > * Producer write Throughput > > * Producer write Latency (ms) > > * Consumption Throughput > > * Consumption Latency (ms) > > * End-to-end Latency > > > > What would be the right tools to collect and compare the above > metrics > > against different Tests? I have setup kafka-monitor but couldn’t find how > > to track the throughput and latency. Kafka-web-console seems to have some > > of these available? Kafka-Manager? Burrow? Anything else? Thank you. > > > > Since we are going to use our own producers and consumers, I do not > > think it makes sense to use tools like kafka-consumer-perf-test.sh or > > kafka-producer-perf-test.sh, but please correct if I am wrong. > > > > Thanks, > > Revin > > > > > > > > > > >
Re: performance test using real data - comparing throughput & latency
When building these kinds of tests I always just orchestrated my producers and consumers to spit metrics out somewhere easy to collect. Never looked for a UI/tool to do it before. Assuming good NTP configs (sub-ms accuracy), I would typically put timing data into the key portion of the messages (message create time) on the producer. On the consumer I would read that create time and compare to system time to report on latency. Throughput on both sides is simple to track. I use yammer metrics ( http://metrics.dropwizard.io/3.2.3/) typically as you can wire some of those up and get awesome details. Just turn on the console reporter for the easiest start. It's not much code to put out a repeatable test with the metrics you care for. The only other test I'd look to add is scaling producers and consumers, that's where kafka really shines. :) Hope that helps some. On Fri, Sep 15, 2017 at 8:13 PM, Matt Andruff wrote: > Look,. I'm a huge fan of sending identical data and using plane old 'wall > time' and averaging a couple runs to make sure you remove any whoops. > > You can use fancy tools for reporting but in the real world wall time still > is the most critical factor. And let's face it it's also simple to > measure. > > I personally would love to hear about your results. If you'd be willing to > share. > > On Fri, Sep 15, 2017, 15:01 Revin Chalil wrote: > > > Any thoughts on the below will be appreciated. Thanks. > > > > > > On 9/13/17, 5:00 PM, "Revin Chalil" wrote: > > > > We are testing kafka’s performance with the real prod data and plan > to > > test things like the below. We would have producers publishing and > > consumers processing production data on a separate non-prod kafka > cluster. > > > > > > * Impact of number of Partitions per Topic on throughput and > > latency on Producer & Consumer > > * Impact of scaling-up Brokers on throughput and latency > > * adding more brokers Vs adding more Disk on existing Brokers.
> How > > does the network interface usage differ? > > * cost of Replication on Throughput and Latency > > * impact of Broker vm.swappiness = 60 Vs vm.swappiness = 1 > > * partitions on a Broker pointing to single Disk Vs multiple > Disks > > * EXT4 Vs XFS Filesystem on broker > > * behavior when Broker “num.io<http://num.io/>.threads” is > > increased from 8 to higher value > > * behavior when Broker “num.network.threads” is increased from 3 > > to higher value > > * behavior when the data segment size is increased from 1 GB > > (current setting) > > * producer “acks = 1” Vs “acks = all” (current setting) impact on > > throughput and latency > > * producer sending with Compression enabled (snappy?) Vs sending > > without Compression > > * setting producer batch-size (memory based) Vs record-count > > (current setting) per batch sent to Kafka > > * impact of message size throughput > > * Consumers fetching records from page-cache Vs fetching records > > from Disk > > > > > > Ideally, the metrics we would like to compare for each test are > > (please let know if there are anything else to be compared) > > > > * Producer write Throughput > > * Producer write Latency (ms) > > * Consumption Throughput > > * Consumption Latency (ms) > > * End-to-end Latency > > > > What would be the right tools to collect and compare the above > metrics > > against different Tests? I have setup kafka-monitor but couldn’t find how > > to track the throughput and latency. Kafka-web-console seems to have some > > of these available? Kafka-Manager? Burrow? Anything else? Thank you. > > > > Since we are going to use our own producers and consumers, I do not > > think it makes sense to use tools like kafka-consumer-perf-test.sh or > > kafka-producer-perf-test.sh, but please correct if I am wrong. > > > > Thanks, > > Revin > > > > > > > > > > >
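[Editor's note] The create-time-in-the-key technique described in the reply above can be sketched without a broker. Helper names are hypothetical; real code would wrap these around actual Kafka produce/consume calls, and the approach assumes producer and consumer clocks are NTP-synchronized, as the post notes.

```java
// Sketch of the timing-in-the-key approach: the producer encodes the
// message create time into the key; the consumer decodes it and diffs
// against its own clock to report per-record latency. Names are
// illustrative, not from any Kafka API.
public class LatencyKeySketch {

    // Producer side: prepend the create time to the logical key.
    static String encodeKey(String logicalKey, long createTimeMs) {
        return createTimeMs + "|" + logicalKey;
    }

    // Consumer side: recover the create time and compute latency.
    static long latencyMs(String key, long nowMs) {
        long createTimeMs = Long.parseLong(key.substring(0, key.indexOf('|')));
        return nowMs - createTimeMs;
    }

    public static void main(String[] args) {
        long sent = System.currentTimeMillis();
        String key = encodeKey("order-42", sent);
        // ... record travels through Kafka; the consumer reads it later ...
        long observed = latencyMs(key, sent + 250); // simulate 250 ms in flight
        System.out.println("end-to-end latency: " + observed + " ms");
    }
}
```

In a real setup the consumer would feed these latencies into a histogram (e.g. a Dropwizard Metrics `Timer`, as the post suggests) rather than printing them.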
Re: performance test using real data - comparing throughput & latency
Look, I'm a huge fan of sending identical data and using plain old 'wall time' and averaging a couple runs to make sure you remove any whoops. You can use fancy tools for reporting but in the real world wall time still is the most critical factor. And let's face it, it's also simple to measure. I personally would love to hear about your results. If you'd be willing to share. On Fri, Sep 15, 2017, 15:01 Revin Chalil wrote: > Any thoughts on the below will be appreciated. Thanks. > > > On 9/13/17, 5:00 PM, "Revin Chalil" wrote: > > We are testing kafka’s performance with the real prod data and plan to > test things like the below. We would have producers publishing and > consumers processing production data on a separate non-prod kafka cluster. > > > * Impact of number of Partitions per Topic on throughput and > latency on Producer & Consumer > * Impact of scaling-up Brokers on throughput and latency > * adding more brokers Vs adding more Disk on existing Brokers. How > does the network interface usage differ? > * cost of Replication on Throughput and Latency > * impact of Broker vm.swappiness = 60 Vs vm.swappiness = 1 > * partitions on a Broker pointing to single Disk Vs multiple Disks > * EXT4 Vs XFS Filesystem on broker > * behavior when Broker “num.io.threads” is > increased from 8 to higher value > * behavior when Broker “num.network.threads” is increased from 3 > to higher value > * behavior when the data segment size is increased from 1 GB > (current setting) > * producer “acks = 1” Vs “acks = all” (current setting) impact on > throughput and latency > * producer sending with Compression enabled (snappy?)
Vs sending > without Compression > * setting producer batch-size (memory based) Vs record-count > (current setting) per batch sent to Kafka > * impact of message size throughput > * Consumers fetching records from page-cache Vs fetching records > from Disk > > > Ideally, the metrics we would like to compare for each test are > (please let know if there are anything else to be compared) > > * Producer write Throughput > * Producer write Latency (ms) > * Consumption Throughput > * Consumption Latency (ms) > * End-to-end Latency > > What would be the right tools to collect and compare the above metrics > against different Tests? I have setup kafka-monitor but couldn’t find how > to track the throughput and latency. Kafka-web-console seems to have some > of these available? Kafka-Manager? Burrow? Anything else? Thank you. > > Since we are going to use our own producers and consumers, I do not > think it makes sense to use tools like kafka-consumer-perf-test.sh or > kafka-producer-perf-test.sh, but please correct if I am wrong. > > Thanks, > Revin > > > > >
Re: performance test using real data - comparing throughput & latency
Any thoughts on the below will be appreciated. Thanks. On 9/13/17, 5:00 PM, "Revin Chalil" wrote: We are testing kafka’s performance with the real prod data and plan to test things like the below. We would have producers publishing and consumers processing production data on a separate non-prod kafka cluster. * Impact of number of Partitions per Topic on throughput and latency on Producer & Consumer * Impact of scaling-up Brokers on throughput and latency * adding more brokers Vs adding more Disk on existing Brokers. How does the network interface usage differ? * cost of Replication on Throughput and Latency * impact of Broker vm.swappiness = 60 Vs vm.swappiness = 1 * partitions on a Broker pointing to single Disk Vs multiple Disks * EXT4 Vs XFS Filesystem on broker * behavior when Broker “num.io<http://num.io/>.threads” is increased from 8 to higher value * behavior when Broker “num.network.threads” is increased from 3 to higher value * behavior when the data segment size is increased from 1 GB (current setting) * producer “acks = 1” Vs “acks = all” (current setting) impact on throughput and latency * producer sending with Compression enabled (snappy?) Vs sending without Compression * setting producer batch-size (memory based) Vs record-count (current setting) per batch sent to Kafka * impact of message size throughput * Consumers fetching records from page-cache Vs fetching records from Disk Ideally, the metrics we would like to compare for each test are (please let know if there are anything else to be compared) * Producer write Throughput * Producer write Latency (ms) * Consumption Throughput * Consumption Latency (ms) * End-to-end Latency What would be the right tools to collect and compare the above metrics against different Tests? I have setup kafka-monitor but couldn’t find how to track the throughput and latency. Kafka-web-console seems to have some of these available? Kafka-Manager? Burrow? Anything else? Thank you. 
Since we are going to use our own producers and consumers, I do not think it makes sense to use tools like kafka-consumer-perf-test.sh or kafka-producer-perf-test.sh, but please correct if I am wrong. Thanks, Revin
performance test using real data - comparing throughput & latency
We are testing kafka’s performance with the real prod data and plan to test things like the below. We would have producers publishing and consumers processing production data on a separate non-prod kafka cluster.

* Impact of number of Partitions per Topic on throughput and latency on Producer & Consumer
* Impact of scaling-up Brokers on throughput and latency
* adding more brokers Vs adding more Disk on existing Brokers. How does the network interface usage differ?
* cost of Replication on Throughput and Latency
* impact of Broker vm.swappiness = 60 Vs vm.swappiness = 1
* partitions on a Broker pointing to single Disk Vs multiple Disks
* EXT4 Vs XFS Filesystem on broker
* behavior when Broker “num.io.threads” is increased from 8 to a higher value
* behavior when Broker “num.network.threads” is increased from 3 to a higher value
* behavior when the data segment size is increased from 1 GB (current setting)
* producer “acks = 1” Vs “acks = all” (current setting) impact on throughput and latency
* producer sending with Compression enabled (snappy?) Vs sending without Compression
* setting producer batch-size (memory based) Vs record-count (current setting) per batch sent to Kafka
* impact of message size on throughput
* Consumers fetching records from page-cache Vs fetching records from Disk

Ideally, the metrics we would like to compare for each test are (please let me know if there is anything else to be compared):

* Producer write Throughput
* Producer write Latency (ms)
* Consumption Throughput
* Consumption Latency (ms)
* End-to-end Latency

What would be the right tools to collect and compare the above metrics against different Tests? I have set up kafka-monitor but couldn’t find how to track the throughput and latency. Kafka-web-console seems to have some of these available? Kafka-Manager? Burrow? Anything else? Thank you.
Since we are going to use our own producers and consumers, I do not think it makes sense to use tools like kafka-consumer-perf-test.sh or kafka-producer-perf-test.sh, but please correct if I am wrong. Thanks, Revin
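[Editor's note] Several of the metrics listed above (producer/consumer throughput in records/sec and MB/sec) can be derived from two counters and a timer in the custom producers and consumers, which is essentially the summary that kafka-producer-perf-test.sh prints. A minimal sketch with illustrative names:

```java
// Sketch of computing the throughput numbers listed above from raw counts,
// the kind of summary kafka-producer-perf-test.sh reports. Names are
// illustrative; real code would feed counters from actual send/poll loops.
public class ThroughputSketch {

    static double recordsPerSec(long records, long elapsedMs) {
        return records * 1000.0 / elapsedMs;
    }

    static double mbPerSec(long records, int recordSizeBytes, long elapsedMs) {
        // MB here means MiB (1024 * 1024 bytes), matching the perf tool's output.
        return records * (double) recordSizeBytes / (1024.0 * 1024.0)
                / (elapsedMs / 1000.0);
    }

    public static void main(String[] args) {
        // Example: 1,000,000 records of 10 KiB each sent in 154 seconds.
        long records = 1_000_000, elapsedMs = 154_000;
        int size = 10_240;
        System.out.printf("%.1f records/sec, %.2f MB/sec%n",
                recordsPerSec(records, elapsedMs),
                mbPerSec(records, size, elapsedMs));
    }
}
```

Latency metrics would additionally need a create timestamp carried in each record (see the key-encoding approach discussed elsewhere in this thread), plus a percentile histogram rather than a single average.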
how about kafka test
Kafka is a distributed, partitioned, replicated commit log service / message queue. I am interested in how Kafka itself is tested: transactional messages, exactly-once messages, replication. Is there any documentation about Kafka testing? How is it ensured that Kafka works as expected? -- = fuyou001 Best Regards
Re: "Failed to update metadata" in integration test for exactly once stream and custom store
Yeah, I'll open a PR. 2017-07-12 11:07 GMT-06:00 Matthias J. Sax : > Ups. That is definitely a bug in the test. Thanks for pointing it out! > > Do you wanna open a PR it? If not, I can take care, too. > > -Matthias > > On 7/12/17 8:48 AM, Joel Dice . wrote: >> After some debugging, I figured it out. The name of my custom store >> was "mapStore", so the store tried to log changes to a topic with that >> name, but my test case never created such a topic. Unsurprisingly, >> the producer couldn't get metadata for a nonexistent topic, so it >> failed with a timeout. >> >> With this trivial fix, the test passes: >> https://github.com/dicej/kafka-eos-test/commit/0fdf400350e41092f1973a80e9e4196003859ddc >> >> BTW, in EosIntegrationTest, shouldn't this line read "outputTopic," >> instead of "inputTopic,": >> >> https://github.com/apache/kafka/blob/44167399157b06d566401a3f6c17e9ca901a8c20/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java#L195 >> >> As written, the test seems to only verify that data produced to >> inputTopic can be consumed from inputTopic, whereas the intention of >> the test seems to be that data is copied from inputTopic to >> outputTopic, so I would expect it to verify that it actually made it >> to outputTopic. >> >> >> 2017-07-11 16:25 GMT-06:00 Matthias J. Sax : >>> Seems Streams cannot connect (or looses connection) to the brokers. Not >>> sure why. >>> >>> You can also have a look here for our own EOS Streams integration test: >>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java >>> >>> >>> -Matthias >>> >>> >>> On 7/11/17 2:13 PM, Joel Dice . 
wrote: >>>> Hi all, >>>> >>>> I wrote an integration test to see how Kafka Streams handles exactly >>>> once processing with a custom state store, and it's failing with this >>>> error: >>>> >>>> org.apache.kafka.common.errors.TimeoutException: Failed to update >>>> metadata after 6 ms >>>> >>>> I'm not sure whether this is a bug in my code, a bug in Kafka, or a >>>> bug in the EmbeddedKafkaCluster class I'm using for testing. >>>> >>>> The test is available here: https://github.com/dicej/kafka-eos-test, >>>> and it can be run using "gradle test". >>>> >>>> The test class itself is >>>> https://github.com/dicej/kafka-eos-test/blob/f892a856d25d6e17e16a6681066a2033b06c5b3f/src/test/java/KafkaIntegrationTest.java. >>>> It's based loosely on the EosIntegrationTest class in the Kafka source >>>> tree. >>>> >>>> This is the full trace: >>>> >>>> Exception in thread >>>> "appId-1-9f749063-1a4d-4498-8df4-8619269e36f3-StreamThread-1" >>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in >>>> process. 
taskId=0_0, processor=inputTopic, topic=inputTopic, >>>> partition=0, offset=1 >>>> >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203) >>>> >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679) >>>> >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557) >>>> >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) >>>> >>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task >>>> [0_0] exception caught when producing >>>> >>>> at >>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136) >>>> >>>> at >>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) >>>> >>>> at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:259) >>>> >>>> at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:236) >>>> >>>> at >>>> KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:227) >>>> >>>> at >
Re: "Failed to update metadata" in integration test for exactly once stream and custom store
Oops. That is definitely a bug in the test. Thanks for pointing it out! Do you want to open a PR for it? If not, I can take care of it, too.

-Matthias

On 7/12/17 8:48 AM, Joel Dice wrote:
> After some debugging, I figured it out. The name of my custom store
> was "mapStore", so the store tried to log changes to a topic with that
> name, but my test case never created such a topic.
[remainder of quote trimmed; the message appears in full below]
Re: "Failed to update metadata" in integration test for exactly once stream and custom store
After some debugging, I figured it out. The name of my custom store was "mapStore", so the store tried to log changes to a topic with that name, but my test case never created such a topic. Unsurprisingly, the producer couldn't get metadata for a nonexistent topic, so it failed with a timeout.

With this trivial fix, the test passes: https://github.com/dicej/kafka-eos-test/commit/0fdf400350e41092f1973a80e9e4196003859ddc

BTW, in EosIntegrationTest, shouldn't this line read "outputTopic," instead of "inputTopic,":

https://github.com/apache/kafka/blob/44167399157b06d566401a3f6c17e9ca901a8c20/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java#L195

As written, the test only verifies that data produced to inputTopic can be consumed from inputTopic, whereas the intention of the test seems to be that data is copied from inputTopic to outputTopic, so I would expect it to verify that the data actually made it to outputTopic.

2017-07-11 16:25 GMT-06:00 Matthias J. Sax:
> Seems Streams cannot connect (or loses connection) to the brokers. Not
> sure why.
[remainder of quote trimmed; the message appears in full below]
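Joel's root cause generalizes: for its built-in stores (with logging enabled), Kafka Streams creates an internal changelog topic named `<applicationId>-<storeName>-changelog` automatically, but a custom store that sends records to a topic of its own choosing (here, literally "mapStore") is invisible to Streams' internal topic management, so the test has to create that topic itself before starting the application. A minimal sketch of the default naming convention — the helper name here is mine, not a Kafka API:

```java
// Sketch of Kafka Streams' default changelog topic naming convention.
// defaultChangelogTopic() is a hypothetical helper; Streams derives this
// name internally for built-in stores with logging enabled. A custom
// store that writes to its own topic (e.g. "mapStore") bypasses this
// convention, so no topic is auto-created for it.
public class ChangelogTopicName {
    static String defaultChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // For the application id seen in the stack trace above and a
        // store named "mapStore", the built-in convention would yield:
        System.out.println(defaultChangelogTopic("appId-1", "mapStore"));
        // prints "appId-1-mapStore-changelog"
    }
}
```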
Re: "Failed to update metadata" in integration test for exactly once stream and custom store
Seems Streams cannot connect (or loses connection) to the brokers. Not sure why.

You can also have a look here for our own EOS Streams integration test:
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java

-Matthias

On 7/11/17 2:13 PM, Joel Dice wrote:
> Hi all,
>
> I wrote an integration test to see how Kafka Streams handles exactly
> once processing with a custom state store, and it's failing with this
> error:
>
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 6 ms
[remainder of quote trimmed; the message appears in full below]
"Failed to update metadata" in integration test for exactly once stream and custom store
Hi all,

I wrote an integration test to see how Kafka Streams handles exactly once processing with a custom state store, and it's failing with this error:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms

I'm not sure whether this is a bug in my code, a bug in Kafka, or a bug in the EmbeddedKafkaCluster class I'm using for testing.

The test is available here: https://github.com/dicej/kafka-eos-test, and it can be run using "gradle test".

The test class itself is https://github.com/dicej/kafka-eos-test/blob/f892a856d25d6e17e16a6681066a2033b06c5b3f/src/test/java/KafkaIntegrationTest.java. It's based loosely on the EosIntegrationTest class in the Kafka source tree.

This is the full trace:

Exception in thread "appId-1-9f749063-1a4d-4498-8df4-8619269e36f3-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=inputTopic, topic=inputTopic, partition=0, offset=1
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
    at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:259)
    at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:236)
    at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:227)
    at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:203)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.

Any ideas what might be going wrong?

Cheers,
Joel
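For reference, the timeout at the bottom of this trace is the producer's metadata wait bound: KafkaProducer.send() blocks up to max.block.ms waiting for metadata of the target topic, and when the topic does not exist (and is not auto-created) it gives up with exactly this "Failed to update metadata after <N> ms" TimeoutException. A config sketch (60000 ms is the default; the value is shown only to illustrate the knob):

```properties
# Producer-side bound on how long send() may block waiting for topic
# metadata (and for buffer space). When the target topic does not exist
# and is not auto-created, send() fails after this long with
# "TimeoutException: Failed to update metadata after <N> ms".
# 60000 is the default; lowering it in tests makes a missing topic
# fail fast instead of stalling the test run.
max.block.ms=60000
```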