Hi Gwilym,

Try extracting the DoFn into a separate static inner class, or into a separate
file as a top-level class, instead of declaring it as an anonymous inner class.
In Java an anonymous inner class keeps an implicit reference to its enclosing
scope, and I suspect the serialiser is failing on what that scope drags in: the
captured options object is backed by a ProxyInvocationHandler, which is exactly
the class named in the NotSerializableException below.
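A minimal sketch of what I mean (the class name BigtableLookupFn is made up,
and I am assuming the same Bigtable client classes as in your snippet):

    import java.io.IOException;
    import java.util.Map;

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.DoFn;

    import com.google.cloud.bigtable.config.BigtableOptions;
    import com.google.cloud.bigtable.grpc.BigtableSession;
    import com.google.protobuf.ByteString;

    public class BigtableLookupFn extends DoFn<PubsubMessage, Map<String, ByteString>> {
        // Plain serializable fields instead of a captured PipelineOptions proxy.
        private final String projectId;
        private final String instanceId;
        // Transient: the session is not serializable, so it is rebuilt on each
        // worker in @Setup instead of being shipped with the DoFn.
        private transient BigtableSession session;

        public BigtableLookupFn(String projectId, String instanceId) {
            this.projectId = projectId;
            this.instanceId = instanceId;
        }

        @Setup
        public void setUp() throws IOException {
            BigtableOptions opts = new BigtableOptions.Builder()
                    .setProjectId(projectId)
                    .setInstanceId(instanceId)
                    .build();
            session = new BigtableSession(opts);
        }

        @Teardown
        public void tearDown() throws IOException {
            if (session != null) {
                session.close(); // release connections when the worker retires the DoFn
            }
        }

        // Your @ProcessElement body (the readFlatRows call) moves here unchanged.
    }

Then, at pipeline construction time (your line 138), resolve the options before
building the DoFn, so only plain Strings need to be serialised:

    .apply(ParDo.of(new BigtableLookupFn(
            options.getSourceProjectId().get(),
            options.getSourceInstanceId().get())))

If those getters return ValueProviders whose values are only known at runtime,
you can instead keep the ValueProvider<String> itself as a field (ValueProvider
is serializable) and call get() inside setUp().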
Regards,
Csabi

On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <[email protected]> wrote:

> Here's what I have in my history, if you need the "... X more" expanded I
> can look into that:
>
> 2017-06-01 05:23:05 INFO DataflowPipelineOptions$StagingLocationFactory:127 -
> No stagingLocation provided, falling back to gcpTempLocation
> 2017-06-01 05:23:06 INFO DataflowRunner:229 -
> PipelineOptions.filesToStage was not specified. Defaulting to files from
> the classpath: will stage 111 files. Enable logging at DEBUG level to see
> which files will be staged.
> [WARNING]
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalArgumentException: unable to serialize
> org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>     at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>     at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>     at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>     ... 6 more
> Caused by: java.io.NotSerializableException:
> org.apache.beam.sdk.options.ProxyInvocationHandler
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>     ... 10 more
>
> The way I'm trying to use this in the ParDo/DoFn is:
>
> (line 138 starts here)
>     .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>         private BigtableSession session;
>
>         @Setup
>         public void setUp() throws IOException {
>             BigtableOptions opts = new BigtableOptions.Builder()
>                     .setProjectId(options.getSourceProjectId().get())
>                     .setInstanceId(options.getSourceInstanceId().get())
>                     .build();
>
>             session = new BigtableSession(opts);
>         }
>         ...
>
> With a ReadRowsRequest built up during the processElement and executed
> like:
>
>     ResultScanner<FlatRow> results =
>         session.getDataClient().readFlatRows(request.build());
>
> Thanks,
> Gwilym
>
>
> On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
>
>> Combining PubSub + Bigtable is common.
>>
>> You should try to use the BigtableSession approach because the HBase
>> approach adds a lot of dependencies (leading to dependency conflicts).
>> You should use the same version of the Bigtable libraries that Apache Beam
>> is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>
>> Can you provide the full stack trace for the exception you're seeing?
>>
>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <[email protected]> wrote:
>>
>>> Hi list,
>>>
>>> I'm trying to figure out if Beam is intended to do the following and, if
>>> so, what's the best approach?
>>>
>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new to
>>> Java, so if there's any known solution for this a code example would be
>>> greatly appreciated.
>>>
>>> I have an unbounded stream of short messages (coming from PubSub).
>>>
>>> For each message, I want to get a number of rows from an external
>>> database (rows within Bigtable, always the same table) based on the
>>> contents of the message, and use the rows when producing output for the
>>> final write apply.
>>>
>>> I've tried various means of connecting out to Bigtable from within
>>> the DoFn which is handling the PubSub inputs, but so far everything I've
>>> tried has resulted in Beam refusing to run the job due to:
>>>
>>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>
>>> (methods I've tried: manually using a BigtableSession, manually using
>>> the Bigtable HBase libs)
>>>
>>> So is this something that Beam was designed to do? If so, what's the
>>> recommended approach?
>>>
>>> I considered dynamically constructing a PCollection, but I wasn't sure
>>> if that would make use of connection pooling to Bigtable.
>>>
>>> Thanks,
>>> Gwilym
