Re: Kafka offset management

2017-06-30 Thread Gwilym Evans
Hi Raghu,

Thanks for the checkpointing info. I'll have to think about this more, but
I'm wondering if there's some custom work I can do to have the output of
the final stage perform some offset commits back to Kafka. There was
another thread here I commented in about giving Write transforms an output
so that further processing could be done if and only if a Write is
confirmed, and that would be very handy here. Is there perhaps some way to
wrap my PubSubIO Write stage in a custom class so that I can override
certain lifecycle stages to issue an offset commit back to Kafka?
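
To make this concrete, here's roughly the kind of thing I have in mind (untested,
just a sketch of my idea): a DoFn placed as late in the pipeline as I can manage
(since the Write itself returns PDone, nothing can go after it) that commits the
highest offset it has seen per partition back to Kafka with the plain Kafka client.
It assumes the elements at that point still carry their Kafka metadata
(KafkaRecord), and the class and constructor here are made up:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical sketch: commit offsets observed by a late pipeline stage back to
// Kafka, using the same group.id as the read so they are visible on restart.
class CommitOffsetsFn extends DoFn<KafkaRecord<byte[], byte[]>, Void> {
    private final String bootstrapServers;
    private final String groupId;
    private transient KafkaConsumer<byte[], byte[]> consumer;
    private transient Map<TopicPartition, OffsetAndMetadata> pending;

    CommitOffsetsFn(String bootstrapServers, String groupId) {
        this.bootstrapServers = bootstrapServers;
        this.groupId = groupId;
    }

    @Setup
    public void setup() {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("group.id", groupId);
        config.put("key.deserializer", ByteArrayDeserializer.class.getName());
        config.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumer = new KafkaConsumer<>(config);
    }

    @StartBundle
    public void startBundle() {
        pending = new HashMap<>();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        KafkaRecord<byte[], byte[]> record = c.element();
        // Track the next offset to consume for each partition seen in this bundle.
        pending.merge(
            new TopicPartition(record.getTopic(), record.getPartition()),
            new OffsetAndMetadata(record.getOffset() + 1),
            (a, b) -> a.offset() > b.offset() ? a : b);
    }

    @FinishBundle
    public void finishBundle() {
        if (!pending.isEmpty()) {
            consumer.commitSync(pending);  // commit only after the bundle is processed
        }
    }
}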

Regarding the truststore on dataflow: I required the Factory because of the
local file problem you mention, so I put the rest of the configs in there
too. My solution to the local file problem was to make the keystore /
truststore files a Java class resource, since those are distributed to
workers, and the resource is written to a temporary location on the worker
during consumer creation:

private static class KafkaConsumerFactoryFn
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

    @Override
    public Consumer<byte[], byte[]> apply(Map<String, Object> stringObjectMap) {
        // Kafka's truststore is embedded as a static resource, but Kafka wants it
        // as a file on disk, so write it to a temp file and use that.
        final File truststore;
        try {
            truststore = File.createTempFile("truststore", ".jks");
            truststore.deleteOnExit();

            InputStream truststoreStream =
                    JOB_CLASS_HERE.class.getResourceAsStream("RESOURCE_NAME_HERE");
            Files.copy(truststoreStream, truststore.toPath(),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
        ...
        config.put("ssl.truststore.location", truststore.toPath().toString());

Cheers,
Gwilym



On 30 June 2017 at 16:36, Raghu Angadi  wrote:

> Gwilym,
>
> I think your understanding is correct, with one caveat as noted below. As
> Jingsong suggested, committing offsets in 'KafkaCheckpointMark.finalizeCheckpoint()'
> is required. I had left a comment there. It is fairly straightforward to
> add this as an option.
>
> Note that finalizing would give at least once semantics only if you drain
> a pipeline before restarting it. If a pipeline is killed or crashes, you
> can still miss some records. Finalize checkpoint is called in Dataflow once
> the messages are checkpointed for the current stage. The downstream stages
> might not have processed them. Draining a pipeline ensures that all the
> input is processed through the pipeline.
>
> > The consumer factory is used because of some runtime SSL key/truststore
> setup:
>
> btw, KafkaIO includes an API to set consumer configs. Wasn't it enough?
> Did you get trust store config working with Dataflow? Last I remember
> "ssl.truststore.location" had to be a local file on the worker and it was
> not easy to make that accessible.
>
> On Thu, Jun 29, 2017 at 10:47 PM, JingsongLee 
> wrote:
>
>> Sorry, forgot the user mail group. + @user
>>
>> Yeah, although KafkaIO (which exposes an offsets interface) is different from
>> PubSubIO, committing the offsets to Kafka in finalizeCheckpoint is also
>> a way.
>> You're welcome to contribute, and maybe @Raghu Angadi can share more details.
>>
>> Best, Jingsong Lee
>>
>> --
>> From:Gwilym Evans 
>> Time:2017 Jun 30 (Fri) 13:27
>> To:JingsongLee 
>> Subject:Re: Kafka offset management
>>
>> Thanks for the info. I'll have a look and see if I can do anything
>> similar.
>>
>> I am very new to Beam internals, but I've been having a look at the
>> KafkaIO code and comparing it to the PubSubIO code.
>>
>> I think that the finalizeCheckpoint implementation for KafkaIO should
>> definitely be committing the offsets to Kafka, if possible. But perhaps
>> only when a Kafka group.id is configured, as committing offsets for
>> random or blank group IDs is kind of pointless.
>>
>> I think I'll take a shot at making and contributing this, even if it's
>> optional. Unless you can think of a reason to specifically not do this?
>>
>> Though, looking at the KafkaIO source for this, there is even a comment
>> there alluding to the fact that this should maybe be done to provide better
>> restart options.
>>
>> -Gwilym
>>
>>
>> On 30 June 2017 at 05:08, JingsongLee  wrote:
>> Oh. I know what you mean.
>>
>> In our production, if we need to re-run (lose checkpoint and state when
>> a job crashes, is canceled, or is drained), we will set the KafkaIO
>> startTime to start a new job, because we generally know the last
>> consumer timestamp of previous job. (Do no

Re: Kafka offset management

2017-06-29 Thread Gwilym Evans
Hi JingsongLee,

Thanks for the reply.

What I'm trying to avoid are lost / skipped messages due to two situations:

1. Lost offsets, or
2. Premature offset commits

I've researched snapshot checkpoints, and from what I understand these are
only maintained in Dataflow when a job is updated. If a job crashes, is
cancelled, or is drained, then the checkpoints are lost. This is situation
(1) above.

From what I understand about auto-commit offsets, the Kafka client
periodically and automatically commits the offsets it has polled. In the
case of Beam and Dataflow, this would happen even if the offsets being
committed have not yet been fully processed by the pipeline. This is
situation (2) above.
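
For reference, I believe wiring that through KafkaIO looks roughly like the sketch
below (assuming the updateConsumerProperties hook on KafkaIO.Read; untested). It
would carry exactly the risk described above, because the client commits on its own
schedule rather than when the pipeline has finished processing:

    // Sketch only: turn on Kafka's periodic auto-commit for the read.
    p.apply(KafkaIO.readBytes()
        .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
        .withTopics(ImmutableList.of(KAFKA_TOPIC))
        .updateConsumerProperties(ImmutableMap.<String, Object>of(
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true,
            ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP_ID)));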

So far I'm not seeing a way to avoid data loss besides resetting to the
earliest offset when a job starts. But, given we retain data in our Kafka
topics for up to 7 days, that is not feasible from a performance point of
view.

Can anyone confirm / deny my understanding here?

Cheers,
Gwilym

On 30 June 2017 at 02:59, JingsongLee  wrote:

> Hi Gwilym
> KafkaIO saves offsets to the snapshot (checkpoint) instead of committing offsets to
> Kafka for restarting.
> You can use a Kafka client configuration to enable auto-commit of offsets.
>
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
>
> Hope this helps.
>
> Best, JingsongLee
>
> --
> From:Gwilym Evans 
> Time:2017 Jun 29 (Thu) 15:32
> To:user 
> Subject:Kafka offset management
>
> Hi list,
>
> I was playing around with KafkaIO today to understand how it behaves in a
> failure or restart scenario (be it crash, cancel, or drain), and I found it
> "lost" (or skipped) Kafka messages in these cases. That is, it resumed from
> the latest offsets rather than the last successfully processed offsets, and
> a gap in messages was observed as a result.
>
> My KafkaIO transform looks like:
>
> KafkaIO.readBytes()
> .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
> .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
> .withTopics(ImmutableList.of(KAFKA_TOPIC))
>
> The consumer factory is used because of some runtime SSL key/truststore
> setup:
>
> final Map<String, Object> config = Maps.newHashMap();
> config.put("auto.offset.reset", "latest");
> config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
> config.put("group.id", KAFKA_GROUP_ID);
> config.put("key.deserializer", ByteArrayDeserializer.class.
> getName());
> config.put("security.protocol", "SSL");
> config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
> config.put("ssl.truststore.location",
> truststore.toPath().toString());
> config.put("ssl.truststore.password",
> kafkaTruststorePassword);
> config.put("value.deserializer", ByteArrayDeserializer.class.
> getName());
>
> return new KafkaConsumer<>(config);
>
> So, I am setting a group.id, and I know KafkaIO is using the new consumer
> because our Zookeeper is not accessible from Dataflow.
>
> When I look on the Kafka cluster, there is no record of the consumer
> group's offsets. So I take it KafkaIO is not committing offsets to Kafka.
>
> Can this be changed to commit offsets when a "batch" of streaming messages
> are seen as processed OK? I am fine with at-least-once.
>
> I am using Dataflow, Beam 2.0, Kafka 0.10.2.1
>
> Cheers,
> Gwilym
>
>
>


Kafka offset management

2017-06-29 Thread Gwilym Evans
Hi list,

I was playing around with KafkaIO today to understand how it behaves in a
failure or restart scenario (be it crash, cancel, or drain), and I found it
"lost" (or skipped) Kafka messages in these cases. That is, it resumed from
the latest offsets rather than the last successfully processed offsets, and
a gap in messages was observed as a result.

My KafkaIO transform looks like:

KafkaIO.readBytes()
.withConsumerFactoryFn(new KafkaConsumerFactoryFn())
.withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
.withTopics(ImmutableList.of(KAFKA_TOPIC))

The consumer factory is used because of some runtime SSL key/truststore
setup:

final Map<String, Object> config = Maps.newHashMap();
config.put("auto.offset.reset", "latest");
config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
config.put("group.id", KAFKA_GROUP_ID);
config.put("key.deserializer",
ByteArrayDeserializer.class.getName());
config.put("security.protocol", "SSL");
config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
config.put("ssl.truststore.location",
truststore.toPath().toString());
config.put("ssl.truststore.password", kafkaTruststorePassword);
config.put("value.deserializer",
ByteArrayDeserializer.class.getName());

return new KafkaConsumer<>(config);

So, I am setting a group.id, and I know KafkaIO is using the new consumer
because our Zookeeper is not accessible from Dataflow.

When I look on the Kafka cluster, there is no record of the consumer
group's offsets. So I take it KafkaIO is not committing offsets to Kafka.

Can this be changed to commit offsets when a "batch" of streaming messages
are seen as processed OK? I am fine with at-least-once.

I am using Dataflow, Beam 2.0, Kafka 0.10.2.1

Cheers,
Gwilym


PubsubIO and message retries

2017-06-11 Thread Gwilym Evans
Hi list,

When processing a PubsubMessage, is it possible to trigger something to
cause the message to return to the queue?

That is, to either NACK the message, or just not ACK it?

I'm using the Java Beam SDK v2.0.0

Thanks,
Gwilym


Re: Action in the pipeline after Write

2017-06-11 Thread Gwilym Evans
Ah, so in my approach you would potentially end up with a "report" (final
step) which happens before the writes complete and is based on the input
contents rather than what was actually written.

Thanks for that.

Getting some sort of contextual output from Write results sounds like a
good idea, so that we can make dependent chains. Even if it's for something
as simple as pinging a monitoring tool like deadmanssnitch to ensure a
regular job is completing.


On 12 June 2017 at 04:42, Eugene Kirpichov  wrote:

> This would write the data, and in parallel, also apply your combiner to
> the same data and apply the other thing. Combine transform does not have
> any "sequencing" effects - it is a basic aggregation transform; it's under
> the hood of Count, Sum, Mean, and other aggregation transforms; it combines
> a collection of values into a single value.
>
> The only sequencing mechanism in pipelines is data dependency (i.e. when
> an output of one transform is an input of another). Since Write has no
> outputs, it is currently impossible to sequence it against anything.
>
> It'd probably make sense to modify the Write transform to return some
> PValue, rather than PDone.
>
> On Sun, Jun 11, 2017 at 8:36 PM Gwilym Evans 
> wrote:
>
>> Would the following work though? I could be misunderstanding the
>> situation:
>>
>> transform = p.apply(some transform)
>> transform.apply(write)
>> transform.apply(combine).apply(something on combined result)
>> p.run()
>>
>> Cheers,
>> Gwilym
>>
>>
>> On 12 June 2017 at 02:36, Lukasz Cwik  wrote:
>>
>>> Unfortunately you can't Combine Writes since they return PDone (a
>>> terminal node) during pipeline construction.
>>>
>>> On Sun, Jun 11, 2017 at 3:23 PM, Gwilym Evans <
>>> gwilym.ev...@bigcommerce.com> wrote:
>>>
>>>> I'm not 100% sure as I haven't tried it, but, Combining comes to mind
>>>> as a possible way of doing this, assuming your data is finite
>>>>
>>>> https://beam.apache.org/documentation/programming-guide/#transforms-combine
>>>>
>>>> You could take the PCollection result of 2 and simultaneously apply the
>>>> Write and the Combine, using the singular result of the Combine to trigger
>>>> the remaining steps
>>>>
>>>> Hope that helps, I'm still learning
>>>>
>>>> -Gwilym
>>>>
>>>>
>>>> On 11 June 2017 at 16:50, Morand, Sebastien <
>>>> sebastien.mor...@veolia.com> wrote:
>>>>
>>>>> No, I can't, the pipeline is created within a cron, which is limited
>>>>> to 10 minutes.
>>>>>
>>>>> *Sébastien MORAND*
>>>>> Team Lead Solution Architect
>>>>> Technology & Operations / Digital Factory
>>>>> Veolia - Group Information Systems & Technology (IS&T)
>>>>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>>>>> Bureau 0144C (Ouest)
>>>>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>>>>> *www.veolia.com <http://www.veolia.com>*
>>>>> <http://www.veolia.com>
>>>>> <https://www.facebook.com/veoliaenvironment/>
>>>>> <https://www.youtube.com/user/veoliaenvironnement>
>>>>> <https://www.linkedin.com/company/veolia-environnement>
>>>>> <https://twitter.com/veolia>
>>>>>
>>>>> On 11 June 2017 at 18:21, Eugene Kirpichov 
>>>>> wrote:
>>>>>
>>>>>> Hmm can you simply do this in your main program after the pipeline
>>>>>> finishes?
>>>>>>
>>>>>> p.run().waitUntilFinish();
>>>>>> ... Send report ...
>>>>>>
>>>>>> On Sun, Jun 11, 2017, 1:50 AM Morand, Sebastien <
>>>>>> sebastien.mor...@veolia.com> wrote:
>>>>>>
>>>>>>> Yes this use case can be treated by using parallel operation.
>>>>>>>
>>>>>>> I have a 2nd one, I would like to send a report at the end of the
>>>>>>> pipeline when the last line has been written in bigquery: number of 
>>>>>>> lines
>>>>>>> treated, number of lines ignored (from another part of the pipeline 
>>>>>>> using
>>>>>>> graph as you des

Re: Action in the pipeline after Write

2017-06-11 Thread Gwilym Evans
Would the following work though? I could be misunderstanding the situation:

transform = p.apply(some transform)
transform.apply(write)
transform.apply(combine).apply(something on combined result)
p.run()
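
Spelled out in (untested) Java with made-up names, this is the branching shape I
mean:

    // Sketch: both branches hang off the same PCollection.
    PCollection<TableRow> rows = p
        .apply("Read", TextIO.read().from("gs://bucket/input*"))   // hypothetical source
        .apply("ToRows", ParDo.of(new ToTableRowFn()));            // hypothetical DoFn

    rows.apply("Write", BigQueryIO.writeTableRows().to("project:dataset.table"));

    rows.apply("CountAll", Count.<TableRow>globally())
        .apply("Report", ParDo.of(new SendReportFn()));            // hypothetical DoFn

    p.run();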

Cheers,
Gwilym


On 12 June 2017 at 02:36, Lukasz Cwik  wrote:

> Unfortunately you can't Combine Writes since they return PDone (a terminal
> node) during pipeline construction.
>
> On Sun, Jun 11, 2017 at 3:23 PM, Gwilym Evans <
> gwilym.ev...@bigcommerce.com> wrote:
>
>> I'm not 100% sure as I haven't tried it, but, Combining comes to mind as
>> a possible way of doing this, assuming your data is finite
>>
>> https://beam.apache.org/documentation/programming-guide/#transforms-combine
>>
>> You could take the PCollection result of 2 and simultaneously apply the
>> Write and the Combine, using the singular result of the Combine to trigger
>> the remaining steps
>>
>> Hope that helps, I'm still learning
>>
>> -Gwilym
>>
>>
>> On 11 June 2017 at 16:50, Morand, Sebastien 
>> wrote:
>>
>>> No, I can't, the pipeline is created within a cron, which is limited to
>>> 10 minutes.
>>>
>>> *Sébastien MORAND*
>>> Team Lead Solution Architect
>>> Technology & Operations / Digital Factory
>>> Veolia - Group Information Systems & Technology (IS&T)
>>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>>> Bureau 0144C (Ouest)
>>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>>> *www.veolia.com <http://www.veolia.com>*
>>> <http://www.veolia.com>
>>> <https://www.facebook.com/veoliaenvironment/>
>>> <https://www.youtube.com/user/veoliaenvironnement>
>>> <https://www.linkedin.com/company/veolia-environnement>
>>> <https://twitter.com/veolia>
>>>
>>> On 11 June 2017 at 18:21, Eugene Kirpichov  wrote:
>>>
>>>> Hmm can you simply do this in your main program after the pipeline
>>>> finishes?
>>>>
>>>> p.run().waitUntilFinish();
>>>> ... Send report ...
>>>>
>>>> On Sun, Jun 11, 2017, 1:50 AM Morand, Sebastien <
>>>> sebastien.mor...@veolia.com> wrote:
>>>>
>>>>> Yes this use case can be treated by using parallel operation.
>>>>>
>>>>> I have a 2nd one, I would like to send a report at the end of the
>>>>> pipeline when the last line has been written in bigquery: number of lines
>>>>> treated, number of lines ignored (from another part of the pipeline using
>>>>> graph as you described), number of files at the beginning, and so on.
>>>>>
>>>>> This report could be:
>>>>>
>>>>>1. Write a pub/sub
>>>>>2. Send an email
>>>>>3. Call an url with parameters
>>>>>
>>>>> Is this possible?
>>>>>
>>>>> Regards,
>>>>>
>>>>>
>>>>>
>>>>> *Sébastien MORAND*
>>>>> Team Lead Solution Architect
>>>>> Technology & Operations / Digital Factory
>>>>> Veolia - Group Information Systems & Technology (IS&T)
>>>>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>>>>> Bureau 0144C (Ouest)
>>>>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>>>>> *www.veolia.com <http://www.veolia.com>*
>>>>> <http://www.veolia.com>
>>>>> <https://www.facebook.com/veoliaenvironment/>
>>>>> <https://www.youtube.com/user/veoliaenvironnement>
>>>>> <https://www.linkedin.com/company/veolia-environnement>
>>>>> <https://twitter.com/veolia>
>>>>>
>>>>> On 11 June 2017 at 04:14, Eugene Kirpichov 
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>> It sounds like you want to write data to BigQuery and then load the
>>>>>> same data back from BigQuery? Why? I'm particularly confused by your
>>>>>> comment "nothing left in the PCollection" - writing a collection to
>>>>>> BigQuery doesn't remove data from the collection, a PCollection is just a
>>>>>> logical description of a dataset, not a mutable container. Transforms are
>>>>>> like math

Re: Action in the pipeline after Write

2017-06-11 Thread Gwilym Evans
I'm not 100% sure as I haven't tried it, but, Combining comes to mind as a
possible way of doing this, assuming your data is finite

https://beam.apache.org/documentation/programming-guide/#transforms-combine

You could take the PCollection result of step 2 and simultaneously apply the
Write and the Combine, using the singular result of the Combine to trigger
the remaining steps

Hope that helps, I'm still learning

-Gwilym


On 11 June 2017 at 16:50, Morand, Sebastien 
wrote:

> No, I can't, the pipeline is created within a cron, which is limited to 10
> minutes.
>
> *Sébastien MORAND*
> Team Lead Solution Architect
> Technology & Operations / Digital Factory
> Veolia - Group Information Systems & Technology (IS&T)
> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
> Bureau 0144C (Ouest)
> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
> *www.veolia.com *
>
> On 11 June 2017 at 18:21, Eugene Kirpichov  wrote:
>
>> Hmm can you simply do this in your main program after the pipeline
>> finishes?
>>
>> p.run().waitUntilFinish();
>> ... Send report ...
>>
>> On Sun, Jun 11, 2017, 1:50 AM Morand, Sebastien <
>> sebastien.mor...@veolia.com> wrote:
>>
>>> Yes this use case can be treated by using parallel operation.
>>>
>>> I have a 2nd one, I would like to send a report at the end of the
>>> pipeline when the last line has been written in bigquery: number of lines
>>> treated, number of lines ignored (from another part of the pipeline using
>>> graph as you described), number of files at the beginning, and so on.
>>>
>>> This report could be:
>>>
>>>1. Write a pub/sub
>>>2. Send an email
>>>3. Call an url with parameters
>>>
>>> Is this possible?
>>>
>>> Regards,
>>>
>>>
>>>
>>> *Sébastien MORAND*
>>> Team Lead Solution Architect
>>> Technology & Operations / Digital Factory
>>> Veolia - Group Information Systems & Technology (IS&T)
>>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>>> Bureau 0144C (Ouest)
>>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>>> *www.veolia.com *
>>>
>>> On 11 June 2017 at 04:14, Eugene Kirpichov  wrote:
>>>
 Hi!
 It sounds like you want to write data to BigQuery and then load the
 same data back from BigQuery? Why? I'm particularly confused by your
 comment "nothing left in the PCollection" - writing a collection to
 BigQuery doesn't remove data from the collection, a PCollection is just a
 logical description of a dataset, not a mutable container. Transforms are
 like mathematical functions - they don't change their inputs, they only
 compute their outputs.

 Perhaps that you're assuming that Beam pipelines can only be a strict
 linear sequence of transforms? That is not the case - pipelines are an
 arbitrary graph, you can use a collection multiple times, i.e. apply
 multiple transforms to it. E.g. you can both write the collection to
 bigquery (step 3) and apply some other transform to the same collection
 (step 5).

 Assuming you use Java:
 PCollection foos = p.apply(TextIO.read().from(...)).apply(...some
 transform...);
 foos.apply(BigQueryIO.write().to(...));
 PCollection bars = foos.apply(...some other transform...);
 bars.apply(BigQueryIO.write().to(...));

 Let me know if this helps.

 On Sat, Jun 10, 2017 at 3:42 PM Morand, Sebastien <
 sebastien.mor...@veolia.com> wrote:

> Hi,
>
> Is there any way to add some step after a Write? Write returns
> a PDone, so I can't do anything, but I would actually like to do something.
>
> Example :
>
>1. Load data from gcs
>2. Some transform
>3. Write data into bigquery
>=> Nothing left in the pcollection, but when 3 is over =>
>4. Load data from bigquery
>5. Some other transform
>6. Write data into bigquery
>
> Any way to do that?
>
> Thanks,
>
> *Sébastien MORAND*
> Team Lead Solution Architect
> Technology & Operations / Digital Factory
> Veolia - Group Information Systems & Technology (IS&T)
> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
> Bureau 0144C (Ouest)
> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
> *www.veolia.com *

Re: Enriching stream messages based on external data

2017-06-02 Thread Gwilym Evans
Very nice to know, thank you

For what it's worth, my job is now up and running perfectly

Thank you all again
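
For anyone searching later, the pattern Lukasz describes below looks roughly like
this inside a DoFn (my own illustration, untested, with a made-up options
interface MyOptions):

    // Sketch: read PipelineOptions from the runtime context instead of capturing
    // the non-serializable options proxy in the DoFn's fields.
    static class LookupFn extends DoFn<String, String> {       // hypothetical types
        @ProcessElement
        public void processElement(ProcessContext c) {
            MyOptions opts = c.getPipelineOptions().as(MyOptions.class);  // MyOptions is assumed
            String projectId = opts.getSourceProjectId().get();
            c.output(c.element() + ":" + projectId);
        }
    }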

On 2 June 2017 at 16:19, Lukasz Cwik  wrote:

> If a DoFn needs information from PipelineOptions, it should really get
> them from the runtime context (StartBundleContext, FinishBundleContext,
> ProcessContext) with getPipelineOptions.
> PipelineOptions is specifically designed in this way to prevent users from
> relying on their own serialized version and missing out on:
> * Value provider / template integration
> * Runner provided information (credentials/...)
> * Execution environment specific information (logging/host information)
>
>
>
>
> On Thu, Jun 1, 2017 at 8:37 PM, Gwilym Evans  > wrote:
>
>> Thanks for both of your assistance
>>
>> It turned out that a closer examination of the stack trace above revealed
>> the true source:
>>
>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.
>> ProxyInvocationHandler
>>
>> It turns out the "options" class for beam cannot be serialized.
>>
>> Moving this particular DoFn out to its own class let me inject the needed
>> serializable configs rather than passing out the options, and now I'm back
>> on track.
>>
>>
>> On 1 June 2017 at 22:48, Gwilym Evans 
>> wrote:
>>
>>> Csabi, I will try that, thank you.
>>>
>>> Eugene, sorry I should have mentioned that I've already tried that and
>>> it still fails. I've also tried annotating it with @JsonIgnore. Thanks,
>>> though.
>>>
>>> On 1 June 2017 at 22:46, Eugene Kirpichov  wrote:
>>>
>>>> It's probably because of the BigtableSession variable - mark it
>>>> transient.
>>>>
>>>> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai 
>>>> wrote:
>>>>
>>>>> Hi Gwilym,
>>>>>
>>>>> try to extract the DoFn into a separate static inner class or into a
>>>>> separate file as a top level class, instead of declaring as an
>>>>> anonymous inner class. In java the anonymous inner class has an
>>>>> implicit reference to the outer enclosing class, and I suspect that the
>>>>> serialiser is not able to serialise the fields of this enclosing
>>>>> instance.
>>>>>
>>>>> Regards,
>>>>> Csabi
>>>>>
>>>>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans 
>>>>> wrote:
>>>>>
>>>>>> Here's what I have in my history, if you need the "... X more"
>>>>>> expanded I can look into that:
>>>>>>
>>>>>> 2017-06-01 05:23:05 INFO  
>>>>>> DataflowPipelineOptions$StagingLocationFactory:127
>>>>>> - No stagingLocation provided, falling back to gcpTempLocation
>>>>>> 2017-06-01 05:23:06 INFO  DataflowRunner:229 -
>>>>>> PipelineOptions.filesToStage was not specified. Defaulting to files from
>>>>>> the classpath: will stage 111 files. Enable logging at DEBUG level to see
>>>>>> which files will be staged.
>>>>>> [WARNING]
>>>>>> java.lang.reflect.InvocationTargetException
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>>>> ssorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>>>> thodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.IllegalArgumentException: unable to serialize
>>>>>> org.apache.beam.examples.AthenaPubsubOrderNotificationsHandl
>>>>>> er$2@604b2279
>>>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteAr
>>>>>> ray(SerializableUtils.java:53)
>>>>>> at org.apache.beam.sdk.util.SerializableUtils.clone(Serializabl
>>>>>> eUtils.java:90)
>>>>>> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.(Par
>>>>>> Do.java:569)
>>>>>> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>>>> at org.apache.beam.examples.AthenaPubsubOrde

Re: Enriching stream messages based on external data

2017-06-01 Thread Gwilym Evans
Thanks for both of your assistance

It turned out that a closer examination of the stack trace above revealed
the true source:

Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.
ProxyInvocationHandler

It turns out the "options" class for Beam cannot be serialized.

Moving this particular DoFn out to its own class let me inject the needed
serializable configs rather than passing the options object around, and now
I'm back on track.
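
Roughly, the working shape is the sketch below: a top-level DoFn that takes plain
strings in its constructor instead of the options object, and opens the Bigtable
session per worker in @Setup. This is a simplified illustration (input/output
types are indicative only), not the exact code:

import java.io.IOException;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

class EnrichFromBigtableFn extends DoFn<PubsubMessage, String> {
    private final String projectId;
    private final String instanceId;
    private transient BigtableSession session;

    EnrichFromBigtableFn(String projectId, String instanceId) {
        // Plain serializable fields rather than the PipelineOptions proxy.
        this.projectId = projectId;
        this.instanceId = instanceId;
    }

    @Setup
    public void setUp() throws IOException {
        BigtableOptions opts = new BigtableOptions.Builder()
                .setProjectId(projectId)
                .setInstanceId(instanceId)
                .build();
        session = new BigtableSession(opts);
    }

    @Teardown
    public void tearDown() throws Exception {
        if (session != null) {
            session.close();
        }
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Build a ReadRowsRequest from c.element(), scan it with
        // session.getDataClient().readFlatRows(...), and c.output(...) the
        // enriched result here.
    }
}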


On 1 June 2017 at 22:48, Gwilym Evans  wrote:

> Csabi, I will try that, thank you.
>
> Eugene, sorry I should have mentioned that I've already tried that and it
> still fails. I've also tried annotating it with @JsonIgnore. Thanks, though.
>
> On 1 June 2017 at 22:46, Eugene Kirpichov  wrote:
>
>> It's probably because of the BigtableSession variable - mark it transient.
>>
>> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai 
>> wrote:
>>
>>> Hi Gwilym,
>>>
>>> try to extract the DoFn into a separate static inner class or into a
>>> separate file as a top level class, instead of declaring as an
>>> anonymous inner class. In java the anonymous inner class has an
>>> implicit reference to the outer enclosing class, and I suspect that the
>>> serialiser is not able to serialise the fields of this enclosing instance.
>>>
>>> Regards,
>>> Csabi
>>>
>>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans 
>>> wrote:
>>>
>>>> Here's what I have in my history, if you need the "... X more" expanded
>>>> I can look into that:
>>>>
>>>> 2017-06-01 05:23:05 INFO  
>>>> DataflowPipelineOptions$StagingLocationFactory:127
>>>> - No stagingLocation provided, falling back to gcpTempLocation
>>>> 2017-06-01 05:23:06 INFO  DataflowRunner:229 -
>>>> PipelineOptions.filesToStage was not specified. Defaulting to files from
>>>> the classpath: will stage 111 files. Enable logging at DEBUG level to see
>>>> which files will be staged.
>>>> [WARNING]
>>>> java.lang.reflect.InvocationTargetException
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>> ssorImpl.java:62)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>> thodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.IllegalArgumentException: unable to serialize
>>>> org.apache.beam.examples.AthenaPubsubOrderNotificationsHandl
>>>> er$2@604b2279
>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteAr
>>>> ray(SerializableUtils.java:53)
>>>> at org.apache.beam.sdk.util.SerializableUtils.clone(Serializabl
>>>> eUtils.java:90)
>>>> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.(
>>>> ParDo.java:569)
>>>> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>> at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandl
>>>> er.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>>> ... 6 more
>>>> Caused by: java.io.NotSerializableException:
>>>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.
>>>> java:1184)
>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputSt
>>>> ream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStrea
>>>> m.java:1509)
>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputS
>>>> tream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.
>>>> java:1178)
>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputSt
>>>> ream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStrea
>>>> m.java:1509)
>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputS
>>>> tream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.
>>>> java:1178)
>>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteAr
>>>> ray(Serializable

Re: Enriching stream messages based on external data

2017-06-01 Thread Gwilym Evans
Csabi, I will try that, thank you.

Eugene, sorry I should have mentioned that I've already tried that and it
still fails. I've also tried annotating it with @JsonIgnore. Thanks, though.

On 1 June 2017 at 22:46, Eugene Kirpichov  wrote:

> It's probably because of the BigtableSession variable - mark it transient.
>
> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai 
> wrote:
>
>> Hi Gwilym,
>>
>> try to extract the DoFn into a separate static inner class or into a
>> separate file as a top level class, instead of declaring as an
>> anonymous inner class. In Java, the anonymous inner class has an implicit
>> reference to the outer enclosing class, and I suspect that the serialiser
>> is not able to serialise the fields of this enclosing instance.
>>
>> Regards,
>> Csabi
>>
>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans 
>> wrote:
>>
>>> Here's what I have in my history, if you need the "... X more" expanded
>>> I can look into that:
>>>
>>> 2017-06-01 05:23:05 INFO  DataflowPipelineOptions$StagingLocationFactory:127
>>> - No stagingLocation provided, falling back to gcpTempLocation
>>> 2017-06-01 05:23:06 INFO  DataflowRunner:229 -
>>> PipelineOptions.filesToStage was not specified. Defaulting to files from
>>> the classpath: will stage 111 files. Enable logging at DEBUG level to see
>>> which files will be staged.
>>> [WARNING]
>>> java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(
>>> NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.IllegalArgumentException: unable to serialize
>>> org.apache.beam.examples.AthenaPubsubOrderNotifications
>>> Handler$2@604b2279
>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(
>>> SerializableUtils.java:53)
>>> at org.apache.beam.sdk.util.SerializableUtils.clone(
>>> SerializableUtils.java:90)
>>> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.
>>> (ParDo.java:569)
>>> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>> at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(
>>> AthenaPubsubOrderNotificationsHandler.java:138)
>>> ... 6 more
>>> Caused by: java.io.NotSerializableException:
>>> org.apache.beam.sdk.options.ProxyInvocationHandler
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>> at java.io.ObjectOutputStream.defaultWriteFields(
>>> ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(
>>> ObjectOutputStream.java:1509)
>>> at java.io.ObjectOutputStream.writeOrdinaryObject(
>>> ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at java.io.ObjectOutputStream.defaultWriteFields(
>>> ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(
>>> ObjectOutputStream.java:1509)
>>> at java.io.ObjectOutputStream.writeOrdinaryObject(
>>> ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(
>>> SerializableUtils.java:49)
>>> ... 10 more
>>>
>>> The way I'm trying to use this in the ParDo/DoFn is:
>>>
>>> (line 138 starts here)
>>> .apply(ParDo.of(new DoFn>> ByteString>>() {
>>> private BigtableSession session;
>>>
>>> @Setup
>>> public void setUp() throws IOException {
>>> BigtableOptions opts = new
>>> BigtableOptions.Builder()
>>> .setProjectId(options.
>>> getSourceProjectId().get())
>>> .setInstanceId(options.
>>> getSourceInstanceId().get())
>>> .build();
>>>
>>> session = new BigtableSession(opts);
>>&

Re: Enriching stream messages based on external data

2017-06-01 Thread Gwilym Evans
Here's what I have in my history, if you need the "... X more" expanded I
can look into that:

2017-06-01 05:23:05 INFO
 DataflowPipelineOptions$StagingLocationFactory:127 - No stagingLocation
provided, falling back to gcpTempLocation
2017-06-01 05:23:06 INFO  DataflowRunner:229 - PipelineOptions.filesToStage
was not specified. Defaulting to files from the classpath: will stage 111
files. Enable logging at DEBUG level to see which files will be staged.
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: unable to serialize
org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
at
org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at
org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
at
org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
... 6 more
Caused by: java.io.NotSerializableException:
org.apache.beam.sdk.options.ProxyInvocationHandler
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
... 10 more

The way I'm trying to use this in the ParDo/DoFn is:

(line 138 starts here)
.apply(ParDo.of(new DoFn>() {
private BigtableSession session;

@Setup
public void setUp() throws IOException {
BigtableOptions opts = new BigtableOptions.Builder()
        .setProjectId(options.getSourceProjectId().get())
        .setInstanceId(options.getSourceInstanceId().get())
        .build();

session = new BigtableSession(opts);
}
...

With a ReadRowsRequest built up during processElement and executed like:

ResultScanner<FlatRow> results =
        session.getDataClient().readFlatRows(request.build());

Thanks,
Gwilym


On 1 June 2017 at 15:10, Lukasz Cwik  wrote:

> Combining PubSub + Bigtable is common.
>
> You should try to use the BigtableSession approach because the hbase
> approach adds a lot of dependencies (leading to dependency conflicts).
> You should use the same version of Bigtable libraries that Apache Beam is
> using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>
> Can you provide the full stack trace for the exception you're seeing?
>
> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <
> gwilym.ev...@bigcommerce.com> wrote:
>
>> Hi list,
>>
>> I'm trying to figure out if Beam is intended to do the following and, if
>> so, what's the best approach?
>>
>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new to
>> Java, so if there's any known solution for this a code example would be
>> greatly appreciated.
>>
>> I have an unbound stream of short messages (coming from PubSub).
>>
>> For each message, I want to get a number of rows from an external
>> database (rows within Bigtable, always the same table) based on the
>> contents of the message, and use the rows when producing output for the
>> final write apply.
>>
>> I've tried various means of connecting out to Bigtable from within
>> the DoFn which is handling the PubSub inputs, but so far everything I've
>> tried has resulted in Beam refusing to run the job due to:
>>
>> java.io.NotSerializableException: org.apache.beam.sdk.options.Pr
>> oxyInvocationHandler
>>
>> (methods I've tried: manually using a BigtableSession, manually u

Enriching stream messages based on external data

2017-05-31 Thread Gwilym Evans
Hi list,

I'm trying to figure out if Beam is intended to do the following and, if
so, what's the best approach?

I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new to
Java, so if there's any known solution for this a code example would be
greatly appreciated.

I have an unbound stream of short messages (coming from PubSub).

For each message, I want to get a number of rows from an external database
(rows within Bigtable, always the same table) based on the contents of the
message, and use the rows when producing output for the final write apply.

I've tried various means of connecting out to Bigtable from within the DoFn
which is handling the PubSub inputs, but so far everything I've tried has
resulted in Beam refusing to run the job due to:

java.io.NotSerializableException:
org.apache.beam.sdk.options.ProxyInvocationHandler

(methods I've tried: manually using a BigtableSession, manually using the
Bigtable HBase libs)

So is this something that Beam was designed to do? If so, what's the
recommended approach?

I considered dynamically constructing a PCollection, but I wasn't sure if
that would make use of connection pooling to Bigtable.

Thanks,
Gwilym