Thanks for pointing me to that PR.

I applied the PR locally but still got:

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
> testShouldCountClicksPerRegion FAILED
    java.lang.StackOverflowError

I can go over that PR to see what can be referenced for solving this bug.

FYI

On Mon, Aug 20, 2018 at 5:21 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Is this related to the fix https://github.com/apache/kafka/pull/5502 that
> is currently being worked on?
>
>
> Guozhang
>
> On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Thanks for reporting and for creating the ticket!
> >
> > -Matthias
> >
> > On 8/20/18 5:17 PM, Ted Yu wrote:
> > > I was able to reproduce what you saw with modification
> > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> > > I have logged KAFKA-7316 and am looking for a fix.
> > >
> > > FYI
> > >
> > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com>
> > wrote:
> > >
> > >> Isn’t that a bug then? Or can I fix my code somehow?
> > >>
> > >>
> > >>
> > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com<mailto:
> > >> yuzhih...@gmail.com>) wrote:
> > >>
> > >> I think what happened in your use case was that the following implicit
> > >> from ImplicitConversions.scala kept wrapping the resultant KTable from
> > >> filter():
> > >>
> > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
> > >>
> > >> leading to stack overflow.
> > >>
> > >> Cheers
> > >>
> > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com
> >
> > >> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I’m using the org.kafka.streams.scala that was released with version
> > >>> 2.0.0. I’m getting a StackOverflowError as follows:
> > >>>
> > >>> java.lang.StackOverflowError
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>> .
> > >>> .
> > >>> .
> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(
> > KTable.scala:49)
> > >>>
> > >>> The Scala version I’m using is 2.11.11 and the code leading to the
> > error
> > >>> is as follows (particularly the .filter).
> > >>>
> > >>> val builder = new StreamsBuilder
> > >>>
> > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
> > >>>
> > >>> val customers = args.config.keys
> > >>>
> > >>> val predicates = customers.map { customerId =>
> > >>> (_: Array[Byte], message: CaseClassA) => message.customerId ==
> > customerId
> > >>> }.toSeq
> > >>>
> > >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap
> > >>>
> > >>> val y = Printed.toSysOut[Windowed[Key], Long]
> > >>>
> > >>> customerIdToStream.foreach { case (customerId, customerStream) =>
> > >>> val customerConfig = args.config(customerId)
> > >>> customerStream
> > >>> .flatMap { case (_, message) =>
> > >>> message.objects.map {
> > >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
> > >>> }
> > >>> }
> > >>> .groupByKey
> > >>>
> > >>>
> > >> .windowedBy(TimeWindows.of(customerConfig.windowSize).
> > advanceBy(customerConfig.sliderSize))
> > >>> .count()
> > >>> .filter { case (_, count) => count >=
> > >>> customerConfig.frequencyThreshold }
> > >>> .toStream
> > >>> .print(y)
> > >>> }
> > >>>
> > >>> Is this a bug with the new scala module related to:
> > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
> > >>> Or am I doing something wrong?
> > >>>
> > >>> Thanks,
> > >>> Druhin
> > >>>
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>

Reply via email to