Csabi, I will try that, thank you. Eugene, sorry, I should have mentioned that I've already tried that and it still fails. I've also tried annotating it with @JsonIgnore. Thanks, though.
On 1 June 2017 at 22:46, Eugene Kirpichov <[email protected]> wrote:

> It's probably because of the BigtableSession variable - mark it transient.
>
> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <[email protected]> wrote:
>
>> Hi Gwilym,
>>
>> Try to extract the DoFn into a separate static inner class, or into a
>> separate file as a top-level class, instead of declaring it as an
>> anonymous inner class. In Java, an anonymous inner class has an implicit
>> reference to the outer enclosing class, and I suspect that the serialiser
>> is not able to serialise the fields of this enclosing instance.
>>
>> Regards,
>> Csabi
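A minimal sketch of what Csabi's suggestion could look like. This code is not from the thread: the class name BigtableLookupFn and its constructor parameters are hypothetical, the Bigtable import paths assume the bigtable-client-core 0.9.x layout that Beam 2.0.0 depends on, and the session field is also marked transient per Eugene's suggestion, so only the two Strings need to serialize.

// BigtableLookupFn.java -- a top-level DoFn (equivalently, a *static*
// nested class inside the pipeline class). Either way there is no
// implicit this$0 reference to an enclosing instance to serialize.

import java.io.IOException;
import java.util.Map;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.protobuf.ByteString;

public class BigtableLookupFn extends DoFn<PubsubMessage, Map<String, ByteString>> {

    // Plain Strings are Serializable; pass these in rather than
    // capturing the whole options object.
    private final String projectId;
    private final String instanceId;

    // transient: skipped during serialization, rebuilt per worker in @Setup.
    private transient BigtableSession session;

    public BigtableLookupFn(String projectId, String instanceId) {
        this.projectId = projectId;
        this.instanceId = instanceId;
    }

    @Setup
    public void setUp() throws IOException {
        BigtableOptions opts = new BigtableOptions.Builder()
                .setProjectId(projectId)
                .setInstanceId(instanceId)
                .build();
        session = new BigtableSession(opts);
    }

    @Teardown
    public void tearDown() throws Exception {
        if (session != null) {
            session.close();
        }
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        // Build the ReadRowsRequest from c.element() and emit results,
        // as in the snippet quoted below (details elided).
    }
}

Either the static nested form or the top-level form works; the point is only that nothing non-serializable, and no hidden reference to the enclosing class, is part of the DoFn's state.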
>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <[email protected]> wrote:
>>
>>> Here's what I have in my history; if you need the "... X more" expanded,
>>> I can look into that:
>>>
>>> 2017-06-01 05:23:05 INFO DataflowPipelineOptions$StagingLocationFactory:127
>>> - No stagingLocation provided, falling back to gcpTempLocation
>>> 2017-06-01 05:23:06 INFO DataflowRunner:229 - PipelineOptions.filesToStage
>>> was not specified. Defaulting to files from the classpath: will stage 111
>>> files. Enable logging at DEBUG level to see which files will be staged.
>>> [WARNING]
>>> java.lang.reflect.InvocationTargetException
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.IllegalArgumentException: unable to serialize
>>> org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>>     at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>>     at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>>>     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>     at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>>     ... 6 more
>>> Caused by: java.io.NotSerializableException:
>>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>>     ... 10 more
>>>
>>> The way I'm trying to use this in the ParDo/DoFn is:
>>>
>>> (line 138 starts here)
>>> .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>>     private BigtableSession session;
>>>
>>>     @Setup
>>>     public void setUp() throws IOException {
>>>         BigtableOptions opts = new BigtableOptions.Builder()
>>>                 .setProjectId(options.getSourceProjectId().get())
>>>                 .setInstanceId(options.getSourceInstanceId().get())
>>>                 .build();
>>>
>>>         session = new BigtableSession(opts);
>>>     }
>>>     ...
>>>
>>> with a ReadRowsRequest built up during processElement and executed like:
>>>
>>> ResultScanner<FlatRow> results =
>>>     session.getDataClient().readFlatRows(request.build());
>>>
>>> Thanks,
>>> Gwilym
>>>
>>> On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Combining PubSub + Bigtable is common.
>>>>
>>>> You should try to use the BigtableSession approach, because the HBase
>>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>>> You should use the same version of the Bigtable libraries that Apache
>>>> Beam is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>>>
>>>> Can you provide the full stack trace for the exception you're seeing?
>>>>
>>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <[email protected]> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> I'm trying to figure out if Beam is intended to do the following and,
>>>>> if so, what's the best approach?
>>>>>
>>>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new
>>>>> to Java, so if there's any known solution for this, a code example
>>>>> would be greatly appreciated.
>>>>>
>>>>> I have an unbounded stream of short messages (coming from PubSub).
>>>>>
>>>>> For each message, I want to get a number of rows from an external
>>>>> database (rows within Bigtable, always the same table) based on the
>>>>> contents of the message, and use the rows when producing output for
>>>>> the final write apply.
>>>>>
>>>>> I've tried various means of connecting out to Bigtable from within
>>>>> the DoFn which is handling the PubSub inputs, but so far everything
>>>>> I've tried has resulted in Beam refusing to run the job due to:
>>>>>
>>>>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>
>>>>> (methods I've tried: manually using a BigtableSession, manually using
>>>>> the Bigtable HBase libs)
>>>>>
>>>>> So is this something that Beam was designed to do? If so, what's the
>>>>> recommended approach?
>>>>>
>>>>> I considered dynamically constructing a PCollection, but I wasn't sure
>>>>> if that would make use of connection pooling to Bigtable.
>>>>>
>>>>> Thanks,
>>>>> Gwilym
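For reference, a sketch of how the ParDo could then be wired up, assuming the hypothetical BigtableLookupFn sketched earlier; `messages` is a stand-in for the PCollection<PubsubMessage> read from Pub/Sub, and the getters are the ones from the quoted snippet. The NotSerializableException names ProxyInvocationHandler, the dynamic proxy behind PipelineOptions, which suggests the anonymous DoFn is capturing `options`; reading plain values out of it before constructing the DoFn avoids that.

// Fragment from the pipeline's main(): extract plain String values
// *before* constructing the DoFn, so the non-serializable options
// proxy is never captured by the function.
String projectId = options.getSourceProjectId().get();
String instanceId = options.getSourceInstanceId().get();

messages.apply(ParDo.of(new BigtableLookupFn(projectId, instanceId)));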
