Thanks for pointing me to that PR. I applied the PR locally but still got:
org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes > testShouldCountClicksPerRegion FAILED java.lang.StackOverflowError I can go over that PR to see what can be referenced for solving this bug. FYI On Mon, Aug 20, 2018 at 5:21 PM Guozhang Wang <wangg...@gmail.com> wrote: > Is this related to the fix https://github.com/apache/kafka/pull/5502 that > is currently being worked on? > > > Guozhang > > On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax <matth...@confluent.io> > wrote: > > > Thanks for reporting and for creating the ticket! > > > > -Matthias > > > > On 8/20/18 5:17 PM, Ted Yu wrote: > > > I was able to reproduce what you saw with modification > > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > > > I have logged KAFKA-7316 and am looking for a fix. > > > > > > FYI > > > > > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com> > > wrote: > > > > > >> Isn’t that a bug then? Or can I fix my code somehow? > > >> > > >> > > >> > > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com<mailto: > > >> yuzhih...@gmail.com>) wrote: > > >> > > >> I think what happened in your use case was that the following implicit > > >> from ImplicitConversions.scala kept wrapping the resultant KTable from > > >> filter(): > > >> > > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] = > > >> > > >> leading to stack overflow. > > >> > > >> Cheers > > >> > > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com > > > > >> wrote: > > >> > > >>> Hi, > > >>> > > >>> I’m using the org.kafka.streams.scala that was released with version > > >>> 2.0.0. I’m getting a StackOverflowError as follows: > > >>> > > >>> java.lang.StackOverflowError > > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > > KTable.scala:49) > > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > > KTable.scala:49) > > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > > KTable.scala:49) > > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > > KTable.scala:49) > > >>> . > > >>> . > > >>> . > > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > > KTable.scala:49) > > >>> > > >>> The Scala version I’m using is 2.11.11 and the code leading to the > > error > > >>> is as follows (particularly the .filter). > > >>> > > >>> val builder = new StreamsBuilder > > >>> > > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic) > > >>> > > >>> val customers = args.config.keys > > >>> > > >>> val predicates = customers.map { customerId => > > >>> (_: Array[Byte], message: CaseClassA) => message.customerId == > > customerId > > >>> }.toSeq > > >>> > > >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap > > >>> > > >>> val y = Printed.toSysOut[Windowed[Key], Long] > > >>> > > >>> customerIdToStream.foreach { case (customerId, customerStream) => > > >>> val customerConfig = args.config(customerId) > > >>> customerStream > > >>> .flatMap { case (_, message) => > > >>> message.objects.map { > > >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1 > > >>> } > > >>> } > > >>> .groupByKey > > >>> > > >>> > > >> .windowedBy(TimeWindows.of(customerConfig.windowSize). > > advanceBy(customerConfig.sliderSize)) > > >>> .count() > > >>> .filter { case (_, count) => count >= > > >>> customerConfig.frequencyThreshold } > > >>> .toStream > > >>> .print(y) > > >>> } > > >>> > > >>> Is this a bug with the new scala module related to: > > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ? > > >>> Or am I doing something wrong? > > >>> > > >>> Thanks, > > >>> Druhin > > >>> > > >> > > > > > > > > > > -- > -- Guozhang >