Hi Eric,

Ah, that’s a bummer. The correct serde is the session windowed serde, as I can 
see you know. I’m afraid I’m a bit rusty on implicit resolution rules, so I 
can’t be much help there. 

But my general recommendation for implicits is that when things get weird, just 
don’t use them at all. For example, you can just explicitly pass the Produced 
in the second arg list of ‘to’. 
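
Here is a rough sketch of what I mean, using your topology. It assumes the 2.6-era kafka-streams-scala package layout (org.apache.kafka.streams.scala.Serdes), so adjust the imports for your version. The object name and the val names are made up for illustration.

```scala
import java.time.Duration

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{SessionWindows, Windowed, WindowedSerdes}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.Produced

// Hypothetical wrapper object, only here to make the sketch self-contained.
object ExplicitProducedSketch {
  def createTopology(): Topology = {
    // Build the session-windowed key serde by hand, so the compiler never has
    // to choose between timeWindowedSerde and sessionWindowedSerde.
    val sessionKeySerde: Serde[Windowed[String]] =
      new WindowedSerdes.SessionWindowedSerde[String](Serdes.String)

    // Pass both serdes explicitly; note that this val is NOT implicit.
    val produced =
      Produced.`with`[Windowed[String], Long](sessionKeySerde, Serdes.Long)

    val builder = new StreamsBuilder()
    builder
      .stream[String, String]("streams-plaintext-input")
      .groupBy((_, word) => word)
      .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
      .count()
      .toStream
      // Second argument list of 'to': hand over the Produced directly.
      .to("streams-pipe-output")(produced)

    builder.build()
  }
}
```

The other implicits (Consumed, Grouped, Materialized) only need plain String and Long serdes, so they should still resolve without any ambiguity.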

One other tip is that the serialized form produced by those serdes is kind of 
specialized and might not be the most convenient for your use. If this is just 
a POC, I'd suggest mapping the keys to strings, so they are human-readable. If 
this is a production use case, then you might want to use a more 
self-documenting format like JSON or Avro. Just my two cents. 
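
For the POC route, a sketch along these lines might do. The key format (key@start-end, in epoch millis) and the names are just my invention, and the imports again assume the 2.6-era Scala package layout. Since the key is a plain String by the time it reaches 'to', the windowed serde is not needed at all.

```scala
import java.time.Duration

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

// Hypothetical wrapper object, only here to make the sketch self-contained.
object StringKeySketch {
  // Made-up readable format: <key>@<sessionStartMs>-<sessionEndMs>
  def readableKey(windowed: Windowed[String]): String =
    s"${windowed.key()}@${windowed.window().start()}-${windowed.window().end()}"

  def createTopology(): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream[String, String]("streams-plaintext-input")
      .groupBy((_, word) => word)
      .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
      .count()
      .toStream
      // Replace the Windowed[String] key with a plain String before writing.
      .map((windowedKey, count) => (readableKey(windowedKey), count))
      // Produced[String, Long] resolves implicitly from the String and Long serdes.
      .to("streams-pipe-output")

    builder.build()
  }
}
```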

I hope this helps!
-John 

On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> I keep getting '*ambiguous implicit values*' message in the following code.
> I tried several things (as can be seen from a couple of lines I've
> commented out). Any ideas on how to fix this? This is in *Scala*.
> 
>  def createTopology(conf: Config, properties: Properties): Topology = {
> //    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
> //    implicit val produced: Produced[Windowed[String], Long] =
> //      Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
>     implicit val produced: Produced[Windowed[String], Long] =
>       Produced.`with`[Windowed[String], Long]
>     implicit val consumed: Consumed[String, String] =
>       Consumed.`with`[String, String]
> 
>     val builder: StreamsBuilder = new StreamsBuilder()
>     builder.stream("streams-plaintext-input")
>         .groupBy((_, word) => word)
>         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>         .count()
>         .toStream.to("streams-pipe-output")
> 
>     builder.build()
> 
>   }
> 
> *Compiler Errors:*
> 
> Error:(52, 78) ambiguous implicit values:
>  both method timeWindowedSerde in object Serdes of type [T](implicit
> tSerde: 
> org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
>  and method sessionWindowedSerde in object Serdes of type [T](implicit
> tSerde: 
> org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
>  match expected type
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
>     implicit val produced: Produced[Windowed[String], Long] =
> Produced.`with`[Windowed[String], Long]
> Error:(52, 78) could not find implicit value for parameter keySerde:
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
>     implicit val produced: Produced[Windowed[String], Long] =
> Produced.`with`[Windowed[String], Long]
> Error:(52, 78) not enough arguments for method with: (implicit
> keySerde: 
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]],
> implicit valueSerde:
> org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].Unspecified
> value parameters keySerde, valueSerde.
>     implicit val produced: Produced[Windowed[String], Long] =
> Produced.`with`[Windowed[String], Long]
>
