I've added a comment with a link to our working stateful and timely processing solution: https://github.com/spotify/scio/issues/448#issuecomment-364705100
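For readers who land here without following the link: a stateful + timely DoFn driven from Scio takes roughly the following shape. This is a hedged, untested sketch built on the Beam Java state/timer API; the class name, state/timer names, and the one-minute flush delay are all illustrative, not taken from the linked comment.

```scala
import java.lang.{Integer => JInt}

import org.apache.beam.sdk.state.{StateSpec, StateSpecs, TimeDomain, Timer, TimerSpec, TimerSpecs, ValueState}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn._
import org.apache.beam.sdk.values.KV
import org.joda.time.Duration

// Illustrative stateful + timely DoFn: counts elements per key and flushes
// on a processing-time timer. Apply it to a KV-typed PCollection, e.g. via
// SCollection#applyTransform(ParDo.of(new CountThenFlushDoFn)).
class CountThenFlushDoFn extends DoFn[KV[String, String], String] {

  // Per-key mutable state; the coder for JInt is inferred by Beam.
  @StateId("count")
  private val countSpec: StateSpec[ValueState[JInt]] = StateSpecs.value[JInt]()

  // Per-key processing-time timer used to flush buffered state.
  @TimerId("flush")
  private val flushSpec: TimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME)

  @ProcessElement
  def processElement(
    context: ProcessContext,
    @StateId("count") count: ValueState[JInt],
    @TimerId("flush") flush: Timer
  ): Unit = {
    val n = Option(count.read()).map(_.intValue).getOrElse(0) + 1
    count.write(n)
    // (Re)arm the timer: fire at most one minute from now.
    flush.offset(Duration.standardMinutes(1)).setRelative()
  }

  @OnTimer("flush")
  def onFlush(
    context: OnTimerContext,
    @StateId("count") count: ValueState[JInt]
  ): Unit = {
    context.output(s"flushed ${count.read()} elements")
    count.clear()
  }
}
```

Note that state and timers require a keyed input (`KV`), which is why the coder questions below about `KvCoder` come up at all.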
On Fri, Jan 26, 2018 at 1:43 AM Neville Li <[email protected]> wrote:

> Here's a fix to #1020
> https://github.com/spotify/scio/pull/1032
>
> On Sun, Jan 21, 2018 at 4:36 PM Neville Li <[email protected]> wrote:
>
>> Awesome!
>> We haven't wrapped any stateful processing API in Scala, but if you have a
>> working snippet or ideas it'd be great to share them in that ticket.
>>
>> On Sat, Jan 20, 2018 at 4:31 PM Carlos Alonso <[email protected]>
>> wrote:
>>
>>> Thanks Neville!!
>>>
>>> Your recommendation worked great. Thanks for your help!!
>>>
>>> As a side note, I found this issue:
>>> https://github.com/spotify/scio/issues/448
>>>
>>> I can share/help there with our experience, as our job with scio +
>>> stateful + timely processing is working fine as of today.
>>>
>>> Regards!!
>>>
>>> On Fri, Jan 19, 2018 at 6:21 PM Neville Li <[email protected]>
>>> wrote:
>>>
>>>> You're welcome.
>>>>
>>>> Added an issue so we may improve this in the future:
>>>> https://github.com/spotify/scio/issues/1020
>>>>
>>>> On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso <[email protected]>
>>>> wrote:
>>>>
>>>>> To build the Beam transform I was following this example:
>>>>> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>>>>>
>>>>> To be honest, I don't know how to apply timely and stateful processing
>>>>> without using a Beam transform, or how to rewrite it using the Scio
>>>>> built-in you suggest. Could you please give me an example?
>>>>>
>>>>> Thanks for your help!
>>>>>
>>>>> On Fri, Jan 19, 2018 at 5:04 PM Neville Li <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> That happens when you mix Beam transforms into Scio and defeats the
>>>>>> safety we have in place. Map the values into something Beam-serializable
>>>>>> first, or rewrite the transform with a Scio built-in which takes care of
>>>>>> KvCoder.
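The "map the values into something Beam-serializable first" advice above can be sketched roughly as follows. This is illustrative only: `messages` and `MyStatefulDoFn` are hypothetical names, and `MessageWithAttributes` is the case class described later in the thread.

```scala
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV

// Sketch: before handing an SCollection to a raw Beam stateful transform,
// rebuild each value so it contains only plain, serializable Scala types.
// messages: SCollection[(String, MessageWithAttributes)] (hypothetical)
val safe = messages.map { case (key, msg) =>
  // Copy any lazily wrapped Java collections (e.g. the Pub/Sub attribute
  // map) into a concrete immutable Scala Map before crossing into Beam.
  KV.of(key, msg.copy(attrs = Map(msg.attrs.toSeq: _*)))
}

// MyStatefulDoFn is a hypothetical DoFn[KV[String, MessageWithAttributes], _].
safe.applyTransform(ParDo.of(new MyStatefulDoFn))
```

The alternative Neville mentions, rewriting with a Scio built-in, avoids the manual `KvCoder` bookkeeping entirely when one exists for your use case.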
>>>>>>
>>>>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm following this example:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>>>>
>>>>>>> because I'm building something very similar to a group-into-batches
>>>>>>> functionality. If I don't set the coder manually, this exception arises:
>>>>>>> https://pastebin.com/xxdDMXSf
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You shouldn't manually set a coder in most cases. It defaults to
>>>>>>>> KryoAtomicCoder for most Scala types.
>>>>>>>> More details:
>>>>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>>>>
>>>>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Might it be because I'm using
>>>>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>>>>>> at some point in the pipeline
>>>>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>>>>> outputs a SerializableCoder)?
>>>>>>>>>
>>>>>>>>> This is something I've always wondered: how does one specify a
>>>>>>>>> coder for a case class?
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>>
>>>>>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Not sure why it falls back to SerializableCoder. Can you file a
>>>>>>>>>> GH issue with, ideally, a snippet that can reproduce the problem?
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi everyone!!
>>>>>>>>>>>
>>>>>>>>>>> I'm building a pipeline to store items from a Google Pub/Sub
>>>>>>>>>>> subscription into GCS buckets.
>>>>>>>>>>> In order to do it I'm using both stateful
>>>>>>>>>>> and timely processing, and after building and testing the project
>>>>>>>>>>> locally I tried to run it on Google Dataflow and I started getting
>>>>>>>>>>> these errors.
>>>>>>>>>>>
>>>>>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>>>>>
>>>>>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>>>>>> MessageWithAttributes], and MessageWithAttributes is a case class
>>>>>>>>>>> defined as (content: String, attrs: Map[String, String]).
>>>>>>>>>>>
>>>>>>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>>>>>> Scio as well), which may suggest that the issue is in serializing
>>>>>>>>>>> the Map, but to be honest, I don't know what it means or how to
>>>>>>>>>>> fix it.
>>>>>>>>>>>
>>>>>>>>>>> Can anyone help me, please?
>>>>>>>>>>> Thanks!
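A plausible reading of that NotSerializableException, consistent with the advice earlier in the thread: the `attrs` field holds a lazy wrapper (`JMapWrapper`) around the Java attribute map from the Pub/Sub client, and that wrapper is not `java.io.Serializable`, so `SerializableCoder` chokes on it. A minimal sketch of the workaround, assuming that reading; the helper name is illustrative:

```scala
// The case class as described in the thread.
case class MessageWithAttributes(content: String, attrs: Map[String, String])

// Copy the (possibly JMapWrapper-backed) attrs into a concrete
// scala.collection.immutable.Map, which is Serializable, before the
// element reaches the coder or a stateful transform.
def materialize(msg: MessageWithAttributes): MessageWithAttributes =
  msg.copy(attrs = Map(msg.attrs.toSeq: _*))
```

Applying `materialize` in a plain `map` step right after reading from Pub/Sub, before any `.setCoder(...)` or raw Beam `ParDo`, would be the natural place for it.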
