It's probably because of the BigtableSession variable - mark it transient.
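
Something like this, if it helps (just a sketch; the field is the one from your snippet further down, and the session is then rebuilt in @Setup on each worker as you already do):

    // transient: skipped during Java serialisation, so the DoFn itself
    // stays serialisable; @Setup repopulates the field on each worker
    private transient BigtableSession session;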

On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <[email protected]> wrote:

> Hi Gwilym,
>
> try to extract the DoFn into a separate static inner class or into a
> separate file as a top-level class, instead of declaring it as an
> anonymous inner class. In Java, an anonymous inner class has an implicit
> reference to the outer enclosing class, and I suspect that the serialiser
> is not able to serialise the fields of this enclosing instance.
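>
> A rough sketch of what I mean (the class name is made up, the rest
> mirrors your snippet, with the same imports your code already uses;
> the important part is that the options values are resolved into plain
> strings before the DoFn is constructed, so the DoFn never captures the
> options proxy or the enclosing class):
>
> static class BigtableLookupFn extends DoFn<PubsubMessage, Map<String, ByteString>> {
>     private final String projectId;
>     private final String instanceId;
>     private BigtableSession session;
>
>     BigtableLookupFn(String projectId, String instanceId) {
>         this.projectId = projectId;
>         this.instanceId = instanceId;
>     }
>
>     @Setup
>     public void setUp() throws IOException {
>         // only plain serialisable String fields reach the worker;
>         // the session is created here, after deserialisation
>         BigtableOptions opts = new BigtableOptions.Builder()
>                 .setProjectId(projectId)
>                 .setInstanceId(instanceId)
>                 .build();
>         session = new BigtableSession(opts);
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         // build your ReadRowsRequest from c.element() and call
>         // session.getDataClient().readFlatRows(request.build()) here,
>         // exactly as in your current anonymous class
>     }
>
>     @Teardown
>     public void tearDown() throws IOException {
>         if (session != null) {
>             session.close();
>         }
>     }
> }
>
> constructed with the values resolved up front, e.g.:
>
> .apply(ParDo.of(new BigtableLookupFn(
>         options.getSourceProjectId().get(),
>         options.getSourceInstanceId().get())))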
>
> Regards,
> Csabi
>
> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <[email protected]>
> wrote:
>
>> Here's what I have in my history; if you need the "... X more" expanded I
>> can look into that:
>>
>> 2017-06-01 05:23:05 INFO
>> DataflowPipelineOptions$StagingLocationFactory:127 - No stagingLocation
>> provided, falling back to gcpTempLocation
>> 2017-06-01 05:23:06 INFO DataflowRunner:229 -
>> PipelineOptions.filesToStage was not specified. Defaulting to files from
>> the classpath: will stage 111 files. Enable logging at DEBUG level to see
>> which files will be staged.
>> [WARNING]
>> java.lang.reflect.InvocationTargetException
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.IllegalArgumentException: unable to serialize
>> org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>     at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>     at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>>     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>     at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>     ... 6 more
>> Caused by: java.io.NotSerializableException:
>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>     ... 10 more
>>
>> The way I'm trying to use this in the ParDo/DoFn is:
>>
>> (line 138 starts here)
>>     .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>         private BigtableSession session;
>>
>>         @Setup
>>         public void setUp() throws IOException {
>>             BigtableOptions opts = new BigtableOptions.Builder()
>>                     .setProjectId(options.getSourceProjectId().get())
>>                     .setInstanceId(options.getSourceInstanceId().get())
>>                     .build();
>>
>>             session = new BigtableSession(opts);
>>         }
>>         ...
>>
>> With a ReadRowsRequest built up during the processElement and executed
>> like:
>>
>> ResultScanner<FlatRow> results =
>>     session.getDataClient().readFlatRows(request.build());
>>
>> Thanks,
>> Gwilym
>>
>>
>> On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
>>
>>> Combining PubSub + Bigtable is common.
>>>
>>> You should try to use the BigtableSession approach, because the HBase
>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>> You should use the same version of the Bigtable libraries that Apache
>>> Beam is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>>
>>> Can you provide the full stack trace for the exception you're seeing?
>>>
>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <
>>> [email protected]> wrote:
>>>
>>>> Hi list,
>>>>
>>>> I'm trying to figure out if Beam is intended to do the following and,
>>>> if so, what's the best approach?
>>>>
>>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new to
>>>> Java, so if there's any known solution for this, a code example would be
>>>> greatly appreciated.
>>>>
>>>> I have an unbounded stream of short messages (coming from PubSub).
>>>>
>>>> For each message, I want to get a number of rows from an external
>>>> database (rows within Bigtable, always the same table) based on the
>>>> contents of the message, and use the rows when producing output for the
>>>> final write apply.
>>>>
>>>> I've tried various means of connecting out to Bigtable from within
>>>> the DoFn which is handling the PubSub inputs, but so far everything I've
>>>> tried has resulted in Beam refusing to run the job due to:
>>>>
>>>> java.io.NotSerializableException:
>>>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>
>>>> (methods I've tried: manually using a BigtableSession, manually using
>>>> the Bigtable HBase libs)
>>>>
>>>> So is this something that Beam was designed to do? If so, what's the
>>>> recommended approach?
>>>>
>>>> I considered dynamically constructing a PCollection, but I wasn't sure
>>>> if that would make use of connection pooling to Bigtable.
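>>>>
>>>> In pseudo-code, the shape I'm after is roughly this (names are made
>>>> up, and the middle step is the part I don't know how to do):
>>>>
>>>> PCollection<PubsubMessage> messages = pipeline
>>>>     .apply(PubsubIO.readMessages().fromSubscription(subscription));
>>>>
>>>> messages
>>>>     .apply(ParDo.of(/* DoFn: for each message, read the matching
>>>>         Bigtable rows and emit a Map<String, ByteString> */))
>>>>     .apply(/* final write */);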
>>>>
>>>> Thanks,
>>>> Gwilym
>>>>
>>>
>>