Hi Gwilym,

Try to extract the DoFn into a separate static inner class, or into a
separate file as a top-level class, instead of declaring it as an
anonymous inner class. In Java an anonymous inner class holds an implicit
reference to its enclosing instance, and I suspect the serialiser is not
able to serialise the fields of that enclosing instance. That is why the
stack trace complains about ProxyInvocationHandler: the anonymous DoFn
captures the options proxy, which is not serialisable.
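
For example, here is a rough, untested sketch of what I mean
(BigtableLookupFn is just a placeholder name; it assumes the
BigtableSession/BigtableOptions API from your snippet below, and that the
project and instance ids can be resolved in main() while the pipeline is
being built):

    import java.io.IOException;
    import java.util.Map;

    import com.google.cloud.bigtable.config.BigtableOptions;
    import com.google.cloud.bigtable.grpc.BigtableSession;
    import com.google.protobuf.ByteString;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.DoFn;

    // Inside your AthenaPubsubOrderNotificationsHandler class (or move it
    // to its own file as a top-level class). Being static, it has no hidden
    // reference to an enclosing instance, so only the fields declared here
    // have to be serialisable.
    static class BigtableLookupFn extends DoFn<PubsubMessage, Map<String, ByteString>> {

        // Plain strings serialise fine; do NOT store the PipelineOptions
        // object itself, that is where the ProxyInvocationHandler comes from.
        private final String projectId;
        private final String instanceId;

        // transient: the session is not serialisable and is re-created on
        // each worker by @Setup instead of being shipped with the DoFn.
        private transient BigtableSession session;

        BigtableLookupFn(String projectId, String instanceId) {
            this.projectId = projectId;
            this.instanceId = instanceId;
        }

        @Setup
        public void setUp() throws IOException {
            BigtableOptions opts = new BigtableOptions.Builder()
                    .setProjectId(projectId)
                    .setInstanceId(instanceId)
                    .build();
            session = new BigtableSession(opts);
        }

        @Teardown
        public void tearDown() throws Exception {
            if (session != null) {
                session.close();
            }
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws IOException {
            // Build the ReadRowsRequest from c.element() and call
            // session.getDataClient().readFlatRows(...) exactly as you do now.
        }
    }

Then, in main(), resolve the option values before constructing the
transform, so the options proxy itself never ends up inside the DoFn:

    .apply(ParDo.of(new BigtableLookupFn(
            options.getSourceProjectId().get(),
            options.getSourceInstanceId().get())))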

Regards,
Csabi

On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <[email protected]>
wrote:

> Here's what I have in my history, if you need the "... X more" expanded I
> can look into that:
>
> 2017-06-01 05:23:05 INFO  DataflowPipelineOptions$StagingLocationFactory:127 - No stagingLocation provided, falling back to gcpTempLocation
> 2017-06-01 05:23:06 INFO  DataflowRunner:229 - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 111 files. Enable logging at DEBUG level to see which files will be staged.
> [WARNING]
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalArgumentException: unable to serialize org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
> at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
> ... 6 more
> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
> ... 10 more
>
> The way I'm trying to use this in the ParDo/DoFn is:
>
> (line 138 starts here)
>                 .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>                     private BigtableSession session;
>
>                     @Setup
>                     public void setUp() throws IOException {
>                         BigtableOptions opts = new BigtableOptions.Builder()
>                                 .setProjectId(options.getSourceProjectId().get())
>                                 .setInstanceId(options.getSourceInstanceId().get())
>                                 .build();
>
>                         session = new BigtableSession(opts);
>                     }
>                     ...
>
> With a ReadRowsRequest built up during the processElement and executed
> like:
>
>                     ResultScanner<FlatRow> results = session.getDataClient().readFlatRows(request.build());
>
> Thanks,
> Gwilym
>
>
> On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
>
>> Combining PubSub + Bigtable is common.
>>
>> You should try to use the BigtableSession approach, because the HBase
>> approach adds a lot of dependencies (leading to dependency conflicts).
>> You should use the same version of Bigtable libraries that Apache Beam is
>> using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>
>> Can you provide the full stack trace for the exception you're seeing?
>>
>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <
>> [email protected]> wrote:
>>
>>> Hi list,
>>>
>>> I'm trying to figure out if Beam is intended to do the following and, if
>>> so, what's the best approach?
>>>
>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new to
>>> Java, so if there's any known solution for this, a code example would be
>>> greatly appreciated.
>>>
>>> I have an unbounded stream of short messages (coming from PubSub).
>>>
>>> For each message, I want to get a number of rows from an external
>>> database (rows within Bigtable, always the same table) based on the
>>> contents of the message, and use the rows when producing output for the
>>> final write apply.
>>>
>>> I've tried various means of connecting out to Bigtable from within
>>> the DoFn which is handling the PubSub inputs, but so far everything I've
>>> tried has resulted in Beam refusing to run the job due to:
>>>
>>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>
>>> (methods I've tried: manually using a BigtableSession, manually using
>>> the Bigtable HBase libs)
>>>
>>> So is this something that Beam was designed to do? If so, what's the
>>> recommended approach?
>>>
>>> I considered dynamically constructing a PCollection, but I wasn't sure
>>> if that would make use of connection pooling to Bigtable.
>>>
>>> Thanks,
>>> Gwilym
>>>
>>>
>>
>
