To build the Beam transform, I was following this example:
https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala

To be honest, I don't know how to apply timely and stateful processing
without using a Beam transform, or how to rewrite it using the Scio built-in
you suggested. Could you please give me an example?

Thanks for your help!

On Fri, Jan 19, 2018 at 5:04 PM Neville Li <[email protected]> wrote:

> That happens when you mix Beam transforms into Scio, which defeats the
> safety we have in place. Map the values into something Beam-serializable
> first, or rewrite the transform with a Scio built-in, which takes care of
> KvCoder.
>
> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <[email protected]> wrote:
>
>> I'm following this example:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>
>> because I'm building something very similar to the GroupIntoBatches
>> functionality. If I don't set the coder manually, this exception arises:
>> https://pastebin.com/xxdDMXSf
>>
>> Thanks!
>>
>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <[email protected]> wrote:
>>
>>> You shouldn't need to set the coder manually in most cases. It defaults to
>>> KryoAtomicCoder for most Scala types.
>>> More details:
>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>
>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <[email protected]>
>>> wrote:
>>>
>>>> Could it be because I'm using
>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>> at some point in the pipeline?
>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>> returns a SerializableCoder.)
>>>>
>>>> This is something I've always wondered. How does one specify a coder
>>>> for a case class?
>>>>
>>>> Regards
>>>>
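A dependency-free sketch of the failure mode discussed in this thread, assuming only the Scala and Java standard libraries (no Beam or Scio). Case classes implement java.io.Serializable, which is consistent with the registry lookup above returning a SerializableCoder. Java serialization then visits every non-transient field, so a single field value whose class is not serializable (the role JMapWrapper$$anon$2 plays in the original report) makes the whole element fail. `OpaqueAttrs`, `MessageWithOpaqueAttrs` and `javaSerializable` are hypothetical names for illustration only.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationWalkthrough {
  // Same shape as the case class in this thread; case classes extend
  // java.io.Serializable, so a registry can fall back to Java serialization.
  final case class MessageWithAttributes(content: String, attrs: Map[String, String])

  // Hypothetical stand-in for a map wrapper that does NOT implement
  // java.io.Serializable (like the Scio wrapper named in the stack trace).
  final class OpaqueAttrs(val underlying: java.util.Map[String, String])

  // Hypothetical variant whose attrs field holds the non-serializable wrapper.
  final case class MessageWithOpaqueAttrs(content: String, attrs: OpaqueAttrs)

  // True if Java serialization of `value` succeeds. Serialization walks every
  // non-transient field, so one non-serializable field fails the whole object.
  def javaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

With a standard immutable Map in `attrs` the check passes; with the opaque wrapper it fails with NotSerializableException, which appears consistent with the cause reported further down in this thread.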
>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <[email protected]> wrote:
>>>>
>>>>> Not sure why it falls back to SerializableCoder. Can you file a GitHub
>>>>> issue, ideally with a snippet that reproduces the problem?
>>>>>
>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone!!
>>>>>>
>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>> subscription into GCS buckets. To do so, I'm using both stateful and
>>>>>> timely processing. After building and testing the project locally, I
>>>>>> tried to run it on Google Dataflow and started getting these errors.
>>>>>>
>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>
>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>> MessageWithAttributes], and MessageWithAttributes is a case class
>>>>>> defined as (content: String, attrs: Map[String, String]).
>>>>>>
>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>> Scio as well), which may suggest that the issue is in serializing the
>>>>>> Map, but to be honest, I don't know what it means or how to fix it.
>>>>>>
>>>>>> Can anyone help me, please?
>>>>>> Thanks!
>>>>>>
>>>>>

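Following the suggestion in this thread to map the values into something serializable first, a minimal sketch, assuming the only problem is the map wrapper held in `attrs`: copy the entries into a freshly built immutable Map before the element reaches a transform that relies on Java serialization. `toPlainMap` and `roundTrip` are hypothetical helpers, not Scio or Beam API; `roundTrip` only approximates what a SerializableCoder does by writing and reading the value with ObjectOutputStream and ObjectInputStream.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object PlainAttrs {
  // Same shape as the case class described in this thread.
  final case class MessageWithAttributes(content: String, attrs: Map[String, String])

  // Copy all entries into a freshly built immutable Map, so the result is one
  // of the standard (serializable) map classes, whatever type `source` was.
  def toPlainMap(source: scala.collection.Map[String, String]): Map[String, String] = {
    val builder = Map.newBuilder[String, String]
    builder ++= source
    builder.result()
  }

  // Write and read back a value with plain Java serialization, roughly what a
  // SerializableCoder does; throws NotSerializableException on failure.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

In a pipeline this would presumably be a `.map(m => m.copy(attrs = PlainAttrs.toPlainMap(m.attrs)))` step placed before the Beam transform (untested against Scio). Building a new map is deliberate: calling `.toMap` on a value that already is an immutable `Map` can return the same instance, wrapper and all.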