It's probably because of the BigtableSession variable - mark it transient.
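
A minimal sketch of what I mean (untested; projectId / instanceId here stand
in for plain String fields you'd populate at pipeline construction time,
since capturing the options proxy itself is what fails to serialize):

    private transient BigtableSession session;

    @Setup
    public void setUp() throws IOException {
        // Re-created on each worker instead of being serialized with the DoFn.
        session = new BigtableSession(new BigtableOptions.Builder()
                .setProjectId(projectId)
                .setInstanceId(instanceId)
                .build());
    }

    @Teardown
    public void tearDown() throws IOException {
        if (session != null) {
            session.close();
        }
    }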

On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <[email protected]>
wrote:

> Hi Gwilym,
>
> try to extract the DoFn into a separate static inner class or into a
> separate file as a top-level class, instead of declaring it as an
> anonymous inner class. In Java, an anonymous inner class holds an implicit
> reference to its enclosing instance, and I suspect the serialiser is
> failing on the fields of that enclosing instance.
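>
> Something like this as a sketch (names are placeholders; pass plain
> Strings into the constructor instead of capturing the options proxy):
>
>     static class EnrichFromBigtableFn
>             extends DoFn<PubsubMessage, Map<String, ByteString>> {
>         private final String projectId;
>         private final String instanceId;
>         private transient BigtableSession session;
>
>         EnrichFromBigtableFn(String projectId, String instanceId) {
>             this.projectId = projectId;
>             this.instanceId = instanceId;
>         }
>
>         @Setup
>         public void setUp() throws IOException {
>             session = new BigtableSession(new BigtableOptions.Builder()
>                     .setProjectId(projectId)
>                     .setInstanceId(instanceId)
>                     .build());
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) throws IOException {
>             // build the ReadRowsRequest from c.element() and emit results
>         }
>     }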
>
> Regards,
> Csabi
>
> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <[email protected]>
> wrote:
>
>> Here's what I have in my history; if you need the "... X more" frames
>> expanded, I can look into that:
>>
>> 2017-06-01 05:23:05 INFO  DataflowPipelineOptions$StagingLocationFactory:127 - No stagingLocation provided, falling back to gcpTempLocation
>> 2017-06-01 05:23:06 INFO  DataflowRunner:229 - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 111 files. Enable logging at DEBUG level to see which files will be staged.
>> [WARNING]
>> java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.IllegalArgumentException: unable to serialize org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>> at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>> ... 6 more
>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>> ... 10 more
>>
>> The way I'm trying to use this in the ParDo/DoFn is:
>>
>> (line 138 starts here)
>>                 .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>                     private BigtableSession session;
>>
>>                     @Setup
>>                     public void setUp() throws IOException {
>>                         BigtableOptions opts = new BigtableOptions.Builder()
>>                                 .setProjectId(options.getSourceProjectId().get())
>>                                 .setInstanceId(options.getSourceInstanceId().get())
>>                                 .build();
>>
>>                         session = new BigtableSession(opts);
>>                     }
>>                     ...
>>
>> With a ReadRowsRequest built up during the processElement and executed
>> like:
>>
>>                     ResultScanner<FlatRow> results =
>>                             session.getDataClient().readFlatRows(request.build());
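>>
>> and the rows consumed in a loop along these lines (simplified):
>>
>>                     FlatRow row;
>>                     while ((row = results.next()) != null) {
>>                         // collect the cells needed for the output map
>>                     }
>>                     results.close();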
>>
>> Thanks,
>> Gwilym
>>
>>
>> On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
>>
>>> Combining PubSub + Bigtable is common.
>>>
>>> You should try to use the BigtableSession approach, because the HBase
>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>> You should use the same version of Bigtable libraries that Apache Beam
>>> is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
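>>>
>>> For example, if you pull in the client directly in Maven (assuming you
>>> depend on the bigtable-client-core artifact), pin it to that version:
>>>
>>>     <dependency>
>>>       <groupId>com.google.cloud.bigtable</groupId>
>>>       <artifactId>bigtable-client-core</artifactId>
>>>       <version>0.9.6.2</version>
>>>     </dependency>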
>>>
>>> Can you provide the full stack trace for the exception you're seeing?
>>>
>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <
>>> [email protected]> wrote:
>>>
>>>> Hi list,
>>>>
>>>> I'm trying to figure out if Beam is intended to do the following and,
>>>> if so, what's the best approach?
>>>>
>>>> I'm using Java with Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new
>>>> to Java, so if there's a known solution for this, a code example would be
>>>> greatly appreciated.
>>>>
>>>> I have an unbounded stream of short messages (coming from PubSub).
>>>>
>>>> For each message, I want to get a number of rows from an external
>>>> database (rows within Bigtable, always the same table) based on the
>>>> contents of the message, and use the rows when producing output for the
>>>> final write apply.
>>>>
>>>> I've tried various means of connecting out to Bigtable from within
>>>> the DoFn which is handling the PubSub inputs, but so far everything I've
>>>> tried has resulted in Beam refusing to run the job due to:
>>>>
>>>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>
>>>> (methods I've tried: manually using a BigtableSession, manually using
>>>> the Bigtable HBase libs)
>>>>
>>>> So is this something that Beam was designed to do? If so, what's the
>>>> recommended approach?
>>>>
>>>> I considered dynamically constructing a PCollection, but I wasn't sure
>>>> if that would make use of connection pooling to Bigtable.
>>>>
>>>> Thanks,
>>>> Gwilym
>>>>
>>>>
>>>
>>
