Here's a fix for #1020:
https://github.com/spotify/scio/pull/1032

On Sun, Jan 21, 2018 at 4:36 PM Neville Li <[email protected]> wrote:

> Awesome!
> We haven't wrapped any stateful processing API in Scala, but if you have a
> working snippet or ideas, it'd be great to share them in that ticket.
>
> On Sat, Jan 20, 2018 at 4:31 PM Carlos Alonso <[email protected]>
> wrote:
>
>> Thanks Neville!!
>>
>> Your recommendation worked great. Thanks for your help!!
>>
>> As a side note, I found this issue:
>> https://github.com/spotify/scio/issues/448
>>
>> I can share our experience there, as our job, which combines scio with
>> stateful and timely processing, is working fine as of today.
>>
>> Regards!!
>>
>> On Fri, Jan 19, 2018 at 6:21 PM Neville Li <[email protected]> wrote:
>>
>>> Welcome.
>>>
>>> Added an issue so we may improve this in the future:
>>> https://github.com/spotify/scio/issues/1020
>>>
>>>
>>> On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso <[email protected]>
>>> wrote:
>>>
>>>> To build the beam transform I was following this example:
>>>> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>>>>
>>>> To be honest I don't know how to apply timely and stateful processing
>>>> without using a beam transform or how to rewrite it using the scio built-in
>>>> you suggest. Could you please give me an example?
>>>>
>>>> Thanks for your help!
>>>>
>>>> On Fri, Jan 19, 2018 at 5:04 PM Neville Li <[email protected]>
>>>> wrote:
>>>>
>>>>> That happens when you mix Beam transforms into Scio, which defeats the
>>>>> safety we have in place. Map the values into something Beam-serializable
>>>>> first, or rewrite the transform with a Scio built-in, which takes care of
>>>>> KvCoder.
>>>>>
>>>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I'm following this example:
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>>>
>>>>>> because I'm building something very similar to a group into batches
>>>>>> functionality. If I don't set the coder manually, this exception arises:
>>>>>> https://pastebin.com/xxdDMXSf
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> You shouldn't set the coder manually in most cases. It defaults to
>>>>>>> KryoAtomicCoder for most Scala types.
>>>>>>> More details:
>>>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>>>
>>>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Could it be because I'm using
>>>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>>>>> at some point in the pipeline
>>>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>>>> outputs a SerializableCoder)?
>>>>>>>>
>>>>>>>> This is something I've always wondered. How does one specify a
>>>>>>>> coder for a case class?
>>>>>>>>
>>>>>>>> Regards
>>>>>>>>
>>>>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Not sure why it falls back to SerializableCoder. Can you file a
>>>>>>>>> GH issue, ideally with a snippet that reproduces the problem?
>>>>>>>>>
>>>>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone!!
>>>>>>>>>>
>>>>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>>>>> subscription into GCS buckets. To do this I'm using both stateful
>>>>>>>>>> and timely processing. After building and testing the project
>>>>>>>>>> locally, I tried to run it on Google Dataflow and started getting
>>>>>>>>>> these errors.
>>>>>>>>>>
>>>>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>>>>
>>>>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>>>>> MessageWithAttributes], where MessageWithAttributes is a case class
>>>>>>>>>> defined as (content: String, attrs: Map[String, String]).
>>>>>>>>>>
>>>>>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>>>>> Scio as well), which may suggest that the issue is in serializing
>>>>>>>>>> the Map, but to be honest, I don't know what it means or how to
>>>>>>>>>> fix it.
>>>>>>>>>>
>>>>>>>>>> Can anyone help me, please?
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>
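
A minimal, standard-library-only sketch of the failure mode discussed in this thread: Java serialization, which SerializableCoder relies on, rejects a value whose map field is a non-serializable wrapper, and copying that field into a concrete map avoids the error. The names Message, view, and javaSerializable are hypothetical, and attrs is a java.util.Map rather than the Scala Map from the thread so that the example runs without Scio or Beam on the classpath. It does not reproduce Scio's JMapWrapper itself.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.{AbstractMap => JAbstractMap, HashMap => JHashMap, Map => JMap, Set => JSet}

// Hypothetical stand-in for the case class in the thread. Case classes are
// Serializable, but only if every field value is serializable too.
final case class Message(content: String, attrs: JMap[String, String])

object SerializationSketch {
  // A view over another map, built as an anonymous class. java.util.AbstractMap
  // does not implement java.io.Serializable, so neither does this wrapper.
  def view(underlying: JMap[String, String]): JMap[String, String] =
    new JAbstractMap[String, String] {
      override def entrySet(): JSet[JMap.Entry[String, String]] = underlying.entrySet()
    }

  // Returns true if plain Java serialization accepts the value.
  def javaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val backing = new JHashMap[String, String]()
    backing.put("source", "pubsub")

    val wrapped = Message("payload", view(backing))
    // Copy the view into a concrete, serializable map before serializing.
    val materialized = wrapped.copy(attrs = new JHashMap[String, String](wrapped.attrs))

    println(javaSerializable(wrapped))      // false
    println(javaSerializable(materialized)) // true
  }
}
```

In the pipeline from the thread, the analogous step would be mapping each value into a form that holds a concrete map before it reaches the stateful transform, in line with the advice quoted above.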
