Welcome. Added an issue so we may improve this in the future: https://github.com/spotify/scio/issues/1020
On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso wrote:

> To build the Beam transform I was following this example:
> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>
> To be honest, I don't know how to apply timely and stateful processing
> without using a Beam transform, or how to rewrite it using the Scio built-in
> you suggest. Could you please give me an example?
>
> Thanks for your help!
>
> On Fri, Jan 19, 2018 at 5:04 PM Neville Li wrote:
>
>> That happens when you mix Beam transforms into Scio, which defeats the
>> safety checks we have in place. Map the values into something
>> Beam-serializable first, or rewrite the transform with a Scio built-in,
>> which takes care of KvCoder.
>>
>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso wrote:
>>
>>> I'm following this example:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>
>>> because I'm building something very similar to a group-into-batches
>>> functionality. If I don't set the coder manually, this exception arises:
>>> https://pastebin.com/xxdDMXSf
>>>
>>> Thanks!
>>>
>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li wrote:
>>>
>>>> You shouldn't need to set the coder manually in most cases. It defaults
>>>> to KryoAtomicCoder for most Scala types.
>>>> More details:
>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>
>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso wrote:
>>>>
>>>>> Could it be because I'm using
>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(), CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>> at some point in the pipeline?
>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>> returns a SerializableCoder.)
>>>>>
>>>>> This is something I've always wondered: how does one specify a coder
>>>>> for a case class?
>>>>>
>>>>> Regards
>>>>>
>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li wrote:
>>>>>
>>>>>> Not sure why it falls back to SerializableCoder. Can you file a GH
>>>>>> issue, ideally with a snippet that reproduces the problem?
>>>>>>
>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso wrote:
>>>>>>
>>>>>>> Hi everyone!!
>>>>>>>
>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>> subscription into GCS buckets. To do it I'm using both stateful and
>>>>>>> timely processing, and after building and testing the project locally
>>>>>>> I tried to run it on Google Dataflow and started getting those errors.
>>>>>>>
>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>
>>>>>>> The item I'm trying to serialize is a KV[String, MessageWithAttributes],
>>>>>>> and MessageWithAttributes is a case class defined as
>>>>>>> (content: String, attrs: Map[String, String]).
>>>>>>>
>>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>> Scio as well), which may suggest that the issue is with serializing
>>>>>>> the Map, but to be honest, I don't know what it means or how to fix it.
>>>>>>>
>>>>>>> Can anyone help me, please?
>>>>>>> Thanks!
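For the "map the values into something Beam-serializable first" advice above, here is a minimal, self-contained sketch (not Scio's or Beam's own API) that reproduces the failure mode with plain Java serialization, which is the mechanism Beam's SerializableCoder relies on, and shows the fix of copying the attributes into a plain Scala Map. AttributeView and WrappedMessage are hypothetical stand-ins for a non-Serializable wrapper such as com.spotify.scio.util.JMapWrapper$$anon$2, and the sketch assumes the attributes arrive as a java.util.Map.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Same shape as the case class described in the thread.
case class MessageWithAttributes(content: String, attrs: Map[String, String])

// Hypothetical stand-in for a wrapper like JMapWrapper$$anon$2:
// it holds the attributes but does NOT implement java.io.Serializable.
final class AttributeView(val underlying: java.util.Map[String, String])

// Hypothetical message type that keeps the non-serializable wrapper.
case class WrappedMessage(content: String, attrs: AttributeView)

object CoderSketch {
  // Serialize and deserialize with plain Java serialization,
  // the mechanism Beam's SerializableCoder is built on.
  def javaRoundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Copy every entry into a fresh, standard immutable Scala Map,
  // which is Java-serializable (unlike the hypothetical wrapper).
  def toPlainMap(javaMap: java.util.Map[String, String]): Map[String, String] = {
    val builder = Map.newBuilder[String, String]
    val it = javaMap.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      builder += (entry.getKey -> entry.getValue)
    }
    builder.result()
  }

  def main(args: Array[String]): Unit = {
    val attrs = new java.util.HashMap[String, String]()
    attrs.put("source", "pubsub") // hypothetical attribute

    // Keeping the wrapper: writeObject throws NotSerializableException.
    val wrapperFails =
      try { javaRoundTrip(WrappedMessage("payload", new AttributeView(attrs))); false }
      catch { case _: NotSerializableException => true }
    println(s"wrapper throws NotSerializableException: $wrapperFails")

    // Copying into a plain Map first: the value survives the round trip.
    val msg = MessageWithAttributes("payload", toPlainMap(attrs))
    println(s"plain Map survives the round trip: ${javaRoundTrip(msg) == msg}")
  }
}
```

The copy is done entry by entry on purpose: calling .toMap on a value that is already typed as an immutable Scala Map may simply return the same instance, so it would not necessarily get rid of a non-serializable wrapper.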
