Sent out PR #5543, which fixes the reported bug, with
StreamToTableJoinScalaIntegrationTestImplicitSerdes.testShouldCountClicksPerRegion
modified to add the filter methods.
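
For anyone following along, here is a distilled sketch of the recursion
described in my earlier reply quoted below. JavaTable, ScalaTable, wrapTable
and Predicate are made-up stand-ins for illustration, not the actual
KTable.scala or ImplicitConversions.scala code. The way I read it, on Scala
2.11 there is no SAM conversion, so the Scala lambda cannot be handed straight
to the Java-style filter; the compiler falls back to the implicit wrapper
instead, and the call resolves to the Scala filter itself. One possible way to
break the cycle is sketched at the bottom of this message, below the quoted
thread.

import scala.language.implicitConversions

object WrapRecursionSketch {
  // Stand-in for the Java API: filter takes a Java-style Predicate, not a Scala function.
  trait Predicate[K, V] { def test(key: K, value: V): Boolean }
  class JavaTable[K, V] { def filter(p: Predicate[K, V]): JavaTable[K, V] = this }

  // Stand-in for the Scala wrapper around the Java table.
  class ScalaTable[K, V](val inner: JavaTable[K, V]) {
    def filter(p: (K, V) => Boolean): ScalaTable[K, V] =
      // Intended to call the Java filter, but p is not a Predicate, so the
      // compiler inserts wrapTable(inner) here and this line calls this very
      // method again: infinite recursion, hence the StackOverflowError.
      inner.filter(p)
  }

  // Analogue of wrapKTable in ImplicitConversions.scala.
  implicit def wrapTable[K, V](inner: JavaTable[K, V]): ScalaTable[K, V] =
    new ScalaTable(inner)

  def main(args: Array[String]): Unit = {
    val table = new ScalaTable[String, Long](new JavaTable[String, Long])
    table.filter((_, count) => count > 0) // blows the stack, like the trace quoted below
  }
}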

FYI

On Mon, Aug 20, 2018 at 5:26 PM Ted Yu <yuzhih...@gmail.com> wrote:

> Thanks for pointing me to that PR.
>
> I applied the PR locally but still got:
>
> org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
> > testShouldCountClicksPerRegion FAILED
>     java.lang.StackOverflowError
>
> I can go over that PR to see what can be referenced for solving this bug.
>
> FYI
>
> On Mon, Aug 20, 2018 at 5:21 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Is this related to the fix https://github.com/apache/kafka/pull/5502 that
>> is currently being worked on?
>>
>>
>> Guozhang
>>
>> On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>> > Thanks for reporting and for creating the ticket!
>> >
>> > -Matthias
>> >
>> > On 8/20/18 5:17 PM, Ted Yu wrote:
>> > > I was able to reproduce what you saw with a modification
>> > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala.
>> > > I have logged KAFKA-7316 and am looking for a fix.
>> > >
>> > > FYI
>> > >
>> > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com>
>> > > wrote:
>> > >
>> > >> Isn’t that a bug then? Or can I fix my code somehow?
>> > >>
>> > >>
>> > >>
>> > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com) wrote:
>> > >>
>> > >> I think what happened in your use case was that the following implicit
>> > >> from ImplicitConversions.scala kept wrapping the resultant KTable from
>> > >> filter():
>> > >>
>> > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
>> > >>
>> > >> leading to stack overflow.
>> > >>
>> > >> Cheers
>> > >>
>> > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com>
>> > >> wrote:
>> > >>
>> > >>> Hi,
>> > >>>
>> > >>> I’m using the org.apache.kafka.streams.scala package that was released
>> > >>> with version 2.0.0. I’m getting a StackOverflowError as follows:
>> > >>>
>> > >>> java.lang.StackOverflowError
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>> > >>> .
>> > >>> .
>> > >>> .
>> > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
>> > >>>
>> > >>> The Scala version I’m using is 2.11.11 and the code leading to the
>> > >>> error is as follows (particularly the .filter).
>> > >>>
>> > >>> val builder = new StreamsBuilder
>> > >>>
>> > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic)
>> > >>>
>> > >>> val customers = args.config.keys
>> > >>>
>> > >>> val predicates = customers.map { customerId =>
>> > >>>   (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
>> > >>> }.toSeq
>> > >>>
>> > >>> val customerIdToStream = customers.zip(stream.branch(predicates: _*)).toMap
>> > >>>
>> > >>> val y = Printed.toSysOut[Windowed[Key], Long]
>> > >>>
>> > >>> customerIdToStream.foreach { case (customerId, customerStream) =>
>> > >>>   val customerConfig = args.config(customerId)
>> > >>>   customerStream
>> > >>>     .flatMap { case (_, message) =>
>> > >>>       message.objects.map {
>> > >>>         case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
>> > >>>       }
>> > >>>     }
>> > >>>     .groupByKey
>> > >>>     .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
>> > >>>     .count()
>> > >>>     .filter { case (_, count) => count >= customerConfig.frequencyThreshold }
>> > >>>     .toStream
>> > >>>     .print(y)
>> > >>> }
>> > >>>
>> > >>> Is this a bug in the new Scala module, related to
>> > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ?
>> > >>> Or am I doing something wrong?
>> > >>>
>> > >>> Thanks,
>> > >>> Druhin
>> > >>>
>> > >>
>> > >
>> >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
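
P.S. Using the stand-in types from the sketch near the top of this message,
one possible way to break the recursion (just an illustration; not necessarily
the shape of the actual change in PR #5543) is to construct the Java-style
Predicate explicitly, so that no implicit wrapping is involved:

  // Added to the ScalaTable stand-in: build the Predicate by hand so that
  // inner.filter binds to the Java-style signature and never recurses.
  def filterExplicit(p: (K, V) => Boolean): ScalaTable[K, V] =
    new ScalaTable(inner.filter(new Predicate[K, V] {
      def test(key: K, value: V): Boolean = p(key, value)
    }))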
