Hi Nicu,

I'd need to see more context to help. For example, what is the value of `topicName`? I've just finished writing Streams tests using the test driver, so I can hopefully help if you share more code :)
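
In the meantime, here is a minimal sketch of why the value of `topicName` matters. As far as I can tell, `createInputTopic` only wraps a topic name, and the driver raises "Unknown topic" when a record is piped to a name that is not one of the topology's source topics. The topology, the output topic `users_out`, the name `some_other_topic`, the String serdes and the object name are all assumptions for illustration, not taken from your project.

```scala
import java.util.Properties
import scala.util.Try

import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, TopologyTestDriver}
import org.apache.kafka.streams.kstream.{Consumed, Produced}

// Hypothetical example object; none of these names come from the original project.
object UnknownTopicSketch {

  /** Returns (result of piping to the real source topic, result of piping to a mismatched name). */
  def run(): (Try[Unit], Try[Unit]) = {
    val strings: Serde[String] = Serdes.String()
    val consumed: Consumed[String, String] = Consumed.`with`(strings, strings)
    val produced: Produced[String, String] = Produced.`with`(strings, strings)

    // Assumed topology: a pass-through from "identity_users_v1" to "users_out".
    val builder = new StreamsBuilder()
    builder.stream("identity_users_v1", consumed).to("users_out", produced)

    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "unknown-topic-sketch")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092") // never contacted by the driver

    val driver = new TopologyTestDriver(builder.build(), props)
    try {
      // Name matches a source topic of the topology, so piping should succeed.
      val matching = Try {
        val in = driver.createInputTopic("identity_users_v1", strings.serializer(), strings.serializer())
        in.pipeInput("user-1", "created")
      }
      // Name is not a source topic of the topology; creating the wrapper works,
      // but piping is expected to fail with IllegalArgumentException("Unknown topic: ...").
      val mismatched = Try {
        val in = driver.createInputTopic("some_other_topic", strings.serializer(), strings.serializer())
        in.pipeInput("user-1", "created")
      }
      (matching, mismatched)
    } finally driver.close()
  }

  def main(args: Array[String]): Unit = {
    val (matching, mismatched) = run()
    println(s"matching topic:   $matching")
    println(s"mismatched topic: $mismatched")
  }
}
```

If this matches what you see, the thing to check is whether the topology handed to the driver really reads from `identity_users_v1`, rather than whether `createInputTopic` was called.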

Cheers,
Liam Clarke-Hutchinson

On Tue, Apr 21, 2020 at 8:40 PM Dumitru-Nicolae Marasoui <nicolae.maras...@ovoenergy.com> wrote:

> Hi Murilo & community,
> Thanks for your answer,
> I see that in the code this is done just before:
>
> val topicInput = inner.createInputTopic(topicName, serdeKey.serializer(), serdeValue.serializer())
> val input = records.map { case (k, v) => new TestRecord(k, v) }
> topicInput.pipeRecordList(input.asJava)
>
> What could be the explanation?
> Thank you,
> Nicu
>
> On Tue, 21 Apr 2020 at 04:04, Murilo Tavares <murilo...@gmail.com> wrote:
>
> > Hi Dumitru
> > The TopologyTestDriver you are using was designed to unit test your topology, and will not work with the stack you run locally.
> > That said, if you want to test your topology, you first need to create the fake input topic by calling “topologyDriver.createInputTopic()” (assuming you are using v2.4+) for every input topic you are using.
> > Since you use “pipeRecordList”, make sure your records are all to the same topic.
> > Murilo
> >
> > On Mon, Apr 20, 2020 at 3:27 PM Dumitru-Nicolae Marasoui <nicolae.maras...@ovoenergy.com> wrote:
> >
> > > Hello kafka community,
> > > I am getting the stack trace below in an attempt at an integration test for a new kafka-streams ETL between two topics (where the source topic is a new one).
> > > The way the local framework is organized, initially a local confluent stack is started such as the schema registry server and a broker, and a script takes a project file and generates topics and injects some messages in them.
> > > After that there must be a step that I am missing (because these integration tests can run without the broker or registry server running at all)
> > > Thing is that when I run the integration test I created I get the following Exception:
> > > Thank you for help,
> > > Nicu
> > >
> > > Unknown topic: identity_users_v1
> > > java.lang.IllegalArgumentException: Unknown topic: identity_users_v1
> > > at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:488)
> > > at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:742)
> > > at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
> > > at org.apache.kafka.streams.TestInputTopic.pipeRecordList(TestInputTopic.java:188)
> > > at com.ovoenergy.tests.testtools.TopologyTest$TopologyTestDriverOps.pushRecordsTo(TopologyTests.scala:88)
> > > at com.ovoenergy.globaltopics.pipelines.orion.UserEventV1PipelineTest.$anonfun$new$1(UserEventV1PipelineTest.scala:31)
> > > at org.scalatest.fixture.TestSuite$TestFunAndConfigMap.$anonfun$apply$1(TestSuite.scala:132)
> > > at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> > > at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> > > at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> > > at org.scalatest.fixture.TestSuite$TestFunAndConfigMap.apply(TestSuite.scala:132)
> > > at com.ovoenergy.tests.testtools.FlatSpecTopologyTest.withFixture(TopologyTests.scala:28)
> > > at org.scalatest.fixture.FlatSpecLike.invokeWithFixture$1(FlatSpecLike.scala:2127)
> > > at org.scalatest.fixture.FlatSpecLike.$anonfun$runTest$1(FlatSpecLike.scala:2138)
> > > at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
> > > at org.scalatest.fixture.FlatSpecLike.runTest(FlatSpecLike.scala:2138)
> > > at org.scalatest.fixture.FlatSpecLike.runTest$(FlatSpecLike.scala:2119)
> > > at org.scalatest.fixture.FlatSpec.runTest(FlatSpec.scala:226)
> > > at org.scalatest.fixture.FlatSpecLike.$anonfun$runTests$1(FlatSpecLike.scala:2181)
> > > at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
> > > at scala.collection.immutable.List.foreach(List.scala:392)
> > > at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
> > > at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:370)
> > > at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:407)
> > > at scala.collection.immutable.List.foreach(List.scala:392)
> > > at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
> > > at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
> > > at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
> > > at org.scalatest.fixture.FlatSpecLike.runTests(FlatSpecLike.scala:2181)
> > > at org.scalatest.fixture.FlatSpecLike.runTests$(FlatSpecLike.scala:2180)
> > > at org.scalatest.fixture.FlatSpec.runTests(FlatSpec.scala:226)
> > > at org.scalatest.Suite.run(Suite.scala:1124)
> > > at org.scalatest.Suite.run$(Suite.scala:1106)
> > > at org.scalatest.fixture.FlatSpec.org$scalatest$fixture$FlatSpecLike$$super$run(FlatSpec.scala:226)
> > > at org.scalatest.fixture.FlatSpecLike.$anonfun$run$1(FlatSpecLike.scala:2202)
> > > at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
> > > at org.scalatest.fixture.FlatSpecLike.run(FlatSpecLike.scala:2202)
> > > at org.scalatest.fixture.FlatSpecLike.run$(FlatSpecLike.scala:2201)
> > > at org.scalatest.fixture.FlatSpec.run(FlatSpec.scala:226)
> > > at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
> > > at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349)
> > > at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343)
> > > at scala.collection.immutable.List.foreach(List.scala:392)
> > > at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
> > > at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033)
> > > at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011)
> > > at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
> > > at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
> > > at org.scalatest.tools.Runner$.run(Runner.scala:850)
> > > at org.scalatest.tools.Runner.run(Runner.scala)
> > > at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
> > > at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
> > >
> > > --
> > > Thank you,
> > > Nicolae Marasoiu
> > > Scala Engineer
> > > Orion, OVO Group
>
> --
> Thank you,
> Nicolae Marasoiu
> Scala Engineer
> Orion, OVO Group