If a DoFn needs information from PipelineOptions, it should get it from the runtime context (StartBundleContext, FinishBundleContext, ProcessContext) via getPipelineOptions. PipelineOptions is deliberately designed this way, to keep users from relying on their own serialized copy and missing out on:

* Value provider / template integration
* Runner-provided information (credentials, ...)
* Execution-environment-specific information (logging / host information)
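For example, a minimal sketch of reading options from the runtime context (the MyOptions interface and its property are made up for illustration):

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.DoFn;

    // Hypothetical options interface; any PipelineOptions sub-interface
    // registered with the pipeline works the same way.
    interface MyOptions extends PipelineOptions {
      String getSourceProjectId();
      void setSourceProjectId(String value);
    }

    class TagWithProjectFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Recover the options from the runtime context instead of keeping
        // them in a field, which would try (and fail) to serialize the
        // ProxyInvocationHandler behind the options proxy.
        MyOptions opts = c.getPipelineOptions().as(MyOptions.class);
        c.output(opts.getSourceProjectId() + ":" + c.element());
      }
    }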
On Thu, Jun 1, 2017 at 8:37 PM, Gwilym Evans <[email protected]> wrote:

> Thanks to both of you for your assistance.
>
> It turned out that a closer examination of the stack trace above revealed
> the true source:
>
> Caused by: java.io.NotSerializableException:
> org.apache.beam.sdk.options.ProxyInvocationHandler
>
> It turns out the "options" class for Beam cannot be serialized.
>
> Moving this particular DoFn out to its own class let me inject the needed
> serializable configs rather than passing the options through, and now I'm
> back on track.
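>
> In sketch form, the DoFn ended up shaped roughly like this (names are
> illustrative, and the output type is simplified to String here; the real
> DoFn emits Map<String, ByteString>):
>
>     import java.io.IOException;
>
>     import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
>     import org.apache.beam.sdk.transforms.DoFn;
>
>     import com.google.cloud.bigtable.config.BigtableOptions;
>     import com.google.cloud.bigtable.grpc.BigtableSession;
>
>     // A top-level (or static nested) class, so serializing it does not
>     // drag in an enclosing scope.
>     class BigtableLookupFn extends DoFn<PubsubMessage, String> {
>       // Plain serializable config, injected at construction time instead
>       // of capturing the non-serializable options proxy.
>       private final String projectId;
>       private final String instanceId;
>
>       // Created per worker in @Setup, never serialized.
>       private transient BigtableSession session;
>
>       BigtableLookupFn(String projectId, String instanceId) {
>         this.projectId = projectId;
>         this.instanceId = instanceId;
>       }
>
>       @Setup
>       public void setUp() throws IOException {
>         session = new BigtableSession(new BigtableOptions.Builder()
>             .setProjectId(projectId)
>             .setInstanceId(instanceId)
>             .build());
>       }
>
>       @Teardown
>       public void tearDown() throws IOException {
>         if (session != null) {
>           session.close();
>         }
>       }
>
>       @ProcessElement
>       public void processElement(ProcessContext c) throws IOException {
>         // Build a ReadRowsRequest from c.element(), then read with
>         // session.getDataClient().readFlatRows(request) and output the
>         // values needed downstream.
>       }
>     }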
>
> On 1 June 2017 at 22:48, Gwilym Evans <[email protected]> wrote:
>
>> Csabi, I will try that, thank you.
>>
>> Eugene, sorry, I should have mentioned that I've already tried that and
>> it still fails. I've also tried annotating it with @JsonIgnore. Thanks,
>> though.
>>
>> On 1 June 2017 at 22:46, Eugene Kirpichov <[email protected]> wrote:
>>
>>> It's probably because of the BigtableSession variable - mark it
>>> transient.
>>>
>>> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <[email protected]> wrote:
>>>
>>>> Hi Gwilym,
>>>>
>>>> Try extracting the DoFn into a separate static inner class, or into a
>>>> separate file as a top-level class, instead of declaring it as an
>>>> anonymous inner class. In Java, an anonymous inner class has an
>>>> implicit reference to the enclosing outer class, and I suspect the
>>>> serializer is not able to serialize the fields of that enclosing
>>>> instance.
>>>>
>>>> Regards,
>>>> Csabi
>>>>
>>>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <[email protected]> wrote:
>>>>
>>>>> Here's what I have in my history; if you need the "... X more"
>>>>> expanded, I can look into that:
>>>>>
>>>>> 2017-06-01 05:23:05 INFO DataflowPipelineOptions$StagingLocationFactory:127
>>>>> - No stagingLocation provided, falling back to gcpTempLocation
>>>>> 2017-06-01 05:23:06 INFO DataflowRunner:229 - PipelineOptions.filesToStage
>>>>> was not specified. Defaulting to files from the classpath: will stage
>>>>> 111 files. Enable logging at DEBUG level to see which files will be staged.
>>>>> [WARNING]
>>>>> java.lang.reflect.InvocationTargetException
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.IllegalArgumentException: unable to serialize
>>>>> org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>>>>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>>>>     at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>>>>     at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>>>>>     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>>>     at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>>>>     ... 6 more
>>>>> Caused by: java.io.NotSerializableException:
>>>>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>     at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>>>>     ... 10 more
>>>>>
>>>>> The way I'm trying to use this in the ParDo/DoFn is:
>>>>>
>>>>> (line 138 starts here)
>>>>>     .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>>>>         private BigtableSession session;
>>>>>
>>>>>         @Setup
>>>>>         public void setUp() throws IOException {
>>>>>             BigtableOptions opts = new BigtableOptions.Builder()
>>>>>                 .setProjectId(options.getSourceProjectId().get())
>>>>>                 .setInstanceId(options.getSourceInstanceId().get())
>>>>>                 .build();
>>>>>
>>>>>             session = new BigtableSession(opts);
>>>>>         }
>>>>>     ...
>>>>>
>>>>> With a ReadRowsRequest built up during the processElement and executed
>>>>> like:
>>>>>
>>>>>     ResultScanner<FlatRow> results =
>>>>>         session.getDataClient().readFlatRows(request.build());
>>>>>
>>>>> Thanks,
>>>>> Gwilym
>>>>>
>>>>> On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Combining PubSub + Bigtable is common.
>>>>>>
>>>>>> You should try to use the BigtableSession approach, because the HBase
>>>>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>>>>> You should use the same version of the Bigtable libraries that Apache
>>>>>> Beam is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>>>>>
>>>>>> Can you provide the full stack trace for the exception you're seeing?
>>>>>>
>>>>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>>> Hi list,
>>>>>>>
>>>>>>> I'm trying to figure out if Beam is intended to do the following
>>>>>>> and, if so, what's the best approach?
>>>>>>>
>>>>>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new
>>>>>>> to Java, so if there's a known solution for this, a code example
>>>>>>> would be greatly appreciated.
>>>>>>>
>>>>>>> I have an unbounded stream of short messages (coming from PubSub).
>>>>>>>
>>>>>>> For each message, I want to get a number of rows from an external
>>>>>>> database (rows within Bigtable, always the same table) based on the
>>>>>>> contents of the message, and use the rows when producing output for
>>>>>>> the final write apply.
>>>>>>>
>>>>>>> I've tried various means of connecting out to Bigtable from within
>>>>>>> the DoFn which is handling the PubSub inputs, but so far everything
>>>>>>> I've tried has resulted in Beam refusing to run the job due to:
>>>>>>>
>>>>>>> java.io.NotSerializableException:
>>>>>>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>>>
>>>>>>> (Methods I've tried: manually using a BigtableSession, and manually
>>>>>>> using the Bigtable HBase libs.)
>>>>>>>
>>>>>>> So is this something that Beam was designed to do? If so, what's the
>>>>>>> recommended approach?
>>>>>>>
>>>>>>> I considered dynamically constructing a PCollection, but I wasn't
>>>>>>> sure if that would make use of connection pooling to Bigtable.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Gwilym
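For completeness, a rough sketch of wiring the extracted DoFn from above into a pipeline (names are illustrative; it assumes the hypothetical MyOptions interface from the first sketch, extended with a matching getSourceInstanceId/setSourceInstanceId pair of plain String accessors; note that resolving the values at construction time like this trades away the ValueProvider/template integration mentioned at the top of the thread):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.ParDo;

    public class ExamplePipeline {
      public static void main(String[] args) {
        MyOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply(PubsubIO.readMessages()
                .fromSubscription("projects/my-project/subscriptions/my-subscription"))
         // Pass plain serializable values into the DoFn, not `options` itself.
         .apply(ParDo.of(new BigtableLookupFn(
             options.getSourceProjectId(), options.getSourceInstanceId())));

        p.run();
      }
    }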
