Here's a fix for #1020: https://github.com/spotify/scio/pull/1032

On Sun, Jan 21, 2018 at 4:36 PM Neville Li <[email protected]> wrote:

> Awesome!
> We haven't wrapped any stateful processing API in Scala, but if you have a
> working snippet or ideas it'd be great to share in that ticket.
>
> On Sat, Jan 20, 2018 at 4:31 PM Carlos Alonso <[email protected]>
> wrote:
>
>> Thanks Neville!!
>>
>> Your recommendation worked great. Thanks for your help!!
>>
>> As a side note, I found this issue:
>> https://github.com/spotify/scio/issues/448
>>
>> I can share/help there with our experience, as our job, with scio +
>> stateful + timely processing, is working fine as of today.
>>
>> Regards!!
>>
>> On Fri, Jan 19, 2018 at 6:21 PM Neville Li <[email protected]> wrote:
>>
>>> Welcome.
>>>
>>> Added an issue so we may improve this in the future:
>>> https://github.com/spotify/scio/issues/1020
>>>
>>>
>>> On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso <[email protected]>
>>> wrote:
>>>
>>>> To build the beam transform I was following this example:
>>>> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>>>>
>>>> To be honest I don't know how to apply timely and stateful processing
>>>> without using a beam transform or how to rewrite it using the scio built-in
>>>> you suggest. Could you please give me an example?
>>>>
>>>> Thanks for your help!
>>>>
>>>> On Fri, Jan 19, 2018 at 5:04 PM Neville Li <[email protected]>
>>>> wrote:
>>>>
>>>>> That happens when you mix beam transforms into scio, which defeats the
>>>>> safety we have in place. Map the values into something beam-serializable
>>>>> first or rewrite the transform with a scio built-in which takes care of
>>>>> KvCoder.
>>>>>
>>>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I'm following this example:
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>>>
>>>>>> because I'm building something very similar to a group into batches
>>>>>> functionality. If I don't set the coder manually, this exception arises:
>>>>>> https://pastebin.com/xxdDMXSf
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> You shouldn't manually set coder in most cases. It defaults to
>>>>>>> KryoAtomicCoder for most Scala types.
>>>>>>> More details:
>>>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>>>
>>>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Could it be because I'm using
>>>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>>>>> at some point in the pipeline
>>>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>>>> outputs a SerializableCoder)?
>>>>>>>>
>>>>>>>> This is something I've always wondered. How does one specify a
>>>>>>>> coder for a case class?
>>>>>>>>
>>>>>>>> Regards
>>>>>>>>
>>>>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Not sure why it falls back to SerializableCoder. Can you file a
>>>>>>>>> GH issue with ideally a snippet that can reproduce the problem?
>>>>>>>>>
>>>>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone!!
>>>>>>>>>>
>>>>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>>>>> subscription into GCS buckets. In order to do it I'm using both
>>>>>>>>>> stateful and timely processing, and after building and testing the
>>>>>>>>>> project locally I tried to run it on Google Dataflow and started
>>>>>>>>>> getting those errors.
>>>>>>>>>>
>>>>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>>>>
>>>>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>>>>> MessageWithAttributes] and MessageWithAttributes is a case class
>>>>>>>>>> defined as (content: String, attrs: Map[String, String])
>>>>>>>>>>
>>>>>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>>>>> Scio as well), which may suggest that the issue is in serializing
>>>>>>>>>> the Map, but to be honest, I don't know what it means or how to
>>>>>>>>>> fix it.
>>>>>>>>>>
>>>>>>>>>> Can anyone help me, please?
>>>>>>>>>> Thanks!
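
For anyone landing on this thread later: the NotSerializableException reported in the original message is what Java serialization (which SerializableCoder relies on) raises when any field of a value is not Serializable, even if the enclosing case class is. Below is a minimal, library-free sketch of that failure and of the "map the values into something serializable first" advice. It uses no Scio or Beam APIs. The names `AttrsView`, `Message` and `SerializationCheck` are hypothetical and invented for illustration; `AttrsView` only stands in for a lazy map wrapper and is not Scio's actual JMapWrapper.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a lazy map wrapper (NOT Scio's JMapWrapper).
// It deliberately does not extend Serializable.
final class AttrsView(underlying: java.util.Map[String, String]) {
  // Copy every entry into a strict immutable Map, which is Java-serializable.
  def toStrictMap: Map[String, String] = {
    val builder = Map.newBuilder[String, String]
    val it = underlying.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      builder += (e.getKey -> e.getValue)
    }
    builder.result()
  }
}

// Case classes are Serializable, but Java serialization still walks each field.
final case class Message[A](content: String, attrs: A)

object SerializationCheck {
  // True if plain Java serialization of `value` succeeds.
  def javaSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val jmap = new java.util.HashMap[String, String]()
    jmap.put("k", "v")
    val view = new AttrsView(jmap)
    // The wrapper field makes the whole value fail to serialize.
    println(javaSerializable(Message("payload", view)))
    // Copying into a strict Map first makes it serializable.
    println(javaSerializable(Message("payload", view.toStrictMap)))
  }
}
```

In a real pipeline the equivalent step would presumably be a plain map over the values that copies the attributes into a strict Map before handing the collection to the Beam transform; that mapping is an assumption here, not something confirmed in the thread.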
