I've added a comment with a link to our working stateful and timely
processing solution:
https://github.com/spotify/scio/issues/448#issuecomment-364705100
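
For anyone who lands here with the same java.io.NotSerializableException on
com.spotify.scio.util.JMapWrapper$$anon$2, here is a minimal sketch of
Neville's first suggestion: copy the attributes into a plain immutable Scala
Map, which is java.io.Serializable, before the values reach a raw Beam
transform. It illustrates the idea under stated assumptions and is not the
exact code from our job. toScalaMap and isJavaSerializable are hypothetical
helper names, and isJavaSerializable only mimics the plain Java serialization
that Beam's SerializableCoder relies on.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Same shape as the case class from the original question.
final case class MessageWithAttributes(content: String, attrs: Map[String, String])

object BeamSerializableAttrs {

  // Hypothetical helper: eagerly copies a java.util.Map into an immutable
  // Scala Map instead of keeping a lazy wrapper around the Java map.
  def toScalaMap(jmap: java.util.Map[String, String]): Map[String, String] = {
    val builder = Map.newBuilder[String, String]
    val it = jmap.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      builder += (entry.getKey -> entry.getValue)
    }
    builder.result()
  }

  // Hypothetical helper: tries plain Java serialization, the mechanism
  // SerializableCoder uses, and reports whether it succeeded.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val jattrs = new java.util.HashMap[String, String]()
    jattrs.put("eventType", "click")
    val msg = MessageWithAttributes("payload", toScalaMap(jattrs))
    println(isJavaSerializable(msg))        // true
    println(isJavaSerializable(new Object)) // false: Object is not Serializable
  }
}
```

In the pipeline, the conversion would presumably go in a map step ahead of the
Beam DoFn, so the KV values handed to a SerializableCoder no longer carry the
wrapper.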

On Fri, Jan 26, 2018 at 1:43 AM Neville Li <[email protected]> wrote:

> Here's a fix to #1020
> https://github.com/spotify/scio/pull/1032
>
> On Sun, Jan 21, 2018 at 4:36 PM Neville Li <[email protected]> wrote:
>
>> Awesome!
>> We haven't wrapped any stateful processing API in Scala, but if you have a
>> working snippet or ideas, it'd be great to share them in that ticket.
>>
>> On Sat, Jan 20, 2018 at 4:31 PM Carlos Alonso <[email protected]>
>> wrote:
>>
>>> Thanks Neville!!
>>>
>>> Your recommendation worked great. Thanks for your help!!
>>>
>>> As a side note, I found this issue:
>>> https://github.com/spotify/scio/issues/448
>>>
>>> I can help there by sharing our experience, as our job using Scio +
>>> stateful + timely processing is working fine as of today.
>>>
>>> Regards!!
>>>
>>> On Fri, Jan 19, 2018 at 6:21 PM Neville Li <[email protected]>
>>> wrote:
>>>
>>>> Welcome.
>>>>
>>>> Added an issue so we may improve this in the future:
>>>> https://github.com/spotify/scio/issues/1020
>>>>
>>>>
>>>> On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso <[email protected]>
>>>> wrote:
>>>>
>>>>> To build the Beam transform, I was following this example:
>>>>> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>>>>>
>>>>> To be honest, I don't know how to apply timely and stateful processing
>>>>> without using a Beam transform, or how to rewrite it using the Scio
>>>>> built-in you suggest. Could you please give me an example?
>>>>>
>>>>> Thanks for your help!
>>>>>
>>>>> On Fri, Jan 19, 2018 at 5:04 PM Neville Li <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> That happens when you mix Beam transforms into Scio, which defeats the
>>>>>> safety we have in place. Map the values into something Beam-serializable
>>>>>> first, or rewrite the transform with a Scio built-in, which takes care of
>>>>>> KvCoder.
>>>>>>
>>>>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm following this example:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>>>>
>>>>>>> because I'm building something very similar to the GroupIntoBatches
>>>>>>> functionality. If I don't set the coder manually, this exception arises:
>>>>>>> https://pastebin.com/xxdDMXSf
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You shouldn't need to set the coder manually in most cases. It defaults
>>>>>>>> to KryoAtomicCoder for most Scala types.
>>>>>>>> More details:
>>>>>>>>
>>>>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>>>>
>>>>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Could it be because I'm using
>>>>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>>>>>> at some point in the pipeline?
>>>>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>>>>> returns a SerializableCoder.)
>>>>>>>>>
>>>>>>>>> This is something I've always wondered. How does one specify a
>>>>>>>>> coder for a case class?
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>>
>>>>>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Not sure why it falls back to SerializableCoder. Can you file a
>>>>>>>>>> GH issue, ideally with a snippet that reproduces the problem?
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi everyone!!
>>>>>>>>>>>
>>>>>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>>>>>> subscription into GCS buckets. To do this I'm using both stateful
>>>>>>>>>>> and timely processing. After building and testing the project
>>>>>>>>>>> locally, I tried to run it on Google Dataflow and started getting
>>>>>>>>>>> these errors.
>>>>>>>>>>>
>>>>>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>>>>>
>>>>>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>>>>>> MessageWithAttributes] and MessageWithAttributes is a case class
>>>>>>>>>>> defined as (content: String, attrs: Map[String, String])
>>>>>>>>>>>
>>>>>>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>>>>>> Scio as well), which suggests the issue is in serializing the Map,
>>>>>>>>>>> but to be honest, I don't know what it means or how to fix it.
>>>>>>>>>>>
>>>>>>>>>>> Can anyone help me, please?
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>
