Here's what I have in my history; if you need the "... X more" parts
expanded, I can look into that:
2017-06-01 05:23:05 INFO DataflowPipelineOptions$StagingLocationFactory:127 - No stagingLocation provided, falling back to gcpTempLocation
2017-06-01 05:23:06 INFO DataflowRunner:229 - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 111 files. Enable logging at DEBUG level to see which files will be staged.
[WARNING]
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: unable to serialize org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
    at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
    ... 6 more
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 10 more
The way I'm trying to use this in the ParDo/DoFn is as follows (line 138
from the stack trace starts here):
.apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
    private BigtableSession session;

    @Setup
    public void setUp() throws IOException {
        // `options` is my pipeline options object from the enclosing main()
        BigtableOptions opts = new BigtableOptions.Builder()
            .setProjectId(options.getSourceProjectId().get())
            .setInstanceId(options.getSourceInstanceId().get())
            .build();
        session = new BigtableSession(opts);
    }
    ...
With a ReadRowsRequest built up during processElement and executed like:

ResultScanner<FlatRow> results = session.getDataClient().readFlatRows(request.build());
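
In case it helps, here's an untested sketch of the workaround I'm
considering, on the assumption that the problem is the anonymous DoFn
capturing the `options` proxy itself: copy the values into final Strings
before constructing the DoFn, so only plain Strings get serialized. The
"orders" table name, the row-key derivation, and the output map shape below
are made up for illustration:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.scanner.FlatRow;
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Pull plain values out of `options` up front, so the DoFn below does not
// hold a reference to the options proxy.
final String projectId = options.getSourceProjectId().get();
final String instanceId = options.getSourceInstanceId().get();
final String tableName = String.format(
    "projects/%s/instances/%s/tables/%s", projectId, instanceId, "orders");

...
.apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
    private transient BigtableSession session;

    @Setup
    public void setUp() throws IOException {
        // Only the final Strings above are captured; no reference to `options`
        session = new BigtableSession(new BigtableOptions.Builder()
            .setProjectId(projectId)
            .setInstanceId(instanceId)
            .build());
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        // Hypothetical key derivation: use the raw payload as the row key
        ByteString rowKey = ByteString.copyFrom(c.element().getPayload());
        ReadRowsRequest request = ReadRowsRequest.newBuilder()
            .setTableName(tableName)
            .setRows(RowSet.newBuilder().addRowKeys(rowKey))
            .build();
        ResultScanner<FlatRow> results = session.getDataClient().readFlatRows(request);

        // Flatten the cells into family:qualifier -> value for the downstream applies
        Map<String, ByteString> out = new HashMap<>();
        FlatRow row;
        while ((row = results.next()) != null) {
            for (FlatRow.Cell cell : row.getCells()) {
                out.put(cell.getFamily() + ":" + cell.getQualifier().toStringUtf8(),
                    cell.getValue());
            }
        }
        c.output(out);
    }

    @Teardown
    public void tearDown() throws IOException {
        if (session != null) {
            session.close();
        }
    }
}));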
Thanks,
Gwilym
On 1 June 2017 at 15:10, Lukasz Cwik <[email protected]> wrote:
> Combining PubSub + Bigtable is common.
>
> You should try to use the BigtableSession approach, because the HBase
> approach adds a lot of dependencies (leading to dependency conflicts).
> You should use the same version of Bigtable libraries that Apache Beam is
> using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>
> Can you provide the full stack trace for the exception you're seeing?
>
> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <[email protected]> wrote:
>
>> Hi list,
>>
>> I'm trying to figure out if Beam is intended to do the following and, if
>> so, what's the best approach?
>>
>> I'm using Java with Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new
>> to Java, so if there's a known solution for this, a code example would be
>> greatly appreciated.
>>
>> I have an unbounded stream of short messages (coming from PubSub).
>>
>> For each message, I want to get a number of rows from an external
>> database (rows within Bigtable, always the same table) based on the
>> contents of the message, and use the rows when producing output for the
>> final write apply.
>>
>> I've tried various means of connecting out to Bigtable from within
>> the DoFn which is handling the PubSub inputs, but so far everything I've
>> tried has resulted in Beam refusing to run the job due to:
>>
>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>
>> (methods I've tried: manually using a BigtableSession, manually using the
>> Bigtable HBase libs)
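>>
>> (A simplified, untested sketch of the BigtableSession attempt, in case it
>> helps; getter names are illustrative, and the anonymous DoFn refers to
>> `options` from the enclosing scope:)
>>
>> .apply(ParDo.of(new DoFn<PubsubMessage, String>() {
>>     private BigtableSession session;
>>
>>     @Setup
>>     public void setUp() throws IOException {
>>         session = new BigtableSession(new BigtableOptions.Builder()
>>             .setProjectId(options.getProject())  // `options` captured from the enclosing scope
>>             .build());
>>     }
>>     ...
>> }));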
>>
>> So is this something that Beam was designed to do? If so, what's the
>> recommended approach?
>>
>> I considered dynamically constructing a PCollection, but I wasn't sure if
>> that would make use of connection pooling to Bigtable.
>>
>> Thanks,
>> Gwilym