Sent out a PR #5543 which fixes the reported bug, with StreamToTableJoinScalaIntegrationTestImplicitSerdes.testShouldCountClicksPerRegion modified adding the filter methods.
FYI On Mon, Aug 20, 2018 at 5:26 PM Ted Yu <yuzhih...@gmail.com> wrote: > Thanks for pointing me to that PR. > > I applied the PR locally but still got: > > org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes > > testShouldCountClicksPerRegion FAILED > java.lang.StackOverflowError > > I can go over that PR to see what can be referenced for solving this bug. > > FYI > > On Mon, Aug 20, 2018 at 5:21 PM Guozhang Wang <wangg...@gmail.com> wrote: > >> Is this related to the fix https://github.com/apache/kafka/pull/5502 that >> is currently being worked on? >> >> >> Guozhang >> >> On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax <matth...@confluent.io> >> wrote: >> >> > Thanks for reporting and for creating the ticket! >> > >> > -Matthias >> > >> > On 8/20/18 5:17 PM, Ted Yu wrote: >> > > I was able to reproduce what you saw with modification >> > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala >> > > I have logged KAFKA-7316 and am looking for a fix. >> > > >> > > FYI >> > > >> > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com> >> > wrote: >> > > >> > >> Isn’t that a bug then? Or can I fix my code somehow? >> > >> >> > >> >> > >> >> > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com >> <mailto: >> > >> yuzhih...@gmail.com>) wrote: >> > >> >> > >> I think what happened in your use case was that the following >> implicit >> > >> from ImplicitConversions.scala kept wrapping the resultant KTable >> from >> > >> filter(): >> > >> >> > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] = >> > >> >> > >> leading to stack overflow. >> > >> >> > >> Cheers >> > >> >> > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel < >> dru...@arrcus.com> >> > >> wrote: >> > >> >> > >>> Hi, >> > >>> >> > >>> I’m using the org.kafka.streams.scala that was released with version >> > >>> 2.0.0. I’m getting a StackOverflowError as follows: >> > >>> >> > >>> java.lang.StackOverflowError >> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( >> > KTable.scala:49) >> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( >> > KTable.scala:49) >> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( >> > KTable.scala:49) >> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( >> > KTable.scala:49) >> > >>> . >> > >>> . >> > >>> . >> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( >> > KTable.scala:49) >> > >>> >> > >>> The Scala version I’m using is 2.11.11 and the code leading to the >> > error >> > >>> is as follows (particularly the .filter). >> > >>> >> > >>> val builder = new StreamsBuilder >> > >>> >> > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic) >> > >>> >> > >>> val customers = args.config.keys >> > >>> >> > >>> val predicates = customers.map { customerId => >> > >>> (_: Array[Byte], message: CaseClassA) => message.customerId == >> > customerId >> > >>> }.toSeq >> > >>> >> > >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap >> > >>> >> > >>> val y = Printed.toSysOut[Windowed[Key], Long] >> > >>> >> > >>> customerIdToStream.foreach { case (customerId, customerStream) => >> > >>> val customerConfig = args.config(customerId) >> > >>> customerStream >> > >>> .flatMap { case (_, message) => >> > >>> message.objects.map { >> > >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1 >> > >>> } >> > >>> } >> > >>> .groupByKey >> > >>> >> > >>> >> > >> .windowedBy(TimeWindows.of(customerConfig.windowSize). >> > advanceBy(customerConfig.sliderSize)) >> > >>> .count() >> > >>> .filter { case (_, count) => count >= >> > >>> customerConfig.frequencyThreshold } >> > >>> .toStream >> > >>> .print(y) >> > >>> } >> > >>> >> > >>> Is this a bug with the new scala module related to: >> > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ? >> > >>> Or am I doing something wrong? >> > >>> >> > >>> Thanks, >> > >>> Druhin >> > >>> >> > >> >> > > >> > >> > >> >> >> -- >> -- Guozhang >> >