In this case it's probably easiest to convert the Scala `Map[K, V]` into a `java.util.Map<K, V>` and explicitly set a `MapCoder<K, V>`, so you don't have to deal with internal coder inference.
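
A minimal sketch of what I mean, assuming the case class from your message and string keys and values. The names `MapCoderSketch`, `Encoded`, `toEncodable` and `roundTrip` are made up for illustration, and representing the message as a nested `KV` is just one possible Beam-friendly shape, not something Scio prescribes:

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

import org.apache.beam.sdk.coders.{Coder, KvCoder, MapCoder, StringUtf8Coder}
import org.apache.beam.sdk.util.CoderUtils
import org.apache.beam.sdk.values.KV

object MapCoderSketch {
  // Case class as described in the thread.
  case class MessageWithAttributes(content: String, attrs: Map[String, String])

  // Hypothetical Beam-friendly shape: key -> (content, attributes as java.util.Map).
  type Encoded = KV[String, KV[String, JMap[String, String]]]

  // Explicit coder for the attribute map, so no coder inference is involved.
  val attrsCoder: Coder[JMap[String, String]] =
    MapCoder.of[String, String](StringUtf8Coder.of(), StringUtf8Coder.of())

  // Coder for the whole element, built only from standard Beam coders.
  val coder: Coder[Encoded] =
    KvCoder.of[String, KV[String, JMap[String, String]]](
      StringUtf8Coder.of(),
      KvCoder.of[String, JMap[String, String]](StringUtf8Coder.of(), attrsCoder))

  // Copy the Scala Map into a plain java.util.HashMap instead of wrapping it.
  def toEncodable(key: String, msg: MessageWithAttributes): Encoded = {
    val attrs = new JHashMap[String, String]()
    msg.attrs.foreach { case (k, v) => attrs.put(k, v) }
    KV.of[String, KV[String, JMap[String, String]]](
      key, KV.of[String, JMap[String, String]](msg.content, attrs))
  }

  // Encode and decode once to check that the coder handles the element.
  def roundTrip(in: Encoded): Encoded =
    CoderUtils.decodeFromByteArray(coder, CoderUtils.encodeToByteArray(coder, in))

  def main(args: Array[String]): Unit = {
    val in = toEncodable("key", MessageWithAttributes("hello", Map("a" -> "1")))
    val out = roundTrip(in)
    println(out.getValue.getValue.get("a"))
  }
}
```

You would then map your elements with something like `toEncodable` before the stateful transform and pass `coder` to the `setCoder` call you already have, instead of the coder obtained from `CoderRegistry`.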

On Fri, Jan 19, 2018 at 11:03 AM Neville Li <[email protected]> wrote:

> That happens when you mix Beam transforms into Scio, which defeats the
> safety we have in place. Map the values into something Beam-serializable
> first, or rewrite the transform with a Scio built-in, which takes care of
> KvCoder.
>
> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <[email protected]> wrote:
>
>> I'm following this example:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>
>> because I'm building something very similar to the group-into-batches
>> functionality. If I don't set the coder manually, this exception arises:
>> https://pastebin.com/xxdDMXSf
>>
>> Thanks!
>>
>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <[email protected]> wrote:
>>
>>> You shouldn't manually set the coder in most cases. It defaults to
>>> KryoAtomicCoder for most Scala types.
>>> More details:
>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>
>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <[email protected]>
>>> wrote:
>>>
>>>> Could it be because I'm using
>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>> at some point in the pipeline
>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>> outputs a SerializableCoder)?
>>>>
>>>> This is something I've always wondered. How does one specify a coder
>>>> for a case class?
>>>>
>>>> Regards
>>>>
>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <[email protected]> wrote:
>>>>
>>>>> Not sure why it falls back to SerializableCoder. Can you file a GH
>>>>> issue, ideally with a snippet that can reproduce the problem?
>>>>>
>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone!!
>>>>>>
>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>> subscription into GCS buckets. In order to do it I'm using both
>>>>>> stateful and timely processing, and after building and testing the
>>>>>> project locally I tried to run it on Google Dataflow and started
>>>>>> getting those errors.
>>>>>>
>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>
>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>> MessageWithAttributes], and MessageWithAttributes is a case class
>>>>>> defined as (content: String, attrs: Map[String, String])
>>>>>>
>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>> Scio as well), which may suggest that the issue is in serializing
>>>>>> the Map, but to be honest, I don't know what it means or how to fix
>>>>>> it.
>>>>>>
>>>>>> Can anyone help me, please?
>>>>>> Thanks!
