Csabi, I will try that, thank you.

Eugene, sorry, I should have mentioned that I've already tried that and it
still fails. I've also tried annotating it with @JsonIgnore. Thanks, though.

On 1 June 2017 at 22:46, Eugene Kirpichov <[email protected]> wrote:

> It's probably because of the BigtableSession variable - mark it transient.
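>
> E.g., a minimal sketch of that one field:
>
>     private transient BigtableSession session; // not serialized with the DoFn; recreated in @Setup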
>
> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <[email protected]>
> wrote:
>
>> Hi Gwilym,
>>
>> try to extract the DoFn into a separate static inner class or into a
>> separate file as a top-level class, instead of declaring it as an
>> anonymous inner class. In Java an anonymous inner class has an implicit
>> reference to the outer enclosing class, and I suspect that the serialiser
>> is not able to serialise the fields of this enclosing instance.
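>>
>> Something like this rough sketch (untested; the class name and
>> constructor are illustrative, and it assumes the same Bigtable imports
>> you already have):
>>
>>     static class BigtableLookupFn extends DoFn<PubsubMessage, Map<String, ByteString>> {
>>         // Plain strings serialize fine; the session is transient and is
>>         // recreated on each worker in @Setup.
>>         private final String projectId;
>>         private final String instanceId;
>>         private transient BigtableSession session;
>>
>>         BigtableLookupFn(String projectId, String instanceId) {
>>             this.projectId = projectId;
>>             this.instanceId = instanceId;
>>         }
>>
>>         @Setup
>>         public void setUp() throws IOException {
>>             BigtableOptions opts = new BigtableOptions.Builder()
>>                     .setProjectId(projectId)
>>                     .setInstanceId(instanceId)
>>                     .build();
>>             session = new BigtableSession(opts);
>>         }
>>
>>         @Teardown
>>         public void tearDown() throws IOException {
>>             if (session != null) {
>>                 session.close();
>>             }
>>         }
>>     }
>>
>> Because the constructor takes plain strings, the DoFn never holds a
>> reference to the PipelineOptions proxy, which is the object your
>> serialiser is complaining about.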
>>
>> Regards,
>> Csabi
>>
>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <[email protected]>
>> wrote:
>>
>>> Here's what I have in my history; if you need the "... X more" expanded,
>>> I can look into that:
>>>
>>> 2017-06-01 05:23:05 INFO  DataflowPipelineOptions$StagingLocationFactory:127 - No stagingLocation provided, falling back to gcpTempLocation
>>> 2017-06-01 05:23:06 INFO  DataflowRunner:229 - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 111 files. Enable logging at DEBUG level to see which files will be staged.
>>> [WARNING]
>>> java.lang.reflect.InvocationTargetException
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.IllegalArgumentException: unable to serialize org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>>     at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>>     at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>>>     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>     at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>>     ... 6 more
>>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>>     ... 10 more
>>>
>>> The way I'm trying to use this in the ParDo/DoFn is:
>>>
>>> (line 138 starts here)
>>>                 .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>>                     private BigtableSession session;
>>>
>>>                     @Setup
>>>                     public void setUp() throws IOException {
>>>                         BigtableOptions opts = new BigtableOptions.Builder()
>>>                                 .setProjectId(options.getSourceProjectId().get())
>>>                                 .setInstanceId(options.getSourceInstanceId().get())
>>>                                 .build();
>>>
>>>                         session = new BigtableSession(opts);
>>>                     }
>>>                     ...
>>>
>>> With a ReadRowsRequest built up during the processElement and executed
>>> like:
>>>
>>>                     ResultScanner<FlatRow> results = session.getDataClient().readFlatRows(request.build());
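>>>
>>> For context, request is assembled inside processElement roughly like
>>> this (tableName is a placeholder here, and the row-key filtering is
>>> elided, so this is just a sketch):
>>>
>>>                     ReadRowsRequest.Builder request = ReadRowsRequest.newBuilder()
>>>                             .setTableName(tableName); // full table name string; filters elided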
>>>
>>> Thanks,
>>> Gwilym
>>>
>>>
>>> On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Combining PubSub + Bigtable is common.
>>>>
>>>> You should try to use the BigtableSession approach, because the HBase
>>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>>> You should use the same version of Bigtable libraries that Apache Beam
>>>> is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
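>>>>
>>>> In Maven terms, something like this (a sketch; assuming the
>>>> bigtable-client-core artifact is the one in play):
>>>>
>>>>     <dependency>
>>>>       <groupId>com.google.cloud.bigtable</groupId>
>>>>       <artifactId>bigtable-client-core</artifactId>
>>>>       <version>0.9.6.2</version>
>>>>     </dependency>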
>>>>
>>>> Can you provide the full stack trace for the exception you're seeing?
>>>>
>>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <[email protected]> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> I'm trying to figure out if Beam is intended to do the following and,
>>>>> if so, what's the best approach?
>>>>>
>>>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new
>>>>> to Java, so if there's any known solution for this, a code example
>>>>> would be greatly appreciated.
>>>>>
>>>>> I have an unbounded stream of short messages (coming from PubSub).
>>>>>
>>>>> For each message, I want to get a number of rows from an external
>>>>> database (rows within Bigtable, always the same table) based on the
>>>>> contents of the message, and use the rows when producing output for the
>>>>> final write apply.
>>>>>
>>>>> I've tried various means of connecting out to Bigtable from within
>>>>> the DoFn which is handling the PubSub inputs, but so far everything I've
>>>>> tried has resulted in Beam refusing to run the job due to:
>>>>>
>>>>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>
>>>>> (methods I've tried: manually using a BigtableSession, manually using
>>>>> the Bigtable HBase libs)
>>>>>
>>>>> So is this something that Beam was designed to do? If so, what's the
>>>>> recommended approach?
>>>>>
>>>>> I considered dynamically constructing a PCollection, but I wasn't sure
>>>>> if that would make use of connection pooling to Bigtable.
>>>>>
>>>>> Thanks,
>>>>> Gwilym
>>>>>
>>>>>
>>>>
>>>
