Very nice to know, thank you. For what it's worth, my job is now up and running perfectly.
Thank you all again.

On 2 June 2017 at 16:19, Lukasz Cwik <[email protected]> wrote:

> If a DoFn needs information from PipelineOptions, it should really get
> them from the runtime context (StartBundleContext, FinishBundleContext,
> ProcessContext) with getPipelineOptions.
> PipelineOptions is specifically designed in this way to prevent users from
> relying on their own serialized version and missing out on:
> * Value provider / template integration
> * Runner provided information (credentials/...)
> * Execution environment specific information (logging/host information)
>
> On Thu, Jun 1, 2017 at 8:37 PM, Gwilym Evans <[email protected]> wrote:
>
>> Thanks to both of you for your assistance.
>>
>> It turned out that a closer examination of the stack trace above revealed
>> the true source:
>>
>> Caused by: java.io.NotSerializableException:
>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>
>> It turns out the "options" class for Beam cannot be serialized.
>>
>> Moving this particular DoFn out to its own class let me inject the needed
>> serializable configs rather than passing in the options, and now I'm back
>> on track.
>>
>> On 1 June 2017 at 22:48, Gwilym Evans <[email protected]> wrote:
>>
>>> Csabi, I will try that, thank you.
>>>
>>> Eugene, sorry, I should have mentioned that I've already tried that and
>>> it still fails. I've also tried annotating it with @JsonIgnore. Thanks,
>>> though.
>>>
>>> On 1 June 2017 at 22:46, Eugene Kirpichov <[email protected]> wrote:
>>>
>>>> It's probably because of the BigtableSession variable - mark it
>>>> transient.
>>>>
>>>> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <[email protected]> wrote:
>>>>
>>>>> Hi Gwilym,
>>>>>
>>>>> Try to extract the DoFn into a separate static inner class, or into a
>>>>> separate file as a top-level class, instead of declaring it as an
>>>>> anonymous inner class.
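The fix Gwilym describes (copying the plain values out of the options, rather than storing the options object in the fn) can be sketched in plain JDK terms. `FakeOptions` and `ConfiguredFn` below are hypothetical stand-ins for `PipelineOptions` and the DoFn; only the serialization pattern is the point:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for PipelineOptions: it carries config but, like the
// real options proxy, is NOT Serializable.
class FakeOptions {
    String sourceProjectId() { return "my-project"; }
    String sourceInstanceId() { return "my-instance"; }
}

// The fn copies the plain String values out of the options at construction
// time and never keeps a reference to the options object itself.
class ConfiguredFn implements Serializable {
    final String projectId;
    final String instanceId;

    ConfiguredFn(FakeOptions opts) {
        this.projectId = opts.sourceProjectId();
        this.instanceId = opts.sourceInstanceId();
    }

    // Serialize and deserialize, as a runner would when shipping the fn.
    static ConfiguredFn roundTrip(ConfiguredFn fn) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ConfiguredFn) in.readObject();
        }
    }
}
```

Because the fn's fields are all Strings, the round trip succeeds even though `FakeOptions` itself could never be serialized. In Beam, the alternative Lukasz mentions is to call `getPipelineOptions` on the context at runtime instead of capturing anything at construction time.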
>>>>> In Java, an anonymous inner class holds an implicit reference to the
>>>>> outer enclosing class, and I suspect that the serialiser is not able
>>>>> to serialise the fields of this enclosing instance.
>>>>>
>>>>> Regards,
>>>>> Csabi
>>>>>
>>>>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <[email protected]> wrote:
>>>>>
>>>>>> Here's what I have in my history; if you need the "... X more"
>>>>>> expanded, I can look into that:
>>>>>>
>>>>>> 2017-06-01 05:23:05 INFO DataflowPipelineOptions$StagingLocationFactory:127
>>>>>> - No stagingLocation provided, falling back to gcpTempLocation
>>>>>> 2017-06-01 05:23:06 INFO DataflowRunner:229 -
>>>>>> PipelineOptions.filesToStage was not specified. Defaulting to files from
>>>>>> the classpath: will stage 111 files. Enable logging at DEBUG level to see
>>>>>> which files will be staged.
>>>>>> [WARNING]
>>>>>> java.lang.reflect.InvocationTargetException
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.IllegalArgumentException: unable to serialize
>>>>>> org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>>>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>>>>> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>>>>> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>>>>>> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>>>> at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>>>>> ... 6 more
>>>>>> Caused by: java.io.NotSerializableException:
>>>>>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>>>>> ... 10 more
>>>>>>
>>>>>> The way I'm trying to use this in the ParDo/DoFn is:
>>>>>>
>>>>>> (line 138 starts here)
>>>>>> .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>>>>>     private BigtableSession session;
>>>>>>
>>>>>>     @Setup
>>>>>>     public void setUp() throws IOException {
>>>>>>         BigtableOptions opts = new BigtableOptions.Builder()
>>>>>>                 .setProjectId(options.getSourceProjectId().get())
>>>>>>                 .setInstanceId(options.getSourceInstanceId().get())
>>>>>>                 .build();
>>>>>>
>>>>>>         session = new BigtableSession(opts);
>>>>>>     }
>>>>>>     ...
>>>>>> With a ReadRowsRequest built up during processElement and
>>>>>> executed like:
>>>>>>
>>>>>> ResultScanner<FlatRow> results =
>>>>>>         session.getDataClient().readFlatRows(request.build());
>>>>>>
>>>>>> Thanks,
>>>>>> Gwilym
>>>>>>
>>>>>> On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> Combining PubSub + Bigtable is common.
>>>>>>>
>>>>>>> You should try to use the BigtableSession approach, because the hbase
>>>>>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>>>>>> You should use the same version of the Bigtable libraries that Apache
>>>>>>> Beam is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>>>>>>
>>>>>>> Can you provide the full stack trace for the exception you're seeing?
>>>>>>>
>>>>>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi list,
>>>>>>>>
>>>>>>>> I'm trying to figure out if Beam is intended to do the following
>>>>>>>> and, if so, what's the best approach?
>>>>>>>>
>>>>>>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively
>>>>>>>> new to Java, so if there's any known solution for this, a code
>>>>>>>> example would be greatly appreciated.
>>>>>>>>
>>>>>>>> I have an unbounded stream of short messages (coming from PubSub).
>>>>>>>>
>>>>>>>> For each message, I want to get a number of rows from an external
>>>>>>>> database (rows within Bigtable, always the same table) based on the
>>>>>>>> contents of the message, and use the rows when producing output for
>>>>>>>> the final write apply.
>>>>>>>> I've tried various means of connecting out to Bigtable from within
>>>>>>>> the DoFn which is handling the PubSub inputs, but so far everything
>>>>>>>> I've tried has resulted in Beam refusing to run the job due to:
>>>>>>>>
>>>>>>>> java.io.NotSerializableException:
>>>>>>>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>>>>
>>>>>>>> (methods I've tried: manually using a BigtableSession, manually
>>>>>>>> using the Bigtable HBase libs)
>>>>>>>>
>>>>>>>> So, is this something that Beam was designed to do? If so, what's
>>>>>>>> the recommended approach?
>>>>>>>>
>>>>>>>> I considered dynamically constructing a PCollection, but I wasn't
>>>>>>>> sure if that would make use of connection pooling to Bigtable.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Gwilym
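Csabi's diagnosis, that an anonymous inner class drags in its enclosing instance, can be demonstrated with nothing but the JDK. `EnclosingDemo` below is a hypothetical stand-in for the pipeline's main class: the anonymous object fails to serialize because of its hidden `this$0` reference, while a static nested class round-trips fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a pipeline main class; note it is NOT Serializable.
class EnclosingDemo {
    private final String state = "outer-state";

    // Anonymous inner class: javac gives it a hidden this$0 field pointing at
    // the EnclosingDemo instance, so serializing it drags the outer object in.
    Serializable anonymousFn() {
        return new Serializable() {
            @Override
            public String toString() { return state; }  // forces the outer capture
        };
    }

    // Static nested class: no hidden reference to the outer instance.
    static class TopLevelFn implements Serializable {
        final String config;
        TopLevelFn(String config) { this.config = config; }
    }

    // True if Java serialization accepts the object, false if it throws
    // (e.g. NotSerializableException on the hidden outer reference).
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

This is why the exception names `ProxyInvocationHandler` even though the DoFn never declares an options field: the anonymous DoFn's hidden outer reference pulls in the enclosing class, including the captured `options`.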

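Putting the thread's advice together, the working shape is a top-level (or static nested) class whose only instance state is serializable config, plus a transient session rebuilt after deserialization. A plain-JDK sketch follows; `FakeSession` and `SessionFn` are hypothetical stand-ins for `BigtableSession` and the DoFn, and in Beam the `setUp` body would sit under `@Setup`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for BigtableSession: an expensive, non-serializable handle.
class FakeSession {
    final String target;
    FakeSession(String target) { this.target = target; }
}

// Mirrors the working DoFn shape: only plain serializable config travels with
// the fn; the session is transient and rebuilt on the worker.
class SessionFn implements Serializable {
    final String projectId;
    final String instanceId;
    transient FakeSession session;  // skipped by serialization, null after deserializing

    SessionFn(String projectId, String instanceId) {
        this.projectId = projectId;
        this.instanceId = instanceId;
    }

    // In Beam this body would live in a @Setup method, run once per fn instance.
    void setUp() {
        session = new FakeSession(projectId + "/" + instanceId);
    }

    // Serialize and deserialize, as a runner would when shipping the fn.
    static SessionFn roundTrip(SessionFn fn) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SessionFn) in.readObject();
        }
    }
}
```

The transient session is null after the round trip and must be recreated before use, which is exactly what a `@Setup` method guarantees on each worker.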