I'm following this example: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
because I'm building something very similar to the GroupIntoBatches
functionality. If I don't set the coder manually, this exception arises:
https://pastebin.com/xxdDMXSf

Thanks!

On Fri, Jan 19, 2018 at 4:35 PM Neville Li <[email protected]> wrote:

> You shouldn't manually set the coder in most cases. It defaults to
> KryoAtomicCoder for most Scala types.
> More details:
> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>
> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <[email protected]> wrote:
>
>> Could it be because I'm using
>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes]))) at
>> some point in the pipeline
>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>> outputs a SerializableCoder)?
>>
>> This is something I've always wondered. How does one specify a coder for
>> a case class?
>>
>> Regards
>>
>> On Fri, 19 Jan 2018 at 15:51, Neville Li <[email protected]> wrote:
>>
>>> Not sure why it falls back to SerializableCoder. Can you file a GH
>>> issue, ideally with a snippet that can reproduce the problem?
>>>
>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <[email protected]>
>>> wrote:
>>>
>>>> Hi everyone!!
>>>>
>>>> I'm building a pipeline to store items from a Google PubSub
>>>> subscription into GCS buckets. To do this I'm using both stateful
>>>> and timely processing. After building and testing the project locally I
>>>> tried to run it on Google Dataflow and I started getting these errors.
>>>>
>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>
>>>> The item I'm trying to serialize is a KV[String, MessageWithAttributes]
>>>> and MessageWithAttributes is a case class defined as (content: String,
>>>> attrs: Map[String, String])
>>>>
>>>> The underlying cause is java.io.NotSerializableException:
>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's Scio as
>>>> well), which may suggest that the issue is in serializing the Map, but to be
>>>> honest, I don't know what it means or how to fix it.
>>>>
>>>> Can anyone help me, please?
>>>> Thanks!
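
For anyone reading this thread later, here is a minimal sketch of what the NotSerializableException quoted above seems to point at. It uses plain Java with no Beam or Scio dependency. Beam's SerializableCoder relies on Java serialization, so a value whose Map field is backed by a non-Serializable wrapper class cannot be encoded, while the same entries copied into a HashMap can. The names SerializationSketch, Message and nonSerializableView are hypothetical stand-ins invented for illustration. They are not Scio's JMapWrapper or the real MessageWithAttributes, and this is not a confirmed diagnosis of the pipeline in question.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class SerializationSketch {

    // Hypothetical stand-in for a (content, attrs) message type.
    static class Message implements Serializable {
        final String content;
        final Map<String, String> attrs;

        Message(String content, Map<String, String> attrs) {
            this.content = content;
            this.attrs = attrs;
        }
    }

    // Returns a Map view whose class does not implement Serializable
    // (AbstractMap itself is not Serializable). Assumed here to be loosely
    // analogous to a wrapper class around another collection.
    static Map<String, String> nonSerializableView(Map<String, String> backing) {
        return new AbstractMap<String, String>() {
            @Override
            public Set<Map.Entry<String, String>> entrySet() {
                return backing.entrySet();
            }
        };
    }

    // Tries plain Java serialization and reports whether it succeeded.
    static boolean canJavaSerialize(Object value) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("source", "pubsub");

        Message wrapped = new Message("payload", nonSerializableView(attrs));
        // Copying the entries into a HashMap yields a Serializable map.
        Message copied = new Message("payload", new HashMap<>(wrapped.attrs));

        System.out.println("wrapped map serializes: " + canJavaSerialize(wrapped));
        System.out.println("copied map serializes:  " + canJavaSerialize(copied));
    }
}
```

If the sketch reflects the real cause, two directions seem worth trying: avoid forcing a SerializableCoder and let the default KryoAtomicCoder mentioned above apply, or make sure the attrs map is a plain Serializable collection before it reaches the coder.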
