[jira] [Assigned] (KAFKA-2058) ProducerTest.testSendWithDeadBroker transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck reassigned KAFKA-2058:
----------------------------------

    Assignee: Bill Bejeck

> ProducerTest.testSendWithDeadBroker transient failure
> -----------------------------------------------------
>
>                 Key: KAFKA-2058
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2058
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Guozhang Wang
>            Assignee: Bill Bejeck
>              Labels: newbie
>
> {code}
> kafka.producer.ProducerTest > testSendWithDeadBroker FAILED
>     java.lang.AssertionError: Message set should have 1 message
>         at org.junit.Assert.fail(Assert.java:92)
>         at org.junit.Assert.assertTrue(Assert.java:44)
>         at kafka.producer.ProducerTest.testSendWithDeadBroker(ProducerTest.scala:260)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-2058) ProducerTest.testSendWithDeadBroker transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933260#comment-14933260 ]

Bill Bejeck commented on KAFKA-2058:
------------------------------------

I've been trying to reproduce this error locally without much success. Can you give any further details of the circumstances when you get the error?

Thanks.
-Bill

> ProducerTest.testSendWithDeadBroker transient failure
> -----------------------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-2836) FAILING TEST: SaslSslTopicMetadataTest
[ https://issues.apache.org/jira/browse/KAFKA-2836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062501#comment-15062501 ]

Bill Bejeck commented on KAFKA-2836:
------------------------------------

Picking this bug up.

> FAILING TEST: SaslSslTopicMetadataTest
> --------------------------------------
>
>                 Key: KAFKA-2836
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2836
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Gwen Shapira
>            Assignee: Bill Bejeck
>            Priority: Blocker
>              Labels: newbiee
>
> {code}
> java.lang.AssertionError: Topic metadata is not correctly updated for broker kafka.server.KafkaServer@161c6a7f.
> Expected ISR: List(BrokerEndPoint(0,localhost,40558), BrokerEndPoint(1,localhost,40526))
> Actual ISR  :
>     at org.junit.Assert.fail(Assert.java:88)
>     at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:747)
>     at kafka.integration.BaseTopicMetadataTest$$anonfun$checkIsr$1.apply(BaseTopicMetadataTest.scala:194)
>     at kafka.integration.BaseTopicMetadataTest$$anonfun$checkIsr$1.apply(BaseTopicMetadataTest.scala:192)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at kafka.integration.BaseTopicMetadataTest.checkIsr(BaseTopicMetadataTest.scala:192)
>     at kafka.integration.BaseTopicMetadataTest.testIsrAfterBrokerShutDownAndJoinsBack(BaseTopicMetadataTest.scala:237)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>     at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>     at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>     at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
>     at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Assigned] (KAFKA-2836) FAILING TEST: SaslSslTopicMetadataTest
[ https://issues.apache.org/jira/browse/KAFKA-2836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck reassigned KAFKA-2836:
----------------------------------

    Assignee: Bill Bejeck

> FAILING TEST: SaslSslTopicMetadataTest
> --------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Work started] (KAFKA-2836) FAILING TEST: SaslSslTopicMetadataTest
[ https://issues.apache.org/jira/browse/KAFKA-2836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-2836 started by Bill Bejeck.
------------------------------------------

> FAILING TEST: SaslSslTopicMetadataTest
> --------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2902) StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
[ https://issues.apache.org/jira/browse/KAFKA-2902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck updated KAFKA-2902:
-------------------------------
    Description: 
When starting a KafkaStreaming instance, the StreamingConfig.getConsumerConfigs method uses getRestoreConsumerConfigs to retrieve properties. But this method removes the groupId property, which causes an error, and the KafkaStreaming instance shuts down. On KafkaStreaming startup, StreamingConfig should use getBaseConsumerConfigs instead.

{code}
Exception in thread "StreamThread-1" org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.ApiException: The configured groupId is invalid
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:309)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:198)
Caused by: org.apache.kafka.common.errors.ApiException: The configured groupId is invalid
{code}

  was:
When starting a KafkaStreaming instance, the StreamingConfig.getConsumerConfigs method uses getRestoreConsumerConfigs to retrieve properties. But this method removes the groupId property, which causes an error, and the KafkaStreaming instance shuts down. On KafkaStreaming startup, StreamingConfig should use getBaseConsumerConfigs instead.

> StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2902
>             Project: Kafka
>          Issue Type: Bug
>          Components: kafka streams
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>             Fix For: 0.9.1.0

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-2902) StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
Bill Bejeck created KAFKA-2902:
----------------------------------

             Summary: StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
                 Key: KAFKA-2902
                 URL: https://issues.apache.org/jira/browse/KAFKA-2902
             Project: Kafka
          Issue Type: Bug
          Components: kafka streams
            Reporter: Bill Bejeck
            Assignee: Bill Bejeck
             Fix For: 0.9.1.0


When starting a KafkaStreaming instance, the StreamingConfig.getConsumerConfigs method uses getRestoreConsumerConfigs to retrieve properties. But this method removes the groupId property, which causes an error, and the KafkaStreaming instance shuts down. On KafkaStreaming startup, StreamingConfig should use getBaseConsumerConfigs instead.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
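The startup failure described above can be sketched in miniature. The following is a hypothetical, simplified model: the method names mirror the report, but the real StreamingConfig and the broker-side validation differ. The point it illustrates is that stripping group.id is correct for the state-restore consumer (which must not join a consumer group) but wrong for the main consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch of the KAFKA-2902 bug.
// Method names follow the report; the real StreamingConfig differs.
public class Main {
    static Map<String, Object> baseProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", "my-streaming-app");
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }

    // The restore consumer must not join a consumer group, so this
    // variant strips group.id -- correct for restoration, wrong when
    // reused for the main consumer.
    static Map<String, Object> getRestoreConsumerConfigs() {
        Map<String, Object> props = baseProps();
        props.remove("group.id");
        return props;
    }

    static Map<String, Object> getBaseConsumerConfigs() {
        return baseProps();
    }

    public static void main(String[] args) {
        // Bug: a main consumer built from the restore configs has no
        // group.id, so the broker rejects it
        // ("The configured groupId is invalid").
        System.out.println("restore has group.id: "
                + getRestoreConsumerConfigs().containsKey("group.id"));
        // Fix: build the main consumer from the base configs instead.
        System.out.println("base has group.id: "
                + getBaseConsumerConfigs().containsKey("group.id"));
    }
}
```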
[jira] [Work started] (KAFKA-2902) StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
[ https://issues.apache.org/jira/browse/KAFKA-2902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-2902 started by Bill Bejeck.
------------------------------------------

> StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
> ----------------------------------------------------------------------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Work stopped] (KAFKA-2902) StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
[ https://issues.apache.org/jira/browse/KAFKA-2902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-2902 stopped by Bill Bejeck.
------------------------------------------

> StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
> ----------------------------------------------------------------------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2902) StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
[ https://issues.apache.org/jira/browse/KAFKA-2902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck updated KAFKA-2902:
-------------------------------
    Status: Patch Available  (was: Open)

Changes made so that the getConsumerConfigs call uses getBaseConsumerProps from StreamingConfig.

> StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
> ----------------------------------------------------------------------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2902) StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
[ https://issues.apache.org/jira/browse/KAFKA-2902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck updated KAFKA-2902:
-------------------------------
    Component/s: kafka streams

> StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
> ----------------------------------------------------------------------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2902) StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
[ https://issues.apache.org/jira/browse/KAFKA-2902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck updated KAFKA-2902:
-------------------------------
    Component/s:     (was: kafka streams)

> StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs
> ----------------------------------------------------------------------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Work stopped] (KAFKA-2872) Error starting KafkaStream caused by sink not being connected to parent source/processor nodes
[ https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-2872 stopped by Bill Bejeck.
------------------------------------------

> Error starting KafkaStream caused by sink not being connected to parent source/processor nodes
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2872
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2872
>             Project: Kafka
>          Issue Type: Bug
>          Components: kafka streams
>    Affects Versions: 0.9.0.0
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>
> When starting the KafkaStream I get the following exception:
> {code}
> Exception in thread "main" java.util.NoSuchElementException: id: SINK
> 	at org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
> 	at org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
> 	at org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
> 	at org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
> 	at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> {code}
> The TopologyBuilder is being built like so:
> {code}
> topologyBuilder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "src-topic")
>                .addProcessor("PROCESS", new GenericProcessorClient(replaceVowels), "SOURCE")
>                .addSink("SINK", "dest-topic", new StringSerializer(), new StringSerializer(), "PROCESS");
> {code}
> It looks to me like the cause of the error is that in the TopologyBuilder.addSink method the sink is never connected with its parent.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2872) Error starting KafkaStream caused by sink not being connected to parent source/processor nodes
[ https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck updated KAFKA-2872:
-------------------------------
    Fix Version/s: 0.9.0.0
           Status: Patch Available  (was: Open)

Updated the TopologyBuilder.addSink method to connect the sink with its parent processor/source.

> Error starting KafkaStream caused by sink not being connected to parent source/processor nodes
> ----------------------------------------------------------------------------------------------

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-2872) Error starting KafkaStream caused by sink not being connected to parent source/processor nodes
Bill Bejeck created KAFKA-2872:
----------------------------------

             Summary: Error starting KafkaStream caused by sink not being connected to parent source/processor nodes
                 Key: KAFKA-2872
                 URL: https://issues.apache.org/jira/browse/KAFKA-2872
             Project: Kafka
          Issue Type: Bug
          Components: kafka streams
    Affects Versions: 0.9.0.0
            Reporter: Bill Bejeck
            Assignee: Bill Bejeck


When starting the KafkaStream I get the following exception:

{code}
Exception in thread "main" java.util.NoSuchElementException: id: SINK
	at org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
	at org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
	at org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
	at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
	at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
	at org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
	at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
{code}

The TopologyBuilder is being built like so:

{code}
topologyBuilder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "src-topic")
               .addProcessor("PROCESS", new GenericProcessorClient(replaceVowels), "SOURCE")
               .addSink("SINK", "dest-topic", new StringSerializer(), new StringSerializer(), "PROCESS");
{code}

It looks to me like the cause of the error is that in the TopologyBuilder.addSink method the sink is never connected with its parent.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
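The failure mode described above can be modeled with a tiny union-find: the stack trace shows QuickUnion.root failing during makeNodeGroups because the sink id was never registered. The QuickUnion below is a hypothetical, simplified reconstruction for illustration, not the actual Kafka class; only the shape of the bug (a node looked up before it is added/united) is taken from the report.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Simplified model of the union-find behind TopologyBuilder node grouping.
// Hypothetical sketch: the real QuickUnion in Kafka differs in detail.
public class Main {
    static class QuickUnion {
        private final Map<String, String> ids = new HashMap<>();

        void add(String id) { ids.put(id, id); }  // node is its own root

        String root(String id) {
            String current = ids.get(id);
            if (current == null)
                throw new NoSuchElementException("id: " + id);
            while (!current.equals(ids.get(current)))  // follow parent links
                current = ids.get(current);
            return current;
        }

        void unite(String id1, String id2) {
            ids.put(root(id2), root(id1));  // attach id2's tree under id1's root
        }
    }

    public static void main(String[] args) {
        QuickUnion qu = new QuickUnion();
        qu.add("SOURCE");
        qu.add("PROCESS");
        qu.unite("SOURCE", "PROCESS");

        // Buggy addSink: the sink was never added to the union-find,
        // so grouping the topology blows up exactly as in the report.
        try {
            qu.root("SINK");
        } catch (NoSuchElementException e) {
            System.out.println("reproduced: " + e.getMessage()); // reproduced: id: SINK
        }

        // Fixed addSink: register the sink and unite it with its parent.
        qu.add("SINK");
        qu.unite("PROCESS", "SINK");
        System.out.println(qu.root("SINK")); // SOURCE
    }
}
```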
[jira] [Work started] (KAFKA-2872) Error starting KafkaStream caused by sink not being connected to parent source/processor nodes
[ https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-2872 started by Bill Bejeck. -- > Error starting KafkaStream caused by sink not being connected to parent > source/processor nodes > -- > > Key: KAFKA-2872 > URL: https://issues.apache.org/jira/browse/KAFKA-2872 > Project: Kafka > Issue Type: Bug > Components: kafka streams >Affects Versions: 0.9.0.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck > > When starting the KafkaStream I get the following Exception: > Exception in thread "main" java.util.NoSuchElementException: id: SINK > at > org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40) > at > org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387) > at > org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339) > at > org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139) > at > org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120) > at > org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110) > at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35) > The TopologyBuilder is being built like so: > topologyBuilder.addSource("SOURCE", new StringDeserializer(), new > StringDeserializer(), "src-topic") > .addProcessor("PROCESS", new > GenericProcessorClient(replaceVowels), "SOURCE") > .addSink("SINK", "dest-topic", new StringSerializer(), new > StringSerializer(), "PROCESS"); > It looks to me like the cause of the error is that in the TopologyBuilder.addSink method > the sink is never connected with its parent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3842) Add Helper Functions Into TestUtils
[ https://issues.apache.org/jira/browse/KAFKA-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3842: --- Description: Per guidance from [~guozhang] from PR #1477, move helper functions from RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, getStreamsConfig) into TestUtils and parameterize as appropriate. Also look into adding a `waitUntil(Condition condition)` type construct to wait for a condition to be met without relying on Thread.sleep (was: Per guidance from [~guozhang] from PR #1477, move helper functions from RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, getStreamsConfig) into TestUtils and parameterize as appropriate.) > Add Helper Functions Into TestUtils > --- > > Key: KAFKA-3842 > URL: https://issues.apache.org/jira/browse/KAFKA-3842 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.0.1 >Reporter: Bill Bejeck >Assignee: Bill Bejeck > Fix For: 0.10.0.1 > > > Per guidance from [~guozhang] from PR #1477, move helper functions from > RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, > getStreamsConfig) into TestUtils and parameterize as appropriate. Also look > into adding a `waitUntil(Condition condition)` type construct to wait for a > condition to be met without relying on Thread.sleep -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3842) Add Helper Functions Into TestUtils
[ https://issues.apache.org/jira/browse/KAFKA-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3842: --- Description: Per guidance from [~guozhang] from PR #1477, move helper functions from RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, getStreamsConfig) into TestUtils and parameterize as appropriate. Also look into adding a {{waitUntil(Condition condition)}} type construct to wait for a condition to be met without relying on Thread.sleep (was: Per guidance from [~guozhang] from PR #1477, move helper functions from RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, getStreamsConfig) into TestUtils and parameterize as appropriate. Also look into adding a `waitUntil(Condition condition)` type construct to wait for a condition to be met without relying on Thread.sleep) > Add Helper Functions Into TestUtils > --- > > Key: KAFKA-3842 > URL: https://issues.apache.org/jira/browse/KAFKA-3842 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.0.1 >Reporter: Bill Bejeck >Assignee: Bill Bejeck > Fix For: 0.10.0.1 > > > Per guidance from [~guozhang] from PR #1477, move helper functions from > RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, > getStreamsConfig) into TestUtils and parameterize as appropriate. Also look > into adding a {{waitUntil(Condition condition)}} type construct to wait for a > condition to be met without relying on Thread.sleep -- This message was sent by Atlassian JIRA (v6.3.4#6332)
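The {{waitUntil(Condition condition)}} construct mentioned above could look something like the following polling helper; the class name, signature, and 10 ms poll interval are assumptions for illustration, not the final TestUtils API:

```java
import java.util.function.BooleanSupplier;

// Sketch of a waitUntil-style test helper: poll a condition at short
// intervals until it holds or a timeout expires, instead of one long,
// fragile Thread.sleep. Names here are hypothetical.
public class WaitUntil {
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean())
                return true;
            try {
                Thread.sleep(10); // short poll interval, not the whole timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Condition becomes true roughly 50 ms in; well within the timeout.
        boolean met = waitUntil(() -> System.currentTimeMillis() - start >= 50, 1000);
        System.out.println("condition met: " + met);
    }
}
```

A test using this helper returns as soon as the condition holds, so the common case is far faster than a worst-case sleep while still tolerating slow CI machines.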
[jira] [Created] (KAFKA-3842) Add Helper Functions Into TestUtils
Bill Bejeck created KAFKA-3842: -- Summary: Add Helper Functions Into TestUtils Key: KAFKA-3842 URL: https://issues.apache.org/jira/browse/KAFKA-3842 Project: Kafka Issue Type: Improvement Affects Versions: 0.10.0.1 Reporter: Bill Bejeck Assignee: Bill Bejeck Fix For: 0.10.0.1 Per guidance from [~guozhang] from PR #1477, move helper functions from RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, getStreamsConfig) into TestUtils and parameterize as appropriate. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3794) Add Stream / Table prefix in print functions
[ https://issues.apache.org/jira/browse/KAFKA-3794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3794: -- Assignee: Bill Bejeck > Add Stream / Table prefix in print functions > > > Key: KAFKA-3794 > URL: https://issues.apache.org/jira/browse/KAFKA-3794 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie, user-experience > > Currently the KTable/KStream.print() operator will print the key-value pair > as it was forwarded to this operator. However, if there are multiple > operators in the topologies with the same {{PrintStream}} (e.g. stdout), > their printed key-value pairs will be interleaved on that stream channel. > Hence it is better to add a prefix for different KStream/KTable.print > operators. One proposal: > 1) For KTable, it inherits a table name when created, and we can use that > name as the prefix, as {{[table-name]: key, value}}. > 2) For KStream, we can overload the function with an additional "name" > parameter that we use as the prefix; if it is not specified, then we can use > the parent processor node name, which has a pattern like > {{KSTREAM-JOIN-<suffix_index>}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
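The prefixing proposed above can be modeled with a plain helper class; KeyValuePrinter is a hypothetical name, not Kafka's internal API. Each print operator carries its own prefix, so interleaved output on a shared PrintStream stays attributable:

```java
import java.io.PrintStream;

// Illustrative sketch of per-operator prefixes for printed key-value pairs.
// The class and method names are made up for this example.
class KeyValuePrinter<K, V> {
    private final String prefix;
    private final PrintStream out;

    KeyValuePrinter(String prefix, PrintStream out) {
        this.prefix = prefix;
        this.out = out;
    }

    // Format follows the ticket's proposal: [prefix]: key, value
    String format(K key, V value) {
        return "[" + prefix + "]: " + key + ", " + value;
    }

    void print(K key, V value) {
        out.println(format(key, value));
    }
}

public class PrefixPrintDemo {
    public static void main(String[] args) {
        // A table-named prefix and an auto-generated-style node name prefix,
        // both writing to the same stream but distinguishable by prefix.
        KeyValuePrinter<String, Integer> table =
                new KeyValuePrinter<>("counts-table", System.out);
        KeyValuePrinter<String, Integer> stream =
                new KeyValuePrinter<>("KSTREAM-JOIN-0000000003", System.out);
        table.print("alice", 5);
        stream.print("alice", 7);
    }
}
```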
[jira] [Commented] (KAFKA-3794) Add Stream / Table prefix in print functions
[ https://issues.apache.org/jira/browse/KAFKA-3794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15317475#comment-15317475 ] Bill Bejeck commented on KAFKA-3794: Picking this one up, mea culpa. > Add Stream / Table prefix in print functions > > > Key: KAFKA-3794 > URL: https://issues.apache.org/jira/browse/KAFKA-3794 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie, user-experience > > Currently the KTable/KStream.print() operator will print the key-value pair > as it was forwarded to this operator. However, if there are multiple > operators in the topologies with the same {{PrintStream}} (e.g. stdout), > their printed key-value pairs will be interleaved on that stream channel. > Hence it is better to add a prefix for different KStream/KTable.print > operators. One proposal: > 1) For KTable, it inherits a table name when created, and we can use that > name as the prefix, as {{[table-name]: key, value}}. > 2) For KStream, we can overload the function with an additional "name" > parameter that we use as the prefix; if it is not specified, then we can use > the parent processor node name, which has a pattern like > {{KSTREAM-JOIN-<suffix_index>}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3842) Add Helper Functions Into TestUtils
[ https://issues.apache.org/jira/browse/KAFKA-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3842 started by Bill Bejeck. -- > Add Helper Functions Into TestUtils > --- > > Key: KAFKA-3842 > URL: https://issues.apache.org/jira/browse/KAFKA-3842 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.0.1 >Reporter: Bill Bejeck >Assignee: Bill Bejeck > Fix For: 0.10.0.1 > > > Per guidance from [~guozhang] from PR #1477, move helper functions from > RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, > getStreamsConfig) into TestUtils and parameterize as appropriate. Also look > into adding a {{waitUntil(Condition condition)}} type construct to wait for a > condition to be met without relying on Thread.sleep -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3842) Add Helper Functions Into TestUtils
[ https://issues.apache.org/jira/browse/KAFKA-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3842: --- Fix Version/s: (was: 0.10.0.1) 0.10.1.0 Affects Version/s: (was: 0.10.0.1) 0.10.1.0 Status: Patch Available (was: In Progress) > Add Helper Functions Into TestUtils > --- > > Key: KAFKA-3842 > URL: https://issues.apache.org/jira/browse/KAFKA-3842 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.1.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck > Fix For: 0.10.1.0 > > > Per guidance from [~guozhang] from PR #1477, move helper functions from > RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, > getStreamsConfig) into TestUtils and parameterize as appropriate. Also look > into adding a {{waitUntil(Condition condition)}} type construct to wait for a > condition to be met without relying on Thread.sleep -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3794) Add Stream / Table prefix in print functions
[ https://issues.apache.org/jira/browse/KAFKA-3794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3794 started by Bill Bejeck. -- > Add Stream / Table prefix in print functions > > > Key: KAFKA-3794 > URL: https://issues.apache.org/jira/browse/KAFKA-3794 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie, user-experience > > Currently the KTable/KStream.print() operator will print the key-value pair > as it was forwarded to this operator. However, if there are multiple > operators in the topologies with the same {{PrintStream}} (e.g. stdout), > their printed key-value pairs will be interleaved on that stream channel. > Hence it is better to add a prefix for different KStream/KTable.print > operators. One proposal: > 1) For KTable, it inherits a table name when created, and we can use that > name as the prefix, as {{[table-name]: key, value}}. > 2) For KStream, we can overload the function with an additional "name" > parameter that we use as the prefix; if it is not specified, then we can use > the parent processor node name, which has a pattern like > {{KSTREAM-JOIN-<suffix_index>}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3903) Convert tests to use static helper methods for Consumer/Producer/StreamsConfigs setup
Bill Bejeck created KAFKA-3903: -- Summary: Convert tests to use static helper methods for Consumer/Producer/StreamsConfigs setup Key: KAFKA-3903 URL: https://issues.apache.org/jira/browse/KAFKA-3903 Project: Kafka Issue Type: Improvement Components: streams Reporter: Bill Bejeck Assignee: Bill Bejeck Fix For: 0.10.1.0 There are several unit/integration tests where we create Consumer/Producer/Streams configs. All of these calls essentially create the same configs over and over. We should migrate these config setups to use the static helper methods TestUtils.consumerConfigs, TestUtils.producerConfigs, StreamsTestUtils.getStreamsConfigs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
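The static config helpers proposed above would centralize the repeated setup. A sketch of the kind of helper the ticket describes, using standard Kafka client property names; the method name and chosen defaults are illustrative assumptions, not the final TestUtils/StreamsTestUtils API:

```java
import java.util.Properties;

// Illustrative static config helper: build one canonical Properties object
// per client type instead of repeating the same entries in every test.
public class ConfigHelpers {
    public static Properties consumerConfig(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest"); // a sensible test default
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerConfig("localhost:9092", "test-group");
        System.out.println(props.getProperty("group.id"));
    }
}
```

Tests then call the helper and override only the properties they actually care about, which is what "parameterize as appropriate" amounts to in practice.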
[jira] [Commented] (KAFKA-3338) Add print and writeAsText functions to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215160#comment-15215160 ] Bill Bejeck commented on KAFKA-3338: Picking this one up. I searched pull requests and it looks like no one else is currently working on this, but let me know if that's not the case. > Add print and writeAsText functions to the Streams DSL > -- > > Key: KAFKA-3338 > URL: https://issues.apache.org/jira/browse/KAFKA-3338 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie++ > Fix For: 0.10.1.0 > > > We want to provide some REPL-like pattern for users for better debuggability. > More concretely, we want to allow users to easily inspect their intermediate > data streams in the topology while running the application. Theoretically > this can be done by using a break point, or by calling System.out.println() > inside the operator, or through a finer grained trace-level logging. But a more > user-friendly API would be to add a print() function to the KStream / KTable > object like: > {code} > // Prints the elements in this stream to the stdout, i.e. "System.out" of the > JVM > KStream#print(/* optional serde */); > KTable#print(/* optional serde */); > // Writes the stream as text file(s) to the specified location. > KStream#writeAsText(String filePath, /* optional serde */); > KTable#writeAsText(String filePath, /* optional serde */); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3338) Add print and writeAsText functions to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3338: -- Assignee: Bill Bejeck > Add print and writeAsText functions to the Streams DSL > -- > > Key: KAFKA-3338 > URL: https://issues.apache.org/jira/browse/KAFKA-3338 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie++ > Fix For: 0.10.1.0 > > > We want to provide some REPL-like pattern for users for better debuggability. > More concretely, we want to allow users to easily inspect their intermediate > data streams in the topology while running the application. Theoretically > this can be done by using a break point, or by calling System.out.println() > inside the operator, or through a finer grained trace-level logging. But more > user-friendly API would be to add a print() function to the KStream / KTable > object like: > {code} > // Prints the elements in this stream to the stdout, i.e. "System.out" of the > JVM > KStream#print(/* optional serde */); > KTable#print(/* optional serde */); > // Writes the stream as text file(s) to the specified location. > KStream#writeAsText(String filePath, /* optional serde */); > KTable#writeAsText(String filePath, /* optional serde */); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3430) Allow users to set key in KTable.toStream() and KStream
[ https://issues.apache.org/jira/browse/KAFKA-3430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3430 started by Bill Bejeck. -- > Allow users to set key in KTable.toStream() and KStream > --- > > Key: KAFKA-3430 > URL: https://issues.apache.org/jira/browse/KAFKA-3430 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: api > Fix For: 0.10.0.0 > > > Currently KTable.toStream does not take any parameters, and hence users who > want to set the key need to do two steps: > {code}table.toStream().map(...){code} in order to do so. We can make it > one step by providing the mapper parameter in toStream. > And similarly today users usually need to call {code} KStream.map() {code} in > order to select the key before an aggregation-by-key operation if the original > stream does not contain keys. > We can consider adding a specific function in KStream to do so: > {code}KStream.selectKey(mapper){code} > which essentially is the same as > {code}KStream.map(/* mapper that does not change the value, but only the key > */){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3430) Allow users to set key in KTable.toStream() and KStream
[ https://issues.apache.org/jira/browse/KAFKA-3430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3430: -- Assignee: Bill Bejeck > Allow users to set key in KTable.toStream() and KStream > --- > > Key: KAFKA-3430 > URL: https://issues.apache.org/jira/browse/KAFKA-3430 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: api > Fix For: 0.10.0.0 > > > Currently KTable.toStream does not take any parameters, and hence users who > want to set the key need to do two steps: > {code}table.toStream().map(...){code} in order to do so. We can make it > one step by providing the mapper parameter in toStream. > And similarly today users usually need to call {code} KStream.map() {code} in > order to select the key before an aggregation-by-key operation if the original > stream does not contain keys. > We can consider adding a specific function in KStream to do so: > {code}KStream.selectKey(mapper){code} > which essentially is the same as > {code}KStream.map(/* mapper that does not change the value, but only the key > */){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
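The equivalence described in the ticket, selectKey(mapper) as a map() that replaces only the key, can be modeled over a plain list of entries (no real KStream involved; this is a standalone sketch, not the Kafka Streams API):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Models selectKey over a plain list of key-value entries.
public class SelectKeyDemo {
    // Equivalent in spirit to stream.selectKey((k, v) -> v.substring(0, 1)):
    // a map() that replaces the key and leaves the value untouched.
    static List<Map.Entry<String, String>> rekey(List<Map.Entry<Integer, String>> records) {
        return records.stream()
                .map(e -> Map.entry(e.getValue().substring(0, 1), e.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Records keyed by an offset-like integer; we want the first letter
        // of the value as the new key before an aggregation-by-key.
        List<Map.Entry<Integer, String>> records =
                List.of(Map.entry(0, "alice"), Map.entry(1, "bob"));
        System.out.println(rekey(records)); // [a=alice, b=bob]
    }
}
```

The point of a dedicated selectKey is readability and intent: the mapper can only produce a new key, so it cannot accidentally alter the value the way a general map() can.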
[jira] [Commented] (KAFKA-3338) Add print and writeAsText functions to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15223099#comment-15223099 ] Bill Bejeck commented on KAFKA-3338: Guozhang, Thanks for the response, that all makes perfect sense. I still have some questions, but I think it's best at this point if I push a PR (WIP label) with what I have so far and then you (and others) can review and we can pick up the discussion from there. Thanks! > Add print and writeAsText functions to the Streams DSL > -- > > Key: KAFKA-3338 > URL: https://issues.apache.org/jira/browse/KAFKA-3338 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie++ > Fix For: 0.10.1.0 > > > We want to provide some REPL-like pattern for users for better debuggability. > More concretely, we want to allow users to easily inspect their intermediate > data streams in the topology while running the application. Theoretically > this can be done by using a break point, or by calling System.out.println() > inside the operator, or through a finer grained trace-level logging. But more > user-friendly API would be to add a print() function to the KStream / KTable > object like: > {code} > // Prints the elements in this stream to the stdout, i.e. "System.out" of the > JVM > KStream#print(/* optional serde */); > KTable#print(/* optional serde */); > // Writes the stream as text file(s) to the specified location. > KStream#writeAsText(String filePath, /* optional serde */); > KTable#writeAsText(String filePath, /* optional serde */); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3338) Add print and writeAsText functions to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220957#comment-15220957 ] Bill Bejeck commented on KAFKA-3338: I'm wondering if we need to provide a Serde for the print/writeAsText functionality, as Serdes are only required when reading from/writing to a topic or when using a state store. To give you an idea of where I'm coming from, here's the integration test for the implementation of print() {code} KStream<String, StockTransaction> transactionKStream = kStreamBuilder.stream(stringSerde, transactionSerde, "stocks"); transactionKStream.through(stringSerde, transactionSerde, "stocks-out").print() .map((k, v) -> new KeyValue<>(v.getSymbol(), v)).print() .aggregateByKey(StockTransactionCollector::new, (k, v, stockTransactionCollector) -> stockTransactionCollector.add(v), TumblingWindows.of("stock-summaries").with(1), stringSerde, collectorSerde).print() .to(windowedSerde, collectorSerde, "transaction-summary"); {code} This allows me to see all elements from each stream (in order) on stdout. Thoughts? > Add print and writeAsText functions to the Streams DSL > -- > > Key: KAFKA-3338 > URL: https://issues.apache.org/jira/browse/KAFKA-3338 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie++ > Fix For: 0.10.1.0 > > > We want to provide some REPL-like pattern for users for better debuggability. > More concretely, we want to allow users to easily inspect their intermediate > data streams in the topology while running the application. Theoretically > this can be done by using a break point, or by calling System.out.println() > inside the operator, or through a finer grained trace-level logging. But a more > user-friendly API would be to add a print() function to the KStream / KTable > object like: > {code} > // Prints the elements in this stream to the stdout, i.e. 
"System.out" of the > JVM > KStream#print(/* optional serde */); > KTable#print(/* optional serde */); > // Writes the stream as text file(s) to the specified location. > KStream#writeAsText(String filePath, /* optional serde */); > KTable#writeAsText(String filePath, /* optional serde */); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3338) Add print and writeAsText functions to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3338 started by Bill Bejeck. -- > Add print and writeAsText functions to the Streams DSL > -- > > Key: KAFKA-3338 > URL: https://issues.apache.org/jira/browse/KAFKA-3338 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie++ > Fix For: 0.10.1.0 > > > We want to provide some REPL-like pattern for users for better debuggability. > More concretely, we want to allow users to easily inspect their intermediate > data streams in the topology while running the application. Theoretically > this can be done by using a break point, or by calling System.out.println() > inside the operator, or through a finer grained trace-level logging. But more > user-friendly API would be to add a print() function to the KStream / KTable > object like: > {code} > // Prints the elements in this stream to the stdout, i.e. "System.out" of the > JVM > KStream#print(/* optional serde */); > KTable#print(/* optional serde */); > // Writes the stream as text file(s) to the specified location. > KStream#writeAsText(String filePath, /* optional serde */); > KTable#writeAsText(String filePath, /* optional serde */); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work stopped] (KAFKA-3443) Support regex topics in addSource() and stream()
[ https://issues.apache.org/jira/browse/KAFKA-3443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3443 stopped by Bill Bejeck. -- > Support regex topics in addSource() and stream() > > > Key: KAFKA-3443 > URL: https://issues.apache.org/jira/browse/KAFKA-3443 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: api > Fix For: 0.10.1.0 > > > Currently Kafka Streams only supports specific topics when creating source > streams, while we can leverage the consumer's regex subscription to allow regex > topics as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3443) Support regex topics in addSource() and stream()
[ https://issues.apache.org/jira/browse/KAFKA-3443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3443: --- Status: Patch Available (was: Open) Initial pass completed, ready for review > Support regex topics in addSource() and stream() > > > Key: KAFKA-3443 > URL: https://issues.apache.org/jira/browse/KAFKA-3443 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: api > Fix For: 0.10.1.0 > > > Currently Kafka Streams only supports specific topics when creating source > streams, while we can leverage the consumer's regex subscription to allow regex > topics as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3443) Support regex topics in addSource() and stream()
[ https://issues.apache.org/jira/browse/KAFKA-3443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15287764#comment-15287764 ] Bill Bejeck commented on KAFKA-3443: Status update - still actively working on this, and I'm really close to issuing a PR in the next couple of days. > Support regex topics in addSource() and stream() > > > Key: KAFKA-3443 > URL: https://issues.apache.org/jira/browse/KAFKA-3443 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: api > Fix For: 0.10.1.0 > > > Currently Kafka Streams only supports specific topics when creating source > streams, while we can leverage the consumer's regex subscription to allow regex > topics as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
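Conceptually, the regex subscription this ticket leverages amounts to matching the broker's topic list against a java.util.regex.Pattern, which is what the consumer's pattern-based subscribe variant does for the client. A standalone sketch with made-up topic names:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Standalone model of regex topic selection: filter known topic names
// with a compiled Pattern. Topic names here are invented for illustration.
public class RegexTopicsDemo {
    static List<String> matchTopics(List<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return allTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("orders-us", "orders-eu", "payments", "orders-apac");
        System.out.println(matchTopics(topics, "orders-.*")); // [orders-us, orders-eu, orders-apac]
    }
}
```

The practical benefit is that topics created after the application starts and matching the pattern can be picked up without reconfiguring the topology.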
[jira] [Work stopped] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3101 stopped by Bill Bejeck. -- > Optimize Aggregation Outputs > > > Key: KAFKA-3101 > URL: https://issues.apache.org/jira/browse/KAFKA-3101 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: architecture > Fix For: 0.10.1.0 > > > Today we emit one output record for each incoming message for Table / > Windowed Stream Aggregations. For example, say we have a sequence of > aggregate outputs computed from the input stream (assuming there is no agg > value for this key before): > V1, V2, V3, V4, V5 > Then the aggregator will output the following sequence of Change<newValue, oldValue>: > <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4> > where computing the intermediate results could cost a lot of CPU overhead. > Instead, if we can let the underlying state store "remember" the last > emitted old value, we can reduce the number of emits based on some configs. > More specifically, we can add one more field in the KV store engine storing > the last emitted old value, which only gets updated when we emit to the > downstream processor. For example: > At Beginning: > Store: key => empty (no agg values yet) > V1 computed: > Update Both in Store: key => (V1, V1), Emit > V2 computed: > Update NewValue in Store: key => (V2, V1), No Emit > V3 computed: > Update NewValue in Store: key => (V3, V1), No Emit > V4 computed: > Update Both in Store: key => (V4, V4), Emit > V5 computed: > Update NewValue in Store: key => (V5, V4), No Emit > One more thing to consider is that we need a "closing" time control on the > not-yet-emitted keys; when some time has elapsed (or the window is to be > closed), we need to check, for each key, whether its current materialized pair > has not yet been emitted (for example <V5, V4> in the above example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3101: -- Assignee: (was: Bill Bejeck) > Optimize Aggregation Outputs > > > Key: KAFKA-3101 > URL: https://issues.apache.org/jira/browse/KAFKA-3101 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang > Labels: architecture > Fix For: 0.10.1.0 > > > Today we emit one output record for each incoming message for Table / > Windowed Stream Aggregations. For example, say we have a sequence of > aggregate outputs computed from the input stream (assuming there is no agg > value for this key before): > V1, V2, V3, V4, V5 > Then the aggregator will output the following sequence of Change<newValue, oldValue>: > <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4> > where computing the intermediate results could cost a lot of CPU overhead. > Instead, if we can let the underlying state store "remember" the last > emitted old value, we can reduce the number of emits based on some configs. > More specifically, we can add one more field in the KV store engine storing > the last emitted old value, which only gets updated when we emit to the > downstream processor. For example: > At Beginning: > Store: key => empty (no agg values yet) > V1 computed: > Update Both in Store: key => (V1, V1), Emit > V2 computed: > Update NewValue in Store: key => (V2, V1), No Emit > V3 computed: > Update NewValue in Store: key => (V3, V1), No Emit > V4 computed: > Update Both in Store: key => (V4, V4), Emit > V5 computed: > Update NewValue in Store: key => (V5, V4), No Emit > One more thing to consider is that we need a "closing" time control on the > not-yet-emitted keys; when some time has elapsed (or the window is to be > closed), we need to check, for each key, whether its current materialized pair > has not yet been emitted (for example <V5, V4> in the above example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3443) Support regex topics in addSource() and stream()
[ https://issues.apache.org/jira/browse/KAFKA-3443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3443: -- Assignee: Bill Bejeck > Support regex topics in addSource() and stream() > > > Key: KAFKA-3443 > URL: https://issues.apache.org/jira/browse/KAFKA-3443 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: api > Fix For: 0.10.1.0 > > > Currently Kafka Streams only support specific topics in creating source > streams, while we can leverage consumer's regex subscription to allow regex > topics as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3101 started by Bill Bejeck. -- > Optimize Aggregation Outputs > > > Key: KAFKA-3101 > URL: https://issues.apache.org/jira/browse/KAFKA-3101 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: architecture > Fix For: 0.10.1.0 > > > Today we emit one output record for each incoming message for Table / > Windowed Stream Aggregations. For example, say we have a sequence of > aggregate outputs computed from the input stream (assuming there is no agg > value for this key before): > V1, V2, V3, V4, V5 > Then the aggregator will output the following sequence of Change<newValue, oldValue>: > <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4> > where computing the intermediate results could cost a lot of CPU overhead. > Instead, if we can let the underlying state store "remember" the last > emitted old value, we can reduce the number of emits based on some configs. > More specifically, we can add one more field in the KV store engine storing > the last emitted old value, which only gets updated when we emit to the > downstream processor. For example: > At Beginning: > Store: key => empty (no agg values yet) > V1 computed: > Update Both in Store: key => (V1, V1), Emit > V2 computed: > Update NewValue in Store: key => (V2, V1), No Emit > V3 computed: > Update NewValue in Store: key => (V3, V1), No Emit > V4 computed: > Update Both in Store: key => (V4, V4), Emit > V5 computed: > Update NewValue in Store: key => (V5, V4), No Emit > One more thing to consider is that we need a "closing" time control on the > not-yet-emitted keys; when some time has elapsed (or the window is to be > closed), we need to check, for each key, whether its current materialized pair > has not yet been emitted (for example <V5, V4> in the above example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3101: -- Assignee: Bill Bejeck -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3101: -- Assignee: (was: Bill Bejeck) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work stopped] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3101 stopped by Bill Bejeck. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3101: -- Assignee: Bill Bejeck -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3101 started by Bill Bejeck. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work stopped] (KAFKA-3430) Allow users to set key in KTable.toStream() and KStream
[ https://issues.apache.org/jira/browse/KAFKA-3430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3430 stopped by Bill Bejeck. -- > Allow users to set key in KTable.toStream() and KStream > --- > > Key: KAFKA-3430 > URL: https://issues.apache.org/jira/browse/KAFKA-3430 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Guozhang Wang > Assignee: Bill Bejeck > Labels: api > Fix For: 0.10.0.0 > > > Currently KTable.toStream does not take any parameters, so users who > want to set the key need two steps: > {code}table.toStream().map(...){code} > We can make it one step by providing the mapper parameter in toStream. > Similarly, today users usually need to call {code}KStream.map(){code} in > order to select the key before an aggregation-by-key operation if the original > stream does not contain keys. > We can consider adding a specific function in KStream to do so: > {code}KStream.selectKey(mapper){code} > which essentially is the same as > {code}KStream.map(/* mapper that does not change the value, but only the key > */){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
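The selectKey/map equivalence described above can be sketched without Kafka at all. The `KeyValue` stand-in (`Map.Entry`) and the first-word mapper below are hypothetical, not the real Streams API; the point is only that selectKey is a map whose mapper replaces the key and passes the value through unchanged.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative stand-in (not the real Streams API) for the equivalence the
// ticket describes: selectKey(mapper) is just map() with a mapper that derives
// a new key and leaves the value untouched.
public class SelectKeySketch {
    // Hypothetical mapper: derive the key from the first word of the value.
    static Map.Entry<String, String> selectKey(Map.Entry<String, String> record) {
        return new SimpleEntry<>(record.getValue().split(" ")[0], record.getValue());
    }

    public static void main(String[] args) {
        // Records with null keys, the situation the ticket's selectKey targets.
        List<Map.Entry<String, String>> stream = List.of(
                new SimpleEntry<>((String) null, "alice login"),
                new SimpleEntry<>((String) null, "bob logout"));
        List<Map.Entry<String, String>> keyed = stream.stream()
                .map(SelectKeySketch::selectKey)
                .collect(Collectors.toList());
        System.out.println(keyed); // keys are now usable for aggregation-by-key
    }
}
```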
[jira] [Updated] (KAFKA-3430) Allow users to set key in KTable.toStream() and KStream
[ https://issues.apache.org/jira/browse/KAFKA-3430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3430: --- Status: Patch Available (was: Open) Submitted PR for KAFKA-3430, allowing users to set the key in KTable.toStream and in KStream. In KStream, the method selectKey was added to enable deriving a key from values before performing aggregation-by-key operations on original streams that have null keys. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15393079#comment-15393079 ] Bill Bejeck commented on KAFKA-3973: [~enothereska] Will do, just need to finish up a few details. > Investigate feasibility of caching bytes vs. records > > > Key: KAFKA-3973 > URL: https://issues.apache.org/jira/browse/KAFKA-3973 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Eno Thereska > Assignee: Bill Bejeck > Fix For: 0.10.1.0 > > Attachments: CachingPerformanceBenchmarks.java, MemoryLRUCache.java > > > Currently the cache stores and accounts for records, not bytes or objects. > This investigation is about measuring any performance overheads that > come from storing bytes or objects. As an outcome we should know whether 1) > we should store bytes or 2) we should store objects. > If we store objects, the cache still needs to know their size (so that it can > know if the object fits in the allocated cache space, e.g., if the cache is > 100MB and the object is 10MB, we'd have space for 10 such objects). The > investigation needs to figure out how to find out the size of the object > efficiently in Java. > If we store bytes, then we are serialising an object into bytes before > caching it, i.e., we take a serialisation cost. The investigation needs to > measure how bad this cost can be, especially for the case when all objects fit > in cache (and thus any extra serialisation cost would show). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
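A minimal sketch of the "store bytes" option, assuming a simple byte-budget LRU rather than the MemoryLRUCache attached to the ticket: values are serialized on put (paying the serialisation cost the description mentions) and eviction is driven by total cached bytes instead of record count.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Bytes-accounted LRU sketch (illustrative only, not the ticket's attachment):
// values are serialized on put and eviction is driven by total cached bytes.
public class BytesLruCache {
    private final long maxBytes;
    private long currentBytes = 0;
    // accessOrder = true turns LinkedHashMap iteration into LRU-first order.
    private final LinkedHashMap<String, byte[]> cache = new LinkedHashMap<>(16, 0.75f, true);

    public BytesLruCache(long maxBytes) { this.maxBytes = maxBytes; }

    public void put(String key, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8); // "serialisation cost"
        byte[] old = cache.put(key, bytes);
        currentBytes += bytes.length - (old == null ? 0 : old.length);
        // Evict least-recently-used entries until we are under the byte budget.
        Iterator<Map.Entry<String, byte[]>> it = cache.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            currentBytes -= it.next().getValue().length;
            it.remove();
        }
    }

    public String get(String key) {
        byte[] bytes = cache.get(key);
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8); // deserialize
    }

    public int size() { return cache.size(); }
}
```

Note the design contrast with the "store objects" option: here sizing is exact and free (the byte array's length), while the object route needs something like jamm's MemoryMeter to learn each entry's footprint.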
[jira] [Updated] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3989: --- Description: JMH is a Java harness for building, running, and analyzing benchmarks written in Java or other JVM languages. To run properly, JMH needs to be in its own module. This task will also investigate using the jmh-gradle plugin [https://github.com/melix/jmh-gradle-plugin], which enables the use of JMH from gradle. This is related to [https://issues.apache.org/jira/browse/KAFKA-3973] (was: JMH is a Java harness for building, running, and analyzing benchmarks written in Java or other JVM languages. To run properly, JMH needs to be in its own module. This task will also investigate using the jmh-gradle plugin [https://github.com/melix/jmh-gradle-plugin], which enables the use of JMH from gradle) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3989) Add JMH module for Benchmarks
Bill Bejeck created KAFKA-3989: -- Summary: Add JMH module for Benchmarks Key: KAFKA-3989 URL: https://issues.apache.org/jira/browse/KAFKA-3989 Project: Kafka Issue Type: Improvement Reporter: Bill Bejeck Assignee: Bill Bejeck JMH is a Java harness for building, running, and analyzing benchmarks written in Java or other JVM languages. To run properly, JMH needs to be in its own module. This task will also investigate using the jmh-gradle plugin [https://github.com/melix/jmh-gradle-plugin], which enables the use of JMH from gradle -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3973: --- Status: Patch Available (was: In Progress) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394761#comment-15394761 ] Bill Bejeck commented on KAFKA-3973: [~ijuma] I re-ran the tests with no instrumentation using the FALLBACK_UNSAFE enum; the results were the same, if not slower. The benchmark can now be run with no instrumentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391766#comment-15391766 ] Bill Bejeck commented on KAFKA-3973: Yes, I ran the tests using instrumentation (-javaagent:); sorry, I forgot to put that in my original comments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3973 started by Bill Bejeck. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391229#comment-15391229 ] Bill Bejeck commented on KAFKA-3973: The results for the LRU investigation are below. There were three types of measurements taken:
1. The current cache, tracking max size by record count (Control).
2. The cache tracking size by max memory (Object). Both keys and values were used in keeping track of total memory. The max size for the cache was calculated by multiplying the memory of a key/value pair (taken using the MemoryMeter class from the jamm library, https://github.com/jbellis/jamm) by the max size specified in the Control/Bytes cache.
3. Storing bytes in the cache (Bytes). The max size of the cache in this case was tracked by record count. Both keys and values are serialized/deserialized.
I have attached the benchmarking class and the modified MemoryLRUCache class for reference. While complete accuracy in Java benchmarking can be difficult to achieve, the results of these benchmarks are sufficient for seeing how the different approaches compare to each other. The cache was set to a max size of 500,000 (or, in the memory-based cache, 500,000 * key/value memory size). Two rounds of 25 iterations each were run. In the first round, 500,000 put/get combinations were performed to measure behaviour when all records could fit in the cache. The second round had 1,000,000 put/get combinations to measure performance with evictions. Some benchmarks for raw serialization and memory tracking were included as well. As expected, the Control group had the best performance. The Object (memory tracking) approach was better than serialization only if the MemoryMeter.measure method was used. However, MemoryMeter.measure only captures the amount of memory taken by the object itself; it does not take into account any other objects in the object graph.
For example, here is a debug statement showing the memory for the string "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec a porttitor felis. In vel dolor.":
MemoryMeter.measure: 24
MemoryMeter.measureDeep:
root [java.lang.String] 232 bytes (24 bytes)
|
+--value [char[]] 208 bytes (208 bytes)
232
MemoryMeter.measure totally ignores the char array hanging off String objects. With this in mind, we would be forced to use MemoryMeter.measureDeep to get an accurate measure of objects being placed in the cache. From the results below, the MemoryMeter.measureDeep method had the slowest performance. With these results in mind, it looks to me like storing bytes in the cache is the best way forward. Final notes:
1. Another tool, Java Object Layout (http://openjdk.java.net/projects/code-tools/jol/), shows promise, but needs evaluation.
2. These benchmarks should be re-written with JMH (http://openjdk.java.net/projects/code-tools/jmh/). Using JMH requires a separate module at a minimum, but the JMH Gradle plugin (https://github.com/melix/jmh-gradle-plugin) looks interesting, as it gives the ability to integrate JMH benchmarking tests into an existing project. Having a place to write/run JMH benchmarks could be beneficial to the project as a whole. If this seems worthwhile, I will create a Jira ticket and look into adding the JMH plugin, or creating a separate benchmarking module.
3. We should probably add a benchmark utilizing the MemoryLRUCache as well.
Investigation Results

Tests for 500,000 inserts (500K count / 500K * memory max cache size):
Control       500K cache put/get, 25 iterations, ave time (millis):   53.24
Object        500K cache put/get, 25 iterations, ave time (millis):  250.88
Object(Deep)  500K cache put/get, 25 iterations, ave time (millis): 1720.08
Bytes         500K cache put/get, 25 iterations, ave time (millis):  288.92

Tests for 1,000,000 inserts (500K count / 500K * memory max cache size):
Control       1M cache put/get, 25 iterations, ave time (millis):  227.48
Object        1M cache put/get, 25 iterations, ave time (millis):  488.2
Object(Deep)  1M cache put/get, 25 iterations, ave time (millis): 2575.04
Bytes         1M cache put/get, 25 iterations, ave time (millis):  852.04

Raw timing of tracking memory (deep) for 500K Strings: took [567] millis
Raw timing of tracking memory for 500K Strings: took [92] millis
Raw timing of tracking memory (deep) for 500K ComplexObjects: took [2813] millis
Raw timing of tracking memory for 500K ComplexObjects: took [148] millis
Raw timing of serialization for 500K Strings: took [133] millis
Raw timing of serialization for 500K ComplexObjects: took [525] millis
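The shape of a hand-rolled benchmark like the runs above can be sketched as follows. This is illustrative only (the actual code is in the attached CachingPerformanceBenchmarks.java, which is not reproduced here), and it uses a scaled-down workload; wall-clock loops like this are exactly what JMH replaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntConsumer;

// Rough shape of a hand-rolled put/get benchmark: run a fixed number of
// iterations, time each with System.nanoTime, and report the average in
// milliseconds. Illustrative sketch, not the attached benchmark class.
public class PutGetBenchmark {
    public static double avgMillis(int iterations, int ops, IntConsumer body) {
        long totalNanos = 0;
        for (int iter = 0; iter < iterations; iter++) {
            long start = System.nanoTime();
            for (int i = 0; i < ops; i++) body.accept(i);
            totalNanos += System.nanoTime() - start;
        }
        return totalNanos / 1_000_000.0 / iterations;
    }

    public static void main(String[] args) {
        Map<Integer, String> cache = new HashMap<>();
        // Scaled down from the real runs (25 iterations of 500K put/get pairs).
        double ms = avgMillis(5, 100_000, i -> {
            cache.put(i, "value-" + i); // put ...
            cache.get(i);               // ... followed by get, as in the comment
        });
        System.out.println("cache put/get, ave time (millis): " + ms);
    }
}
```

Measurements like this are sensitive to JIT warm-up and GC pauses, which is precisely why the comment proposes moving to JMH.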
[jira] [Updated] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3973: --- Attachment: MemoryLRUCache.java CachingPerformanceBenchmarks.java Benchmark test and modified MemoryLRUCache for reference. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3989 started by Bill Bejeck. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407054#comment-15407054 ] Bill Bejeck edited comment on KAFKA-3989 at 8/4/16 3:03 AM: I opted to try the gradle shadow plugin first (https://github.com/johnrengelman/shadow/). I've been able to get JMH working as a sub-module, but needed to make the following changes to the build.gradle script:
dependencies {
    .
    // Added this entry for the gradle shadow plugin
    classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3'
    //
}
Then added this to apply the plugin in the jmh-benchmarks module:
project(':jmh-benchmarks') {
    apply plugin: 'com.github.johnrengelman.shadow'
}
Is this acceptable? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3989: --- Fix Version/s: 0.10.1.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15420061#comment-15420061 ] Bill Bejeck commented on KAFKA-3973: Attaching JMH benchmark results, removing the previous hand-rolled benchmarking code. The JMH benchmark code is not attached as it is part of PR #1712 > Investigate feasibility of caching bytes vs. records > > > Key: KAFKA-3973 > URL: https://issues.apache.org/jira/browse/KAFKA-3973 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Eno Thereska >Assignee: Bill Bejeck > Fix For: 0.10.1.0 > > Attachments: MemBytesBenchmark.txt > > > Currently the cache stores and accounts for records, not bytes or objects. > This investigation would be around measuring any performance overheads that > come from storing bytes or objects. As an outcome we should know whether 1) > we should store bytes or 2) we should store objects. > If we store objects, the cache still needs to know their size (so that it can > know if the object fits in the allocated cache space, e.g., if the cache is > 100MB and the object is 10MB, we'd have space for 10 such objects). The > investigation needs to figure out how to find out the size of the object > efficiently in Java. > If we store bytes, then we are serialising an object into bytes before > caching it, i.e., we take a serialisation cost. The investigation needs to > measure how bad this cost can be, especially for the case when all objects fit > in cache (and thus any extra serialisation cost would show). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
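The bytes-vs-objects trade-off this ticket investigates can be illustrated with a minimal, self-contained Java sketch. Plain java.io serialization stands in for Kafka's serdes, and the class and variable names are hypothetical, not anything from the Kafka codebase:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class CacheCostSketch {
    // Serialize a value to bytes, as a bytes-based cache would before storing.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, byte[]> bytesCache = new HashMap<>();
        Map<String, String> objectCache = new HashMap<>();

        String key = "key-1";
        String value = "some-aggregate-value";

        // Bytes-based cache: pay a serialization cost on put and a
        // deserialization cost on get, but cache sizing is trivial --
        // the byte array length IS the size.
        bytesCache.put(key, serialize(value));
        String fromBytes = (String) deserialize(bytesCache.get(key));

        // Object-based cache: no serde cost, but the cache must estimate the
        // object's in-memory footprint some other way (e.g. a library like jamm).
        objectCache.put(key, value);
        String fromObjects = objectCache.get(key);

        System.out.println(fromBytes.equals(fromObjects));
        // The serialized form carries framing overhead on top of the payload.
        System.out.println(bytesCache.get(key).length > value.length());
    }
}
```

The benchmark described in this thread measures exactly this tension at scale: the serde round-trip cost of the first strategy against the size-estimation cost of the second.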
[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15374085#comment-15374085 ] Bill Bejeck commented on KAFKA-3101: [~enothereska] [~guozhang] With regards to the performance comparison, here is my plan:
- Create a simple streams process with KTableAggregate utilizing objects (records) first, then bytes.
- Track the memory usage via the jamm library referenced above.
- Track message throughput for both types (records vs bytes).
- Profile how much CPU time is spent in the serialization/deserialization process.
Is this reasonable? Any additional thoughts or comments? > Optimize Aggregation Outputs > > > Key: KAFKA-3101 > URL: https://issues.apache.org/jira/browse/KAFKA-3101 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang > Labels: architecture > Fix For: 0.10.1.0 > > > Today we emit one output record for each incoming message for Table / > Windowed Stream Aggregations. For example, say we have a sequence of > aggregate outputs computed from the input stream (assuming there is no agg > value for this key before): > V1, V2, V3, V4, V5 > Then the aggregator will output the following sequence of Change<newValue, oldValue>: > <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>, > which could cost a lot of CPU overhead computing the intermediate results. > Instead, if we can let the underlying state store "remember" the last > emitted old value, we can reduce the number of emits based on some configs. > More specifically, we can add one more field in the KV store engine storing > the last emitted old value, which only gets updated when we emit to the > downstream processor. 
For example: > At Beginning: > Store: key => empty (no agg values yet) > V1 computed: > Update Both in Store: key => (V1, V1), Emit > V2 computed: > Update NewValue in Store: key => (V2, V1), No Emit > V3 computed: > Update NewValue in Store: key => (V3, V1), No Emit > V4 computed: > Update Both in Store: key => (V4, V4), Emit > V5 computed: > Update NewValue in Store: key => (V5, V4), No Emit > One more thing to consider is that we need a "closing" time control on the > not-yet-emitted keys; when some time has elapsed (or the window is to be > closed), we need to check for any key whether their current materialized pairs > have not been emitted (for example <V5, V4> in the above example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
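The per-key (newValue, lastEmittedOldValue) bookkeeping walked through above can be sketched as a toy in-memory version. The class names and the emit policy here are hypothetical stand-ins, not Kafka Streams code; a real implementation would drive the emit decision from a config or window-close timer:

```java
import java.util.HashMap;
import java.util.Map;

public class EmitSuppressionSketch {
    // Per-key pair: the latest aggregate and the old value last sent downstream.
    record StorePair(String newValue, String lastEmittedOldValue) {}

    static final Map<String, StorePair> store = new HashMap<>();
    static int emits = 0;

    // Update the aggregate for a key; the caller decides when to emit
    // (standing in for the config-driven policy the ticket describes).
    static void update(String key, String aggregate, boolean shouldEmit) {
        if (shouldEmit) {
            // Emit downstream and update BOTH slots, as in "Update Both in Store".
            store.put(key, new StorePair(aggregate, aggregate));
            emits++;
        } else {
            // Refresh only the new value; the last emitted old value is kept.
            StorePair prev = store.getOrDefault(key, new StorePair(null, null));
            store.put(key, new StorePair(aggregate, prev.lastEmittedOldValue()));
        }
    }

    public static void main(String[] args) {
        // Mirror the ticket's example: V1..V5 with emits only at V1 and V4.
        update("k", "V1", true);
        update("k", "V2", false);
        update("k", "V3", false);
        update("k", "V4", true);
        update("k", "V5", false);
        System.out.println(emits);          // 2 emits instead of 5
        System.out.println(store.get("k")); // ends at (V5, V4), matching the example
    }
}
```

Note how the not-yet-emitted (V5, V4) pair at the end is exactly what the "closing" time control in the ticket would need to flush.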
[jira] [Work started] (KAFKA-3926) Transient failures in org.apache.kafka.streams.integration.RegexSourceIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-3926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3926 started by Bill Bejeck. -- > Transient failures in > org.apache.kafka.streams.integration.RegexSourceIntegrationTest > - > > Key: KAFKA-3926 > URL: https://issues.apache.org/jira/browse/KAFKA-3926 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Bill Bejeck > > {code} > org.apache.kafka.streams.integration.RegexSourceIntegrationTest > > testRegexMatchesTopicsAWhenDeleted FAILED > java.lang.AssertionError: > Expected: <[TEST-TOPIC-A, TEST-TOPIC-B]> > but: was <[TEST-TOPIC-A]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.junit.Assert.assertThat(Assert.java:956) > at org.junit.Assert.assertThat(Assert.java:923) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:211) > {code} > I think it is due to the fact that sometimes the rebalance takes much longer > than the specified 60 seconds. > One example: https://builds.apache.org/job/kafka-trunk-jdk8/730/consoleFull -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3926) Transient failures in org.apache.kafka.streams.integration.RegexSourceIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-3926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15363724#comment-15363724 ] Bill Bejeck commented on KAFKA-3926: Looking at this a bit more. The test in question is failing on the first assertion, which confirms that both created topics are matched by the regex as source topics. There is an implicit assumption that all topics will be included in the call to create the stream tasks after the rebalance. But there could actually be two calls, one with TEST-TOPIC-A, then another with both topics. To fix the transient failures in this case, I think the waitForCondition call should check for all expected assigned topics rather than simply storing the results of each call. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
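The fix sketched in that comment — accumulate topics across all assignment callbacks and wait until the full expected set has been seen — can be illustrated like this. The waitForCondition below is a hypothetical stand-in for the test utility, not Kafka's actual TestUtils helper:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

public class AssignmentWaitSketch {
    // Poll until the condition holds or the timeout elapses.
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(10);
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws Exception {
        // Topics seen across *all* assignment callbacks, not just the latest one.
        Set<String> assignedTopics = ConcurrentHashMap.newKeySet();

        // Simulate two rebalance callbacks: first only TEST-TOPIC-A, then both.
        // Overwriting a single "last result" here would make the test flaky;
        // accumulating into a set does not.
        assignedTopics.addAll(List.of("TEST-TOPIC-A"));
        assignedTopics.addAll(List.of("TEST-TOPIC-A", "TEST-TOPIC-B"));

        boolean ok = waitForCondition(
            () -> assignedTopics.containsAll(List.of("TEST-TOPIC-A", "TEST-TOPIC-B")),
            1000);
        System.out.println(ok); // holds once every expected topic has been seen
    }
}
```

The key design point is that the condition is over the accumulated set, so it is insensitive to how many intermediate assignments the rebalance produced.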
[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366148#comment-15366148 ] Bill Bejeck commented on KAFKA-3101: [~enothereska] ok I get it now, KAFKA-3101 is being replaced/superseded by KAFKA-3776. Thanks for the heads up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366591#comment-15366591 ] Bill Bejeck commented on KAFKA-3101: [~guozhang] ok will do. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367923#comment-15367923 ] Bill Bejeck commented on KAFKA-3101: [~guozhang] [~enothereska] Would adding flatbuffers (https://google.github.io/flatbuffers/) be beyond the scope of this performance comparison? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15364919#comment-15364919 ] Bill Bejeck commented on KAFKA-3101: Is this available now? If so I'd like to pick this up if possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369257#comment-15369257 ] Bill Bejeck commented on KAFKA-3101: [~enothereska] none actually, a misunderstanding on my part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369257#comment-15369257 ] Bill Bejeck edited comment on KAFKA-3101 at 7/9/16 7:26 PM: [~enothereska] none actually, a misunderstanding on my part. Thanks, -Bill was (Author: bbejeck): [~enothereska] none actually, a misunderstanding on my part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15408276#comment-15408276 ] Bill Bejeck commented on KAFKA-3989: Yes, the classpath entry is in the buildscript section. To me, the advantage of the shadow plugin is simplicity, as the key to working with JMH is just building the jar file to run the benchmarks. It seemed to me like some of the features in jmh-gradle aren't really needed. I am adding a task so you can run a single benchmark from the command line (an approach I'm borrowing from the JMH examples), as that's how I envision most people using the module. Maybe that's an incorrect assumption on my part. Having said that, I'm not committed to a particular solution, so I can put in jmh-gradle if desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411821#comment-15411821 ] Bill Bejeck edited comment on KAFKA-3973 at 8/8/16 3:34 PM: I used JMH to benchmark the performance of caching bytes vs objects (tracking by memory size using jamm); here are the results: EDIT: New results from updated test
# Run complete. Total time: 00:02:41
Benchmark                                        Mode  Cnt        Score       Error  Units
MemoryBytesCacheBenchmark.testCacheByMemory     thrpt   40   536694.504 ±  4177.019  ops/s
MemoryBytesCacheBenchmark.testCacheBySizeBytes  thrpt   40  4713360.286 ± 60874.723  ops/s
Using JMH it still appears that serialization has the advantage. The test used for benchmarking will be included in the PR for KAFKA-3989 (coming soon). was (Author: bbejeck): I used JMH to benchmark the performance of caching bytes vs objects (tracking by memory size using jamm); here are the results:
Result "testCacheBySizeBytes": 2157013.372 ±(99.9%) 198793.816 ops/s [Average] (min, avg, max) = (687952.309, 2157013.372, 2485954.624), stdev = 353355.834 CI (99.9%): [1958219.556, 2355807.189] (assumes normal distribution)
# Run complete. Total time: 00:02:41
Benchmark                                        Mode  Cnt        Score       Error  Units
MemoryBytesCacheBenchmark.testCacheByMemory     thrpt   40   290142.181 ±   3001.345  ops/s
MemoryBytesCacheBenchmark.testCacheBySizeBytes  thrpt   40  2157013.372 ± 198793.816  ops/s
Using JMH it still appears that serialization has the advantage. The test used for benchmarking will be included in the PR for KAFKA-3989 (coming soon). > Investigate feasibility of caching bytes vs. records > > > Key: KAFKA-3973 > URL: https://issues.apache.org/jira/browse/KAFKA-3973 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Eno Thereska >Assignee: Bill Bejeck > Fix For: 0.10.1.0 > > Attachments: CachingPerformanceBenchmarks.java, MemoryLRUCache.java > > > Currently the cache stores and accounts for records, not bytes or objects. > This investigation would be around measuring any performance overheads that > come from storing bytes or objects. As an outcome we should know whether 1) > we should store bytes or 2) we should store objects. > If we store objects, the cache still needs to know their size (so that it can > know if the object fits in the allocated cache space, e.g., if the cache is > 100MB and the object is 10MB, we'd have space for 10 such objects). The > investigation needs to figure out how to find out the size of the object > efficiently in Java. > If we store bytes, then we are serialising an object into bytes before > caching it, i.e., we take a serialisation cost. The investigation needs to > measure how bad this cost can be, especially for the case when all objects fit > in cache (and thus any extra serialisation cost would show). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411821#comment-15411821 ] Bill Bejeck edited comment on KAFKA-3973 at 8/8/16 4:55 PM: I used JMH to benchmark the performance of caching bytes vs objects (tracking by memory size using jamm); here are the results: EDIT: Needed to refactor tests, and use Bytes to wrap byte array for keys in cache
Run complete. Total time: 00:02:42
Benchmark                                        Mode  Cnt        Score       Error  Units
MemoryBytesCacheBenchmark.testCacheByMemory     thrpt   40   251002.444 ± 20683.129  ops/s
MemoryBytesCacheBenchmark.testCacheBySizeBytes  thrpt   40  1477170.674 ± 12772.196  ops/s
After refactoring the JMH test the gap between tracking by memory and serialization has closed some, but serialization still has the advantage. The test used for benchmarking will be included in the PR for KAFKA-3989 (coming soon). was (Author: bbejeck): I used JMH to benchmark the performance of caching bytes vs objects (tracking by memory size using jamm); here are the results: EDIT: Needed to refactor tests, and use Bytes to wrap byte array for keys in cache
Run complete. Total time: 00:02:42
Benchmark                                        Mode  Cnt        Score       Error  Units
MemoryBytesCacheBenchmark.testCacheByMemory     thrpt   40   251002.444 ± 20683.129  ops/s
MemoryBytesCacheBenchmark.testCacheBySizeBytes  thrpt   40  1477170.674 ± 12772.196  ops/s
After refactoring the JMH test the gap between tracking by memory and serialization has closed, but it still appears that serialization has the advantage. The test used for benchmarking will be included in the PR for KAFKA-3989 (coming soon). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411821#comment-15411821 ] Bill Bejeck edited comment on KAFKA-3973 at 8/8/16 4:45 PM: I used JMH to benchmark the performance of caching bytes vs objects (tracking by memory size using jamm); here are the results: EDIT: Needed to refactor tests, and use Bytes to wrap byte array for keys in cache
Run complete. Total time: 00:02:42
Benchmark                                        Mode  Cnt        Score       Error  Units
MemoryBytesCacheBenchmark.testCacheByMemory     thrpt   40   251002.444 ± 20683.129  ops/s
MemoryBytesCacheBenchmark.testCacheBySizeBytes  thrpt   40  1477170.674 ± 12772.196  ops/s
After refactoring the JMH test the gap between tracking by memory and serialization has closed, but it still appears that serialization has the advantage. The test used for benchmarking will be included in the PR for KAFKA-3989 (coming soon). was (Author: bbejeck): I used JMH to benchmark the performance of caching bytes vs objects (tracking by memory size using jamm); here are the results: EDIT: New results from updated test
# Run complete. Total time: 00:02:41
Benchmark                                        Mode  Cnt        Score       Error  Units
MemoryBytesCacheBenchmark.testCacheByMemory     thrpt   40   536694.504 ±  4177.019  ops/s
MemoryBytesCacheBenchmark.testCacheBySizeBytes  thrpt   40  4713360.286 ± 60874.723  ops/s
Using JMH it still appears that serialization has the advantage. The test used for benchmarking will be included in the PR for KAFKA-3989 (coming soon). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-3989: --- Status: Patch Available (was: In Progress) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15412231#comment-15412231 ] Bill Bejeck commented on KAFKA-3989: [~ijuma] After your comment, I tried to implement the jmh-gradle-plugin but I got this error: Error:Could not find me.champeau.gradle:jmh-gradle-plugin:0.3.0. Searched in the following locations: https://repo1.maven.org/maven2/me/champeau/gradle/jmh-gradle-plugin/0.3.0/jmh-gradle-plugin-0.3.0.pom https://repo1.maven.org/maven2/me/champeau/gradle/jmh-gradle-plugin/0.3.0/jmh-gradle-plugin-0.3.0.jar https://jcenter.bintray.com/me/champeau/gradle/jmh-gradle-plugin/0.3.0/jmh-gradle-plugin-0.3.0.pom https://jcenter.bintray.com/me/champeau/gradle/jmh-gradle-plugin/0.3.0/jmh-gradle-plugin-0.3.0.jar http://dl.bintray.com/content/netflixoss/external-gradle-plugins/me/champeau/gradle/jmh-gradle-plugin/0.3.0/jmh-gradle-plugin-0.3.0.pom http://dl.bintray.com/content/netflixoss/external-gradle-plugins/me/champeau/gradle/jmh-gradle-plugin/0.3.0/jmh-gradle-plugin-0.3.0.jar I'm sure this could be a simple configuration error, but I didn't spend any time tracking it down. FWIW I have somewhat limited experience with gradle and I took it as an opportunity to learn a little more by continuing to use the gradle-shade plugin. > Add JMH module for Benchmarks > - > > Key: KAFKA-3989 > URL: https://issues.apache.org/jira/browse/KAFKA-3989 > Project: Kafka > Issue Type: Improvement >Reporter: Bill Bejeck >Assignee: Bill Bejeck > Fix For: 0.10.1.0 > > > JMH is a Java harness for building, running, and analyzing benchmarks written > in Java or JVM languages. To run properly, JMH needs to be in its own > module. This task will also investigate using the jmh-gradle plugin > [https://github.com/melix/jmh-gradle-plugin], which enables the use of JMH > from gradle. This is related to > [https://issues.apache.org/jira/browse/KAFKA-3973] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
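For reference, the resolution failure above shows only Maven Central, JCenter, and a Netflix Bintray repository were searched, while me.champeau.gradle:jmh-gradle-plugin is published to the Gradle Plugin Portal. A hypothetical build.gradle fragment that would make the plugin resolvable might look like the following; the version and repository URL are assumptions, not taken from the Kafka build:

```groovy
// Hypothetical build.gradle fragment, not from the Kafka build.
// Adding the Gradle Plugin Portal repository lets the buildscript
// find the plugin artifacts the error above could not resolve.
buildscript {
    repositories {
        maven { url 'https://plugins.gradle.org/m2/' }
    }
    dependencies {
        classpath 'me.champeau.gradle:jmh-gradle-plugin:0.3.0'
    }
}

apply plugin: 'me.champeau.gradle.jmh'
```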
[jira] [Commented] (KAFKA-3973) Investigate feasibility of caching bytes vs. records
[ https://issues.apache.org/jira/browse/KAFKA-3973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411821#comment-15411821 ] Bill Bejeck commented on KAFKA-3973: I used JMH to benchmark the performance of caching bytes vs. objects (tracking by memory size using jamm); here are the results: Result "testCacheBySizeBytes": 2157013.372 ±(99.9%) 198793.816 ops/s [Average] (min, avg, max) = (687952.309, 2157013.372, 2485954.624), stdev = 353355.834 CI (99.9%): [1958219.556, 2355807.189] (assumes normal distribution) # Run complete. Total time: 00:02:41 Benchmark  Mode  Cnt  Score  Error  Units MemoryBytesCacheBenchmark.testCacheByMemory  thrpt  40  290142.181 ± 3001.345 ops/s MemoryBytesCacheBenchmark.testCacheBySizeBytes  thrpt  40  2157013.372 ± 198793.816 ops/s Using JMH it still appears that serialization has the advantage. The test used for benchmarking will be included in the PR for KAFKA-3989 (coming soon). > Investigate feasibility of caching bytes vs. records > > > Key: KAFKA-3973 > URL: https://issues.apache.org/jira/browse/KAFKA-3973 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Eno Thereska >Assignee: Bill Bejeck > Fix For: 0.10.1.0 > > Attachments: CachingPerformanceBenchmarks.java, MemoryLRUCache.java > > > Currently the cache stores and accounts for records, not bytes or objects. > This investigation would be around measuring any performance overheads that > come from storing bytes or objects. As an outcome we should know whether 1) > we should store bytes or 2) we should store objects. > If we store objects, the cache still needs to know their size (so that it can > know if the object fits in the allocated cache space, e.g., if the cache is > 100MB and the object is 10MB, we'd have space for 10 such objects). The > investigation needs to figure out how to find out the size of the object > efficiently in Java. 
> If we store bytes, then we are serialising an object into bytes before > caching it, i.e., we take a serialisation cost. The investigation needs to > measure how bad this cost can be, especially for the case when all objects fit > in cache (and thus any extra serialisation cost would show). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3794) Add Stream / Table prefix in print functions
[ https://issues.apache.org/jira/browse/KAFKA-3794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356319#comment-15356319 ] Bill Bejeck commented on KAFKA-3794: [~guozhang] I was thinking of adding the parent processor node name by default to the `writeAsText` functions for consistency; what do you think? > Add Stream / Table prefix in print functions > > > Key: KAFKA-3794 > URL: https://issues.apache.org/jira/browse/KAFKA-3794 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie, user-experience > > Currently the KTable/KStream.print() operator will print the key-value pair > as it was forwarded to this operator. However, if there are multiple > operators in the topologies with the same {{PrintStream}} (e.g. stdout), > their printed key-value pairs will be interleaved on that stream channel. > Hence it is better to add a prefix for different KStream/KTable.print > operators. One proposal: > 1) For KTable, it inherits a table name when created, and we can use that > name as the prefix as {{[table-name]: key, value}}. > 2) For KStream, we can overload the function with an additional "name" > parameter that we use as the prefix; if it is not specified, then we can use > the parent processor node name, which has the pattern like > {{KSTREAM-JOIN-suffix_index}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
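The interleaving problem and the proposed fix can be sketched in plain Java: several print operators sharing one PrintStream are hard to tell apart unless each line carries the operator's name as a prefix. The prefix format below mirrors the proposal ({{[table-name]: key, value}}); the helper names are illustrative and not part of the Kafka Streams API.

```java
import java.util.function.BiConsumer;

// Plain-Java sketch of prefixed print operators sharing one output stream.
public class PrefixedPrintSketch {

    // Returns a print action bound to a prefix, e.g. a table name or the
    // parent processor node name ("KSTREAM-JOIN-0000000003").
    static BiConsumer<String, String> printWithPrefix(String name) {
        return (key, value) ->
                System.out.println("[" + name + "]: " + key + ", " + value);
    }

    public static void main(String[] args) {
        BiConsumer<String, String> tablePrint = printWithPrefix("orders-table");
        BiConsumer<String, String> streamPrint = printWithPrefix("KSTREAM-JOIN-0000000003");

        // Both write to the same stream, but the prefix disambiguates them.
        tablePrint.accept("k1", "v1");
        streamPrint.accept("k1", "v1");
    }
}
```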
[jira] [Assigned] (KAFKA-3926) Transient failures in org.apache.kafka.streams.integration.RegexSourceIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-3926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3926: -- Assignee: Bill Bejeck > Transient failures in > org.apache.kafka.streams.integration.RegexSourceIntegrationTest > - > > Key: KAFKA-3926 > URL: https://issues.apache.org/jira/browse/KAFKA-3926 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Bill Bejeck > > {code} > org.apache.kafka.streams.integration.RegexSourceIntegrationTest > > testRegexMatchesTopicsAWhenDeleted FAILED > java.lang.AssertionError: > Expected: <[TEST-TOPIC-A, TEST-TOPIC-B]> > but: was <[TEST-TOPIC-A]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.junit.Assert.assertThat(Assert.java:956) > at org.junit.Assert.assertThat(Assert.java:923) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:211) > {code} > I think it is due to the fact that sometimes the rebalance takes much longer > than the specified 60 seconds. > One example: https://builds.apache.org/job/kafka-trunk-jdk8/730/consoleFull -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3926) Transient failures in org.apache.kafka.streams.integration.RegexSourceIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-3926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15360768#comment-15360768 ] Bill Bejeck commented on KAFKA-3926: I'll pick this up. > Transient failures in > org.apache.kafka.streams.integration.RegexSourceIntegrationTest > - > > Key: KAFKA-3926 > URL: https://issues.apache.org/jira/browse/KAFKA-3926 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Bill Bejeck > > {code} > org.apache.kafka.streams.integration.RegexSourceIntegrationTest > > testRegexMatchesTopicsAWhenDeleted FAILED > java.lang.AssertionError: > Expected: <[TEST-TOPIC-A, TEST-TOPIC-B]> > but: was <[TEST-TOPIC-A]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.junit.Assert.assertThat(Assert.java:956) > at org.junit.Assert.assertThat(Assert.java:923) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:211) > {code} > I think it is due to the fact that sometimes the rebalance takes much longer > than the specified 60 seconds. > One example: https://builds.apache.org/job/kafka-trunk-jdk8/730/consoleFull -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4731) Add event-based session windows
[ https://issues.apache.org/jira/browse/KAFKA-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855204#comment-15855204 ] Bill Bejeck commented on KAFKA-4731: [~mjsax] Is this Jira pending any further API redesign discussions? If not, I'd like to give this a shot. > Add event-based session windows > --- > > Key: KAFKA-4731 > URL: https://issues.apache.org/jira/browse/KAFKA-4731 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: needs-kip > > Kafka Streams allows defining session windows based on a time-based > inactivity gap. This can be used for _session detection_. > However, some data streams do contain events like "start session" and "end > session" to mark the beginning and end of a session. For this use case, it is not > required to _detect_ sessions because session boundaries are known and not > based on time -- but on events. > This Jira is about adding support for those event-based session windows. To > add this feature, a KIP > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) > is required to discuss the proposed design. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-4791: -- Assignee: Bill Bejeck > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with following > components : > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. > Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set<String> findSourceTopicsForProcessorParents(String [] > parents) { > final Set<String> sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} 
object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
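The failure mode in the quoted code can be sketched in plain Java: a source node created from a Pattern has no concrete topic list, so {{getTopics()}} returns null and {{Arrays.asList(null)}} blows up. A defensive version would branch on whether the source was defined by name or by pattern. The class shapes below are simplified stand-ins, not the actual TopologyBuilder internals, and a real fix would also have to resolve the pattern against the subscribed topics at runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Simplified stand-in for the pattern-vs-topics source node problem.
public class WildcardSourceSketch {

    // Stand-in for SourceNodeFactory: exactly one of topics / pattern is set.
    static class Source {
        final String[] topics;   // null when the source was built from a pattern
        final Pattern pattern;
        Source(String[] topics, Pattern pattern) {
            this.topics = topics;
            this.pattern = pattern;
        }
    }

    // Collect source topics, guarding against pattern sources instead of failing.
    static Set<String> sourceTopics(List<Source> sources) {
        Set<String> result = new HashSet<>();
        for (Source s : sources) {
            if (s.topics != null) {
                result.addAll(Arrays.asList(s.topics));   // safe: non-null here
            }
            // Pattern sources are skipped here; resolving them would require
            // the set of actually subscribed topics, known only at runtime.
        }
        return result;
    }

    public static void main(String[] args) {
        List<Source> sources = new ArrayList<>();
        sources.add(new Source(new String[]{"topic-a"}, null));
        sources.add(new Source(null, Pattern.compile("ingest-.*")));
        // Without the null guard this would throw a NullPointerException.
        System.out.println(sourceTopics(sources));
    }
}
```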
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880782#comment-15880782 ] Bill Bejeck commented on KAFKA-4791: picking this one up. > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with following > components : > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. 
> Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set findSourceTopicsForProcessorParents(String [] > parents) { > final Set sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881018#comment-15881018 ] Bill Bejeck commented on KAFKA-4791: Fair enough. I did not look into the issue at all, at first blush it seemed like a bug that needed to be fixed asap. But considering your comments and the forthcoming changes with KIP-120, I'll hold off. > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with following > components : > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. 
> Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set findSourceTopicsForProcessorParents(String [] > parents) { > final Set sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881018#comment-15881018 ] Bill Bejeck edited comment on KAFKA-4791 at 2/23/17 6:56 PM: - Fair enough. I did not look into the issue at all, at first blush it seemed like a bug that needed to be fixed asap. But considering your comments and the forthcoming changes with KIP-120, I'll hold off. was (Author: bbejeck): Fair enough. I did not look into the issue at all, at first blush it seemed like a big that needed to be fixed asap. But considering your comments and the forthcoming changes with KIP-120, I'll hold off. > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with following > components : > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. 
> Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set findSourceTopicsForProcessorParents(String [] > parents) { > final Set sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-4652) Improve test coverage KStreamBuilder
[ https://issues.apache.org/jira/browse/KAFKA-4652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-4652: -- Assignee: Bill Bejeck > Improve test coverage KStreamBuilder > > > Key: KAFKA-4652 > URL: https://issues.apache.org/jira/browse/KAFKA-4652 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Bill Bejeck >Priority: Minor > Fix For: 0.10.3.0 > > > Some methods not covered, i.e., > {{public KTable table(AutoOffsetReset offsetReset, String topic, > final String storeName)}} > {{public KStream stream(Serde keySerde, Serde valSerde, > Pattern topicPattern)}} > {{public KStream stream(AutoOffsetReset offsetReset, String... > topics)}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4652) Improve test coverage KStreamBuilder
[ https://issues.apache.org/jira/browse/KAFKA-4652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862883#comment-15862883 ] Bill Bejeck commented on KAFKA-4652: Missed these as well with the auto-offset reset PR; picking up. > Improve test coverage KStreamBuilder > > > Key: KAFKA-4652 > URL: https://issues.apache.org/jira/browse/KAFKA-4652 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Bill Bejeck >Priority: Minor > Fix For: 0.10.3.0 > > > Some methods not covered, i.e., > {{public KTable table(AutoOffsetReset offsetReset, String topic, > final String storeName)}} > {{public KStream stream(Serde keySerde, Serde valSerde, > Pattern topicPattern)}} > {{public KStream stream(AutoOffsetReset offsetReset, String... > topics)}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-4731) Add event-based session windows
[ https://issues.apache.org/jira/browse/KAFKA-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-4731: -- Assignee: Bill Bejeck > Add event-based session windows > --- > > Key: KAFKA-4731 > URL: https://issues.apache.org/jira/browse/KAFKA-4731 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Minor > Labels: needs-kip > > Kafka Streams allows defining session windows based on a time-based > inactivity gap. This can be used for _session detection_. > However, some data streams do contain events like "start session" and "end > session" to mark the beginning and end of a session. For this use case, it is not > required to _detect_ sessions because session boundaries are known and not > based on time -- but on events. > This Jira is about adding support for those event-based session windows. To > add this feature, a KIP > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) > is required to discuss the proposed design. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-4662) Improve test coverage TopologyBuilder
[ https://issues.apache.org/jira/browse/KAFKA-4662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-4662: -- Assignee: Bill Bejeck > Improve test coverage TopologyBuilder > - > > Key: KAFKA-4662 > URL: https://issues.apache.org/jira/browse/KAFKA-4662 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Bill Bejeck >Priority: Minor > Fix For: 0.10.3.0 > > > overloaded {{addSource}} methods with {{AutoOffsetReset}} param not tested. > Also some exception branches not covered -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4662) Improve test coverage TopologyBuilder
[ https://issues.apache.org/jira/browse/KAFKA-4662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15826055#comment-15826055 ] Bill Bejeck commented on KAFKA-4662: I missed these with my PR, picking this one up. > Improve test coverage TopologyBuilder > - > > Key: KAFKA-4662 > URL: https://issues.apache.org/jira/browse/KAFKA-4662 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Priority: Minor > Fix For: 0.10.3.0 > > > overloaded {{addSource}} methods with {{AutoOffsetReset}} param not tested. > Also some exception branches not covered -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores
[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824683#comment-15824683 ] Bill Bejeck commented on KAFKA-4468: Hi, Eno - I was, but for a couple of reasons I unassigned myself from this a few days ago. It's all yours if you want. Thanks for asking. > Correctly calculate the window end timestamp after read from state stores > - > > Key: KAFKA-4468 > URL: https://issues.apache.org/jira/browse/KAFKA-4468 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture > > When storing the WindowedStore on the persistent KV store, we only use the > start timestamp of the window as part of the combo-key as (start-timestamp, > key). The reason that we do not add the end-timestamp as well is that we can > always calculate it from the start timestamp + window_length, and hence we > can save 8 bytes per key on the persistent KV store. > However, after reading it (via {{WindowedDeserializer}}) we do not set its end > timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix > this by calculating its end timestamp as mentioned above. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
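The fix the ticket describes is small: with only (start-timestamp, key) stored, the window end must be recomputed on read as start plus window length, rather than deserialized as an unlimited window. A tiny illustrative sketch, with made-up numbers and names:

```java
// Sketch of recomputing a window's end timestamp on read, so the 8 bytes for
// the end-timestamp never need to be stored per key. Names are illustrative.
public class WindowEndSketch {

    static long windowEnd(long startMs, long windowSizeMs) {
        return startMs + windowSizeMs;   // end recovered from start + length
    }

    public static void main(String[] args) {
        long start = 60_000L;   // window start read back from the store key
        long size = 30_000L;    // window length known from the store config
        // Correct end is start + size, not "unlimited" (Long.MAX_VALUE).
        System.out.println(windowEnd(start, size));
    }
}
```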
[jira] [Assigned] (KAFKA-4023) Add thread id as prefix in Kafka Streams thread logging
[ https://issues.apache.org/jira/browse/KAFKA-4023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-4023: -- Assignee: Bill Bejeck > Add thread id as prefix in Kafka Streams thread logging > --- > > Key: KAFKA-4023 > URL: https://issues.apache.org/jira/browse/KAFKA-4023 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie++ > > A single Kafka Streams instance can include multiple stream threads, and > hence without logging prefix it is difficult to determine which thread's > producing which log entries. > We should > 1) add the log-prefix as thread id in StreamThread logger, as well as its > contained StreamPartitionAssignor. > 2) add the log-prefix as task id in StreamTask / StandbyTask, as well as its > contained RecordCollector and ProcessorStateManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4023) Add thread id as prefix in Kafka Streams thread logging
[ https://issues.apache.org/jira/browse/KAFKA-4023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15422890#comment-15422890 ] Bill Bejeck commented on KAFKA-4023: Picking this one up. Just let me know if someone is currently working on this and I'll unassign myself. > Add thread id as prefix in Kafka Streams thread logging > --- > > Key: KAFKA-4023 > URL: https://issues.apache.org/jira/browse/KAFKA-4023 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie++ > > A single Kafka Streams instance can include multiple stream threads, and > hence without a logging prefix it is difficult to determine which thread is > producing which log entries. > We should > 1) add the log-prefix as thread id in StreamThread logger, as well as its > contained StreamPartitionAssignor. > 2) add the log-prefix as task id in StreamTask / StandbyTask, as well as its > contained RecordCollector and ProcessorStateManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
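The logging change proposed above amounts to prepending an identifying prefix to every line a stream thread (or task) emits, so interleaved output from multiple threads can be attributed. A minimal sketch; the prefix format and message texts are assumptions, not the actual Kafka Streams log format:

```java
// Sketch of thread-id-prefixed log lines; format is hypothetical.
public class LogPrefixSketch {

    // Prepend the owning thread's id so interleaved lines stay attributable.
    static String withPrefix(String threadId, String message) {
        return "stream-thread [" + threadId + "] " + message;
    }

    public static void main(String[] args) {
        System.out.println(withPrefix("StreamThread-1", "Committing task 0_1"));
        System.out.println(withPrefix("StreamThread-2", "Committing task 0_2"));
    }
}
```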
[jira] [Commented] (KAFKA-3478) Finer Stream Flow Control
[ https://issues.apache.org/jira/browse/KAFKA-3478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15422907#comment-15422907 ] Bill Bejeck commented on KAFKA-3478: Is this task still available, and is it a feature that is still in the current plan/desired to get done? > Finer Stream Flow Control > - > > Key: KAFKA-3478 > URL: https://issues.apache.org/jira/browse/KAFKA-3478 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang > Labels: user-experience > Fix For: 0.10.1.0 > > > Today we have an event-time-based flow control mechanism in order to > synchronize multiple input streams in a best effort manner: > http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps > However, there are some use cases where users would like to have finer > control of the input streams, for example, with two input streams, one of > them always reading from offset 0 upon (re)-starting, and the other reading > from the log end offset. > Today we only have one consumer config "offset.auto.reset" to control that > behavior, which means all streams are read either from "earliest" or "latest". > We should consider how to improve these settings to allow users to have finer > control over these frameworks. > = > A finer flow control could also be used to allow for populating a {{KTable}} > (with an "initial" state) before starting the actual processing (this feature > was asked for in the mailing list multiple times already). Even if it is quite > hard to define *when* the initial populating phase should end, this might > still be useful. 
There would be the following possibilities: > 1) an initial fixed time period for populating >(it might be hard for a user to estimate the correct value) > 2) an "idle" period, ie, if no update to a KTable for a certain time is > done, we consider it as populated > 3) a timestamp cut off point, ie, all records with an older timestamp > belong to the initial populating phase > 4) a throughput threshold, ie, if the populating frequency falls below > the threshold, the KTable is considered "finished" > 5) maybe something else ?? > The API might look something like this > {noformat} > KTable table = builder.table("topic", 1000); // populate the table without > reading any other topics until see one record with timestamp 1000. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
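Of the options listed above, option 3 (a timestamp cut-off point) is the easiest to sketch: records older than the cut-off belong to the initial populating phase, and anything newer is regular processing. This is an illustration of the proposal only; no such API exists in the builder, and the cut-off value below echoes the hypothetical {{builder.table("topic", 1000)}} example:

```java
// Sketch of the timestamp cut-off idea for the KTable populating phase.
public class PopulatePhaseSketch {

    // A record is part of the initial populating phase if it predates the cut-off.
    static boolean isPopulating(long recordTimestamp, long cutoffTimestamp) {
        return recordTimestamp < cutoffTimestamp;
    }

    public static void main(String[] args) {
        long cutoff = 1000L;   // from the proposed builder.table("topic", 1000)
        System.out.println(isPopulating(500L, cutoff));    // still populating
        System.out.println(isPopulating(1500L, cutoff));   // normal processing
    }
}
```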
[jira] [Work started] (KAFKA-4023) Add thread id as prefix in Kafka Streams thread logging
[ https://issues.apache.org/jira/browse/KAFKA-4023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-4023 started by Bill Bejeck. -- > Add thread id as prefix in Kafka Streams thread logging > --- > > Key: KAFKA-4023 > URL: https://issues.apache.org/jira/browse/KAFKA-4023 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Bill Bejeck > Labels: newbie++ > > A single Kafka Streams instance can include multiple stream threads, and > hence without logging prefix it is difficult to determine which thread's > producing which log entries. > We should > 1) add the log-prefix as thread id in StreamThread logger, as well as its > contained StreamPartitionAssignor. > 2) add the log-prefix as task id in StreamTask / StandbyTask, as well as its > contained RecordCollector and ProcessorStateManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3478) Finer Stream Flow Control
[ https://issues.apache.org/jira/browse/KAFKA-3478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15429559#comment-15429559 ] Bill Bejeck commented on KAFKA-3478: [~mjsax], thanks for the clarification. I'll take a look at what it would take for different configurations, but I'll hold off on doing anything concrete until some of the details for this task (or subtasks) are fleshed out. > Finer Stream Flow Control > - > > Key: KAFKA-3478 > URL: https://issues.apache.org/jira/browse/KAFKA-3478 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang > Labels: user-experience > Fix For: 0.10.1.0 > > > Today we have an event-time-based flow control mechanism to > synchronize multiple input streams in a best-effort manner: > http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps > However, there are some use cases where users would like to have finer > control of the input streams, for example, with two input streams, one of > them always reading from offset 0 upon (re)-starting, and the other reading > from the log end offset. > Today we only have one consumer config "offset.auto.reset" to control that > behavior, which means all streams are read either from "earliest" or "latest". > We should consider how to improve this setting to allow users to have finer > control over these streams. > = > A finer flow control could also be used to allow for populating a {{KTable}} > (with an "initial" state) before starting the actual processing (this feature > was asked for on the mailing list multiple times already). Even if it is quite > hard to define *when* the initial populating phase should end, this might > still be useful. 
There would be the following possibilities: > 1) an initial fixed time period for populating >(it might be hard for a user to estimate the correct value) > 2) an "idle" period, i.e., if no update to a KTable for a certain time is > done, we consider it as populated > 3) a timestamp cut-off point, i.e., all records with an older timestamp > belong to the initial populating phase > 4) a throughput threshold, i.e., if the populating frequency falls below > the threshold, the KTable is considered "finished" > 5) maybe something else? > The API might look something like this: > {noformat} > KTable table = builder.table("topic", 1000); // populate the table without > reading any other topics until seeing one record with timestamp 1000. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407054#comment-15407054 ] Bill Bejeck commented on KAFKA-3989: I opted to try the gradle shadow plugin first (https://github.com/johnrengelman/shadow/). I've been able to get JMH working as a sub-module but needed to make the following changes to the build.gradle script:
{code}
dependencies {
    ...
    // Added this entry for gradle shadow plugin
    classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3'
}
{code}
Then added this to apply the plugin to the jmh-benchmarks module:
{code}
project(':jmh-benchmarks') {
    apply plugin: 'com.github.johnrengelman.shadow'
}
{code}
Is this acceptable? > Add JMH module for Benchmarks > - > > Key: KAFKA-3989 > URL: https://issues.apache.org/jira/browse/KAFKA-3989 > Project: Kafka > Issue Type: Improvement >Reporter: Bill Bejeck >Assignee: Bill Bejeck > > JMH is a Java harness for building, running, and analyzing benchmarks written > in Java or JVM languages. To run properly, JMH needs to be in its own > module. This task will also investigate using the jmh-gradle plugin > [https://github.com/melix/jmh-gradle-plugin], which enables the use of JMH > from gradle. This is related to > [https://issues.apache.org/jira/browse/KAFKA-3973] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3989) Add JMH module for Benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407054#comment-15407054 ] Bill Bejeck edited comment on KAFKA-3989 at 8/4/16 3:03 AM: I opted to try the gradle shadow plugin first (https://github.com/johnrengelman/shadow/). I've been able to get JMH working as a sub-module but needed to make the following changes to the build.gradle script:
{code}
dependencies {
    ...
    // Added this entry for gradle shadow plugin
    classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3'
}
{code}
Then added this to apply the plugin to the jmh-benchmarks module:
{code}
project(':jmh-benchmarks') {
    apply plugin: 'com.github.johnrengelman.shadow'
}
{code}
Is this acceptable? was (Author: bbejeck): I opted to try the gradle shadow plugin first (https://github.com/johnrengelman/shadow/) I've been able to get JMH working as a sub-module but needed to make the following changes to the build.gradle script: dependencies { . // Added this entry for gradle shadow plugin classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3' // } Then added this to apply plugin the jmh-benchmarks module: project(':jmh-benchmarks') { apply plugin: 'com.github.johnrengelman.shadow' } Is this acceptable ? > Add JMH module for Benchmarks > - > > Key: KAFKA-3989 > URL: https://issues.apache.org/jira/browse/KAFKA-3989 > Project: Kafka > Issue Type: Improvement >Reporter: Bill Bejeck >Assignee: Bill Bejeck > > JMH is a Java harness for building, running, and analyzing benchmarks written > in Java or JVM languages. To run properly, JMH needs to be in its own > module. This task will also investigate using the jmh-gradle plugin > [https://github.com/melix/jmh-gradle-plugin], which enables the use of JMH > from gradle. This is related to > [https://issues.apache.org/jira/browse/KAFKA-3973] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-4791 started by Bill Bejeck. -- > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with the following > components: > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. > Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set<String> findSourceTopicsForProcessorParents(String[] > parents) { > final Set<String> sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} object, only a 
pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
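The failure mode described above, and one obvious null-check mitigation, can be reproduced in a self-contained sketch. The `Source` class and `sourceTopics` method below are simplified stand-ins for the real `SourceNodeFactory`/`TopologyBuilder` internals, not the actual fix:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class SourceTopicsDemo {

    // Simplified stand-in for SourceNodeFactory: a source holds either
    // explicit topic names or a wildcard pattern, never both.
    static class Source {
        final String[] topics;   // null when the source was built from a pattern
        final Pattern pattern;

        Source(String[] topics, Pattern pattern) {
            this.topics = topics;
            this.pattern = pattern;
        }
    }

    // The reported bug: Arrays.asList(getTopics()) blows up when getTopics()
    // returns null for a pattern source. Guarding on null and falling back to
    // the topics the pattern matched is one possible mitigation.
    static Set<String> sourceTopics(Source source, Set<String> subscribedTopics) {
        Set<String> result = new HashSet<>();
        if (source.topics != null) {
            result.addAll(Arrays.asList(source.topics));
        } else {
            for (String topic : subscribedTopics) {
                if (source.pattern.matcher(topic).matches()) {
                    result.add(topic);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Source wildcard = new Source(null, Pattern.compile("ingest-.*"));
        Set<String> subscribed = new HashSet<>(Arrays.asList("ingest-a", "other"));
        System.out.println(sourceTopics(wildcard, subscribed)); // [ingest-a]
    }
}
```

The real topology builder only learns the pattern-matched topics after subscription, which is why any actual fix has to defer this resolution rather than just null-check, as the discussion above implies.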
[jira] [Updated] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-4791: --- Status: Patch Available (was: In Progress) > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with the following > components: > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. > Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set<String> findSourceTopicsForProcessorParents(String[] > parents) { > final Set<String> sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > 
{{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-4114) Allow for different "auto.offset.reset" strategies for different input streams
[ https://issues.apache.org/jira/browse/KAFKA-4114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-4114: -- Assignee: Bill Bejeck (was: Guozhang Wang) > Allow for different "auto.offset.reset" strategies for different input streams > -- > > Key: KAFKA-4114 > URL: https://issues.apache.org/jira/browse/KAFKA-4114 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck > > Today we only have one consumer config "offset.auto.reset" to control that > behavior, which means all streams are read either from "earliest" or "latest". > However, it would be useful to improve this setting to allow users to have > finer control over different input streams. For example, with two input > streams, one of them always reading from offset 0 upon (re)-starting, and the > other reading from the log end offset. > This JIRA requires extending the {{KStreamBuilder}} API for methods > {{.stream(...)}} and {{.table(...)}} to add a new parameter that indicates the > initial offset to be used. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
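The resolution logic the ticket implies, a per-stream override falling back to the global "auto.offset.reset" value, can be sketched in plain Java. The method and map names here are illustrative only, not the eventual KStreamBuilder API:

```java
import java.util.HashMap;
import java.util.Map;

public class ResetPolicyDemo {

    // Hypothetical resolution for the proposed per-stream setting: a
    // topic-level override wins; otherwise the global "auto.offset.reset"
    // consumer config applies.
    static String resolveResetPolicy(Map<String, String> overrides,
                                     String globalDefault,
                                     String topic) {
        return overrides.getOrDefault(topic, globalDefault);
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("replay-input", "earliest"); // always re-read from offset 0

        System.out.println(resolveResetPolicy(overrides, "latest", "replay-input")); // earliest
        System.out.println(resolveResetPolicy(overrides, "latest", "clicks"));       // latest
    }
}
```

In the eventual API this choice would be passed as an extra argument to `.stream(...)` / `.table(...)` rather than kept in a side map; the sketch only shows the fallback semantics.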