Hi,
I am aware that more than one method in KTable.scala has this issue.

Once I find a solution, I will apply the fix to the methods you listed.
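
For anyone following along, the direction I'm considering is to stop relying
on SAM conversion and construct the Java Predicate explicitly. A minimal
sketch of that idea (my assumption of the fix shape against the 2.0.0
wrapper; fixedFilter is a hypothetical name, not a committed patch):

import org.apache.kafka.streams.kstream.{Predicate, KTable => KTableJ}
import org.apache.kafka.streams.scala.kstream.KTable

// Building the Java Predicate by hand means overload resolution never
// needs the wrapKTable implicit, so the Scala filter cannot re-enter
// itself on Scala 2.11.
def fixedFilter[K, V](inner: KTableJ[K, V])(predicate: (K, V) => Boolean): KTable[K, V] =
  new KTable(inner.filter(new Predicate[K, V] {
    override def test(key: K, value: V): Boolean = predicate(key, value)
  }))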

Cheers

On Mon, Aug 20, 2018 at 5:23 PM Druhin Sagar Goel <dru...@arrcus.com> wrote:

> Thanks a lot Ted!
>
> FYI - the issue is not limited to
> org.apache.kafka.streams.scala.KTable.filter. It also happens with
> org.apache.kafka.streams.scala.KTable.filterNot,
> org.apache.kafka.streams.scala.KStream.foreach and
> org.apache.kafka.streams.scala.KStream.peek.
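>
> For reference, a trivial topology is enough to reproduce it on Scala
> 2.11. A hypothetical minimal repro (topic name made up):
>
> import org.apache.kafka.streams.scala._
> import org.apache.kafka.streams.scala.ImplicitConversions._
> import org.apache.kafka.streams.scala.Serdes._
>
> val builder = new StreamsBuilder
> // Each of the methods above recurses while the topology is being
> // built, before the application even starts:
> builder.stream[String, String]("input-topic")
>   .peek((k, v) => println(s"$k -> $v")) // throws StackOverflowError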
>
> - Druhin
>
>
> On August 20, 2018 at 5:19:36 PM, Matthias J. Sax (matth...@confluent.io) wrote:
>
> Thanks for reporting and for creating the ticket!
>
> -Matthias
>
> On 8/20/18 5:17 PM, Ted Yu wrote:
> > I was able to reproduce what you saw with a modification to
> > StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala.
> > I have logged KAFKA-7316 and am looking for a fix.
> >
> > FYI
> >
> > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com> wrote:
> >
> >> Isn’t that a bug then? Or can I fix my code somehow?
> >>
> >>
> >>
> >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
> >>
> >> I think what happened in your use case was that the following implicit
> >> from ImplicitConversions.scala kept wrapping the resultant KTable from
> >> filter():
> >>
> >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
> >>
> >> leading to stack overflow.
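> >>
> >> To make the mechanism concrete, the Scala wrapper delegates to the
> >> Java KTable roughly like this (a sketch of the 2.0.0 code, from
> >> memory):
> >>
> >> // KTableJ is the Java org.apache.kafka.streams.kstream.KTable
> >> class KTable[K, V](val inner: KTableJ[K, V]) {
> >>   def filter(predicate: (K, V) => Boolean): KTable[K, V] =
> >>     inner.filter(predicate(_, _)) // intends the Java filter(Predicate)
> >> }
> >>
> >> On Scala 2.11 there is no SAM conversion, so the (K, V) => Boolean
> >> argument doesn't match the Java Predicate parameter. The compiler then
> >> applies wrapKTable to inner and selects this same Scala filter again,
> >> which recurses until the stack overflows.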
> >>
> >> Cheers
> >>
> >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I’m using the org.apache.kafka.streams.scala module that was released
> >>> with version 2.0.0. I’m getting a StackOverflowError as follows:
> >>>
> >>> java.lang.StackOverflowError
> >>>   at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>>   at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>>   at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>>   at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>>   ...
> >>>   at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> >>>
> >>> The Scala version I’m using is 2.11.11 and the code leading to the
> >>> error is as follows (the .filter call in particular).
> >>>
> >>> val builder = new StreamsBuilder
> >>>
> >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> >>>
> >>> val customers = args.config.keys
> >>>
> >>> val predicates = customers.map { customerId =>
> >>>   (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> >>> }.toSeq
> >>>
> >>> val customerIdToStream = customers.zip(stream.branch(predicates: _*)).toMap
> >>>
> >>> val y = Printed.toSysOut[Windowed[Key], Long]
> >>>
> >>> customerIdToStream.foreach { case (customerId, customerStream) =>
> >>>   val customerConfig = args.config(customerId)
> >>>   customerStream
> >>>     .flatMap { case (_, message) =>
> >>>       message.objects.map {
> >>>         case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> >>>       }
> >>>     }
> >>>     .groupByKey
> >>>     .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> >>>     .count()
> >>>     .filter { case (_, count) => count >= customerConfig.frequencyThreshold }
> >>>     .toStream
> >>>     .print(y)
> >>> }
> >>>
> >>> Is this a bug in the new Scala module, related to
> >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> >>> Or am I doing something wrong?
> >>>
> >>> Thanks,
> >>> Druhin
> >>>
> >>
> >
>
>
