I'm following this example:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60

because I'm building something very similar to the GroupIntoBatches
transform. If I don't set the coder manually, this exception is thrown:
https://pastebin.com/xxdDMXSf

Thanks!
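A minimal sketch of an explicit coder for the case class, assuming the Beam Java SDK 2.x classes CustomCoder, StringUtf8Coder and MapCoder are on the classpath. The name MessageWithAttributesCoder is hypothetical. The encoded map order is not guaranteed to be deterministic, so this is only suitable as a value coder, not a key coder.

```scala
import java.io.{InputStream, OutputStream}
import scala.collection.JavaConverters._
import org.apache.beam.sdk.coders.{CustomCoder, MapCoder, StringUtf8Coder}

case class MessageWithAttributes(content: String, attrs: Map[String, String])

// Hypothetical coder name; a sketch assuming the Beam 2.x CustomCoder API.
// Encodes the two fields one after another; each nested encoding is self-delimiting.
class MessageWithAttributesCoder extends CustomCoder[MessageWithAttributes] {
  private val stringCoder = StringUtf8Coder.of()
  private val attrsCoder = MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())

  override def encode(value: MessageWithAttributes, out: OutputStream): Unit = {
    stringCoder.encode(value.content, out)
    // Convert to java.util.Map only for encoding; the wrapper itself is never Java-serialized.
    attrsCoder.encode(value.attrs.asJava, out)
  }

  override def decode(in: InputStream): MessageWithAttributes = {
    val content = stringCoder.decode(in)
    // Copy into a concrete immutable Scala Map so no Java wrapper leaks out.
    val attrs = attrsCoder.decode(in).asScala.toMap
    MessageWithAttributes(content, attrs)
  }
}
```

With it, the call from this thread could become .setCoder(KvCoder.of(StringUtf8Coder.of(), new MessageWithAttributesCoder)), which avoids the SerializableCoder fallback. In Scio, as noted below, relying on the default coder is usually the better first choice.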

On Fri, Jan 19, 2018 at 4:35 PM Neville Li wrote:

> You shouldn't need to set a coder manually in most cases. It defaults to
> KryoAtomicCoder for most Scala types.
> More details:
> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>
> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso wrote:
>
>> Could it be because I’m using
>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes]))) at
>> some point in the pipeline
>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>> outputs a SerializableCoder)?
>>
>> This is something I've always wondered. How does one specify a coder for
>> a case class?
>>
>> Regards
>>
>> On Fri, 19 Jan 2018 at 15:51, Neville Li wrote:
>>
>>> Not sure why it falls back to SerializableCoder. Can you file a GH
>>> issue with ideally a snippet that can reproduce the problem?
>>>
>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso wrote:
>>>
>>>> Hi everyone!!
>>>>
>>>> I'm building a pipeline to store items from a Google PubSub
>>>> subscription into GCS buckets. To do this I'm using both stateful
>>>> and timely processing and, after building and testing the project locally, I
>>>> tried to run it on Google Dataflow, where I started getting serialization errors.
>>>>
>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>
>>>> The item I'm trying to serialize is a KV[String, MessageWithAttributes]
>>>> and MessageWithAttributes is a case class defined as (content: String,
>>>> attrs: Map[String, String])
>>>>
>>>> The underlying cause is java.io.NotSerializableException:
>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's Scio as
>>>> well), which suggests that the issue is in serializing the Map, but to be
>>>> honest, I don't know what it means or how to fix it.
>>>>
>>>> Can anyone help me, please?
>>>> Thanks!
>>>>
>>>
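Because SerializableCoder relies on plain Java serialization, whether a value survives it can be checked outside Dataflow. A minimal sketch, assuming the attributes arrive as a java.util.Map (as Beam's PubsubMessage.getAttributeMap returns): copying them into a concrete immutable Scala Map before building the case class keeps wrapper classes such as JMapWrapper$$anon$2 out of the serialized object graph. The names SerializationCheck, roundTrip and fromJava are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.JavaConverters._

case class MessageWithAttributes(content: String, attrs: Map[String, String])

// Hypothetical helper object for illustration only.
object SerializationCheck {
  // Round-trips a value through plain Java serialization (what SerializableCoder uses).
  // Throws NotSerializableException if any object reachable from `value` is not Serializable.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // asScala gives a mutable view; toMap copies it into a fresh immutable Map.
  def fromJava(content: String, attrs: java.util.Map[String, String]): MessageWithAttributes =
    MessageWithAttributes(content, attrs.asScala.toMap)

  def main(args: Array[String]): Unit = {
    val javaAttrs = new java.util.HashMap[String, String]()
    javaAttrs.put("source", "pubsub")
    val msg = fromJava("hello", javaAttrs)
    println(roundTrip(msg) == msg)
  }
}
```

If attrs is already an immutable Scala wrapper (as the Scio JMapWrapper in the stack trace appears to be), calling toMap on it may just return the same object; rebuilding with Map(attrs.toSeq: _*) forces a fresh, serializable copy.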
