Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-07 Thread Andrew Jones
It went from about 13 hours 45 mins (if it succeeded - 2018-03-03_16_00_20-15816431303287121211) to 1 hour 15 mins (2018-03-06_03_10_41-4393426098762549646)!

Interesting to me was the change in inputs to WriteTOBigQuery/BatchLoads/FlattenFiles, immediately before ReifyResults. Previously WriteTOBigQuery/BatchLoads/WriteBundlesToFiles.out0 was 180,442 and WriteTOBigQuery/BatchLoads/WriteGroupedRecords.out0 was 0; now they're 7,832 and 271 respectively.
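
For anyone finding this thread later, the change really is a one-line hint on the read. A minimal sketch, not our exact pipeline - `p` is the Pipeline, and the bucket path, String output type and parse function are placeholders:

p.apply("ReadManySmallAvroFiles", AvroIO
    // Parse each GenericRecord; the real pipeline maps to a domain type instead.
    .parseGenericRecords((GenericRecord record) -> record.toString())
    .from("gs://my-bucket/avro/*.avro")     // placeholder path
    .withCoder(StringUtf8Coder.of())        // needed because the lambda's type can't be inferred
    // The fix: tell the runner the glob matches many (small) files, so it doesn't
    // create one shard - and one tiny BigQuery temp file - per input file.
    .withHintMatchesManyFiles());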

On Tue, 6 Mar 2018, at 19:26, Eugene Kirpichov wrote:
> Thanks, I'm glad it worked so well! I'm curious, just how much
> faster did it get? Do you have a job ID with the new code I can take
> a peek at?
> 
> On Tue, Mar 6, 2018 at 4:45 AM Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>> Thanks Eugene.
>> 
>> As you suggested, using withHintMatchesManyFiles() did result in a
>> very significant performance increase! Enough that it's fast enough
>> for our current use case.
>> 
>> Will track the JIRA for any further fixes.
>> 
>> Thanks,
>> Andrew
>> 
>> On Mon, 5 Mar 2018, at 22:34, Eugene Kirpichov wrote:
>>> Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778 
>>> 
>>> On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> For now I suggest that you augment your AvroIO.parse() with
>>>> .withHintMatchesManyFiles(), because it appears to match a very
>>>> large number of tiny files, and I think that's what's causing the
>>>> issue.
>>>> 
>>>> By default Dataflow uses 1 shard per file, and that causes 2
>>>> problems here:
>>>> - Each of these shards writes a separate file to be loaded into
>>>>   BigQuery, so BigQuery has to load this many (tiny) files, which
>>>>   is not great.
>>>> - What's worse, the ReifyResults step takes this list of written
>>>>   temporary files as a side input, and given Dataflow's way of
>>>>   materializing side inputs, it behaves pretty badly when the data
>>>>   for the side input is written from a very large number of shards.
>>>> 
>>>> I'm not sure there's an easy fix to make your original code perform
>>>> well unchanged, but .withHintMatchesManyFiles() should make it
>>>> perform orders of magnitude better.
>>>> 
>>>> On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> Thank you - I was wrong, it is indeed not blocked by BigQuery
>>>>> jobs, but by something it shouldn't be doing at all. This is
>>>>> definitely a bug. I'll investigate in more detail and file a JIRA
>>>>> so you can track the resolution.
>>>>> 
>>>>> On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>> Thanks for the reply. In my case the time is spent *before* the
>>>>>> load job has started. See attached for a screenshot of a
>>>>>> currently running job (id: 2018-03-05_04_06_20-5803269526385225708).
>>>>>> 
>>>>>> It looks like the time is spent in the ReifyResults step. Looking
>>>>>> at the code and at some smaller, succeeding jobs, the BigQuery
>>>>>> loads normally happen in SinglePartitionWriteTables (and
>>>>>> presumably MultiPartitionsWriteTables, if required). So I'm not
>>>>>> seeing any log lines with output from the BigQuery API, nor
>>>>>> anything on the BigQuery side.
>>>>>> 
>>>>>> The input to ReifyResults is around 200K elements, from
>>>>>> WriteBundlesToFiles. Looking at the code, I think these are all
>>>>>> the files staged and ready for loading. I'm finding it hard to
>>>>>> work out exactly what ReifyResults is supposed to be doing and
>>>>>> why it would take any time at all. I think it might be going
>>>>>> through these 200K files and doing something with them, which, if
>>>>>> it's doing it one at a time and the calls to the GCS API are
>>>>>> expensive, could be the issue?
>>>>>> 
>>>>>> On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote:
>>>>>>> BigQueryIO.write() works by: 1) having Dataflow workers write
>>>>>>> data to files (in parallel) 2) asking Big

Re: WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-06 Thread Andrew Jones
Thanks Eugene.

As you suggested, using withHintMatchesManyFiles() did result in a very
significant performance increase! Enough that it's fast enough for our
current use case.
Will track the JIRA for any further fixes.

Thanks,
Andrew

On Mon, 5 Mar 2018, at 22:34, Eugene Kirpichov wrote:
> Filed JIRA https://issues.apache.org/jira/browse/BEAM-3778 
> 
> On Mon, Mar 5, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>> For now I suggest that you augment your AvroIO.parse() with
>> .withHintMatchesManyFiles(), because it appears to match a very large
>> number of tiny files, and I think that's what's causing the issue.
>> 
>> By default Dataflow uses 1 shard per file, and that causes 2
>> problems here:
>> - Each of these shards writes a separate file to be loaded into
>>   BigQuery, so BigQuery has to load this many (tiny) files, which is
>>   not great.
>> - What's worse, the ReifyResults step takes this list of written
>>   temporary files as a side input, and given Dataflow's way of
>>   materializing side inputs, it behaves pretty badly when the data for
>>   the side input is written from a very large number of shards.
>> 
>> I'm not sure there's an easy fix to make your original code perform
>> well unchanged, but .withHintMatchesManyFiles() should make it
>> perform orders of magnitude better.
>> 
>> On Mon, Mar 5, 2018 at 2:19 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>> Thank you - I was wrong, it is indeed not blocked by BigQuery jobs,
>>> but by something it shouldn't be doing at all. This is definitely a
>>> bug. I'll investigate in more detail and file a JIRA so you can
>>> track the resolution.
>>> 
>>> On Mon, Mar 5, 2018 at 7:12 AM Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>> Thanks for the reply. In my case the time is spent *before* the
>>>> load job has started. See attached for a screenshot of a currently
>>>> running job (id: 2018-03-05_04_06_20-5803269526385225708).
>>>> 
>>>> It looks like the time is spent in the ReifyResults step. Looking
>>>> at the code and at some smaller, succeeding jobs, the BigQuery
>>>> loads normally happen in SinglePartitionWriteTables (and presumably
>>>> MultiPartitionsWriteTables, if required). So I'm not seeing any log
>>>> lines with output from the BigQuery API, nor anything on the
>>>> BigQuery side.
>>>> 
>>>> The input to ReifyResults is around 200K elements, from
>>>> WriteBundlesToFiles. Looking at the code, I think these are all the
>>>> files staged and ready for loading. I'm finding it hard to work out
>>>> exactly what ReifyResults is supposed to be doing and why it would
>>>> take any time at all. I think it might be going through these 200K
>>>> files and doing something with them, which, if it's doing it one at
>>>> a time and the calls to the GCS API are expensive, could be the
>>>> issue?
>>>> 
>>>> On Mon, 5 Mar 2018, at 00:28, Eugene Kirpichov wrote:
>>>>> BigQueryIO.write() works by: 1) having Dataflow workers write data
>>>>> to files (in parallel) 2) asking BigQuery to load those files -
>>>>> naturally, during this time Dataflow workers aren't doing
>>>>> anything, that's why the job is scaling down.
>>>>> These jobs are spending time waiting for BigQuery to load the
>>>>> data.
>>>>> 
>>>>> As for why it's taking so long for BigQuery to load the data: you
>>>>> can try to look for BigQuery job IDs in the Stackdriver logs, and
>>>>> then inspect these jobs in the BigQuery UI. If it's taking
>>>>> *really* long, it's usually a quota issue: i.e. your BigQuery jobs
>>>>> are waiting for some other BigQuery jobs to complete before even
>>>>> starting.
>>>>> 
>>>>> On Sat, Mar 3, 2018 at 3:30 AM Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>> Hi,
>>>>>> 
>>>>>> We have a Dataflow job that loads data from GCS, does a bit of
>>>>>> transformation, then writes to a number of BigQuery tables using
>>>>>> DynamicDestinations.
>>>>>> 
>>>>>> The same job runs on smaller data sets (~70 million records), but
>>>>>> this one is struggling when processing ~500 milli

WriteTOBigQuery/BatchLoads/ReifyResults step taking hours

2018-03-03 Thread Andrew Jones
Hi,

We have a Dataflow job that loads data from GCS, does a bit of transformation, 
then writes to a number of BigQuery tables using DynamicDestinations.

The same job runs fine on smaller data sets (~70 million records), but this one is 
struggling when processing ~500 million records. Both jobs write to the same 
number of tables - the only difference is the number of records.

Example job IDs include 2018-03-02_04_29_44-2181786949469858712 and 
2018-03-02_08_46_28-4580218739500768796. They use BigQueryIO to write to 
BigQuery, with the BigQueryIO.Write.Method.FILE_LOADS method (the default for 
a bounded job). They successfully stage all their data to GCS, but then for 
some reason scale down the number of workers to 1 when processing the step 
WriteTOBigQuery/BatchLoads/ReifyResults and stay in that step for hours.
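
For context, the write has roughly this shape. This is a simplified sketch rather than our actual code - the routing on a "type" field, the table names and the schema are made up for illustration, and `rows` stands for the PCollection<TableRow> produced by the transformation step:

rows.apply("WriteTOBigQuery", BigQueryIO.writeTableRows()
    .to(new DynamicDestinations<TableRow, String>() {
      @Override
      public String getDestination(ValueInSingleWindow<TableRow> element) {
        // Route each record to a table based on a hypothetical "type" field.
        return (String) element.getValue().get("type");
      }
      @Override
      public TableDestination getTable(String type) {
        return new TableDestination("my-project:my_dataset." + type, "Table for " + type);
      }
      @Override
      public TableSchema getSchema(String type) {
        return new TableSchema().setFields(Collections.singletonList(
            new TableFieldSchema().setName("type").setType("STRING")));
      }
    })
    // FILE_LOADS is the default for bounded input; shown here for clarity.
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));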

In the logs we see many entries like this:

Proposing dynamic split of work unit 
...-7e07;2018-03-02_04_29_44-2181786949469858712;662185752552586455 at 
{"fractionConsumed":0.5}
Rejecting split request because custom reader returned null residual source.

And also occasionally this:

Processing lull for PT24900.038S in state process of 
WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous) at 
java.net.SocketInputStream.socketRead0(Native Method) at 
java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at 
java.net.SocketInputStream.read(SocketInputStream.java:170) at 
java.net.SocketInputStream.read(SocketInputStream.java:141) ...

The job does seem to eventually progress, but after many hours. It then fails 
later with this error, which may or may not be related (we're just starting to 
look into it):

(94794e1a2c96f380): java.lang.RuntimeException: 
org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to 
patch table description: {datasetId=..., projectId=..., 
tableId=9c20908cc6e549b4a1e116af54bb8128_011249028ddcc5204885bff04ce2a725_1_0},
 aborting after 9 retries.

We're not sure how to proceed, so any pointers would be appreciated.

Thanks,
Andrew


Re: Running code before pipeline starts

2018-03-01 Thread Andrew Jones
Thanks Lukasz, went with the side input approach and it worked
perfectly!

On Wed, 28 Feb 2018, at 18:28, Lukasz Cwik wrote:
> You should use a side input and not an empty PCollection that you
> flatten.
> 
> Since
> ReadA --> Flatten --> ParDo
> ReadB -/
> can be equivalently executed as:
> ReadA --> ParDo
> ReadB --> ParDo
> 
> Make sure you access the side input in case a runner evaluates the
> side input lazily.
> 
> So your pipeline would look like:
> Create --> ParDo(DoAction) --> View.asSingleton() named X
> ... --> ParDo(ProcessElements).withSideInput(X) --> ...
> 
> An alternative would be to use CoGroupByKey to join the two streams,
> since it is not possible to split the execution like I showed with
> Flatten. It is wasteful to add the CoGroupByKey, but it is a lot less
> wasteful if you convert a preceding GroupByKey in your pipeline into a
> CoGroupByKey joining the two streams.
> 
> On Wed, Feb 28, 2018 at 8:58 AM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>> Hi,
>> 
>> What is the best way to run code before the pipeline starts?
>> Anything in the `main` function doesn't get called when the pipeline
>> is run on Dataflow via a template - only the pipeline. If you're
>> familiar with Spark, then I'm thinking of code that might be run in
>> the driver.
>> 
>> Alternatively, is there a way I can run part of a pipeline first,
>> then run another part once it's completed? Not sure that makes
>> sense, so to illustrate with a poor attempt at an ascii diagram, if
>> I have something like this:
>> 
>>          events
>>          /    \
>>         /      \
>>        |    group by key
>>        |        |
>>        |    do some action
>>        |       /
>>        |      /
>>   once action is complete,
>>   process all original elements
>> 
>> I can presumably achieve this by having `do some action` either
>> generate an empty side input or an empty PCollection, which I can
>> then use to create a PCollectionList along with the original and
>> pass to Flatten.pCollections() before continuing. Not sure if that's
>> the best way to do it though.
>> 
>> Thanks,
>> Andrew
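
For the archives, a minimal sketch of the side-input pattern Lukasz describes above. All names are illustrative, `p` is the Pipeline and `events` stands for the main input PCollection (here of Strings):

// One-off action, run once before elements are processed.
PCollectionView<String> actionDone = p
    .apply("Trigger", Create.of("start"))
    .apply("DoAction", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Do the "before the pipeline" work here, e.g. create a table.
        c.output("done");
      }
    }))
    .apply("ActionDone", View.<String>asSingleton());

// Main processing, gated on the side input.
events.apply("ProcessElements", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Read the side input so runners that evaluate side inputs lazily
        // still wait for DoAction before processing any elements.
        c.sideInput(actionDone);
        c.output(c.element());
      }
    }).withSideInputs(actionDone));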



Error running 2.3.0 on Dataflow

2018-03-01 Thread Andrew Jones
Hi,

I've tried to upgrade a Beam job to 2.3.0 and deploy it on Dataflow, and I'm getting 
the following error:

2018-03-01 10:52:35 INFO  PackageUtil:316 - Uploading 169 files from 
PipelineOptions.filesToStage to staging location to prepare for execution.
Exception in thread "main" java.lang.RuntimeException: Error while staging 
packages
at 
org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:396)
at 
org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:272)
at 
org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:76)
at 
org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:64)
at 
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:661)
at 
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.gocardless.data.beam.GCSToBigQuery.main(GCSToBigQuery.java:47)
Caused by: java.io.IOException: Error executing batch GCS request
at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:610)
at org.apache.beam.sdk.util.GcsUtil.getObjects(GcsUtil.java:341)
at 
org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchNonGlobs(GcsFileSystem.java:216)
at 
org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:85)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:123)
at 
org.apache.beam.sdk.io.FileSystems.matchSingleFileSpec(FileSystems.java:188)
at 
org.apache.beam.runners.dataflow.util.PackageUtil.alreadyStaged(PackageUtil.java:159)
at 
org.apache.beam.runners.dataflow.util.PackageUtil.stagePackageSynchronously(PackageUtil.java:183)
at 
org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stagePackage$1(PackageUtil.java:173)
at 
org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
at 
org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
at 
org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: 
com.google.api.client.http.HttpResponseException: 404 Not Found
Not Found
at 
org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:500)
at 
org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:479)
at 
org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:602)
... 14 more


Looks like it's when staging files, but I haven't changed the staging location 
(or anything else) - just the Beam version.

I've tried a couple of things I can think of, like adding a slash to the end of 
the staging path and deleting the directory to see if it gets recreated (it 
didn't), but no luck.

The error occurs both when running a job directly and when uploading a template.

Thanks,
Andrew


Running code before pipeline starts

2018-02-28 Thread Andrew Jones
Hi,

What is the best way to run code before the pipeline starts? Anything in the 
`main` function doesn't get called when the pipeline is run on Dataflow via a 
template - only the pipeline. If you're familiar with Spark, then I'm thinking 
of code that might be run in the driver.

Alternatively, is there a way I can run part of a pipeline first, then run 
another part once it's completed? Not sure that makes sense, so to illustrate 
with a poor attempt at an ascii diagram, if I have something like this:

         events
         /    \
        /      \
       |    group by key
       |        |
       |    do some action
       |       /
       |      /
  once action is complete,
  process all original elements

I can presumably achieve this by having `do some action` either generate an 
empty side input or an empty PCollection, which I can then use to create a 
PCollectionList along with the original and pass to Flatten.pCollections() 
before continuing. Not sure if that's the best way to do it though.

Thanks,
Andrew


Data guarantees PubSub to GCS

2018-01-03 Thread Andrew Jones
Hi,

I'd like to confirm Beam's data guarantees when used with Google Cloud PubSub 
and Cloud Storage and running on Dataflow. I can't find any explicit 
documentation on it.

If the Beam job is running successfully, then I believe all data will be 
delivered to GCS at least once. If I stop the job with 'Drain', then any 
inflight data will be processed and saved.

What happens if the Beam job is not running successfully, and maybe throwing 
exceptions? Will the data still be available in PubSub when I cancel (not 
drain) the job? Does a drain work successfully if the data cannot be written to 
GCS because of the exceptions?

Thanks,
Andrew


Re: ParDo with Timer hangs when running under TestStream

2017-12-13 Thread Andrew Jones
No problem, thanks!


On Wed, 13 Dec 2017, at 14:30, Kenneth Knowles wrote:
> Hi Andrew,
> 
> As someone else pointed out to me, I didn't read your code carefully
> enough. Your timer is an event time timer so it should fire. I've
> filed https://issues.apache.org/jira/browse/BEAM-3341 for
> investigation.
> 
> Kenn
> 
> On Wed, Dec 13, 2017 at 5:53 AM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>> Thanks Kenn. I've tried adding calls to advanceProcessingTime[1], but
>> it doesn't seem to be helping. The test still hangs after processing
>> all the data.
>> 
>> Is this because I'm using the global window? So the window itself
>> doesn't ever get closed?
>> 
>> The point about the expiry timer is a good one, thanks.
>> 
>> Thanks,
>> Andrew
>> 
>> [1]: https://github.com/andrewrjones/beam-test-stream-timer/compare/advanceProcessingTime
>> 
>> On Tue, 12 Dec 2017, at 04:58, Kenneth Knowles wrote:
>>> Hi Andrew,
>>> 
>>> This is because TestStream also controls processing time. You'll
>>> want to call #advanceProcessingTime [1] to move the clock forward.
>>> This example brings up a good best practice: when you use the
>>> stateful DoFn, you often want to set an event time timer for window
>>> expiration time (that's the end of the window + allowed lateness) to
>>> make sure to flush anything left in state.
>>> 
>>> Kenn
>>> 
>>> [1] https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/testing/TestStream.Builder.html#advanceProcessingTime-org.joda.time.Duration-
>>> 
>>> On Mon, Dec 11, 2017 at 12:51 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>> Hi,
>>>> 
>>>> I have a unit test using TestStream. It worked fine, until I added
>>>> a Timer to the pipeline I'm testing, and now it hangs after
>>>> seemingly finishing correctly.
>>>> 
>>>> I've put together a minimal example at
>>>> https://github.com/andrewrjones/beam-test-stream-timer/blob/master/src/test/java/com/andrewjones/beam/TimerTest.java.
>>>> I notice when I use the following, it hangs:
>>>> 
>>>> .addElements(KV.of("hello", 100))
>>>> .addElements(KV.of("hello", 200))
>>>> 
>>>> However, this seems to be fine:
>>>> 
>>>> .addElements(KV.of("hello", 100), KV.of("hello", 200))
>>>> 
>>>> In both cases the code seems to work as expected, judging by the
>>>> calls to println.
>>>> 
>>>> Is this a problem with TestStream? Or should I not have KVs with
>>>> the same Key when using a Timer?
>>>> 
>>>> Thanks,
>>>> Andrew
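
For anyone hitting this later, a rough sketch of the best practice Kenn mentions above: an event time timer set to the window expiration (end of window + allowed lateness) that flushes whatever is left in state. The class name, buffer contents and lateness constant are illustrative, not taken from the test in the linked repo:

static class FlushOnWindowExpiry extends DoFn<KV<String, Integer>, Integer> {

  // Illustrative; should match the allowed lateness of the windowing strategy.
  private static final Duration ALLOWED_LATENESS = Duration.standardMinutes(1);

  @StateId("buffer")
  private final StateSpec<BagState<Integer>> bufferSpec = StateSpecs.bag(VarIntCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext c, BoundedWindow window,
                      @StateId("buffer") BagState<Integer> buffer,
                      @TimerId("flush") Timer flush) {
    buffer.add(c.element().getValue());
    // An event time timer at window expiry: end of window + allowed lateness.
    flush.set(window.maxTimestamp().plus(ALLOWED_LATENESS));
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext c, @StateId("buffer") BagState<Integer> buffer) {
    // Emit anything still buffered when the window expires, then clean up.
    for (Integer value : buffer.read()) {
      c.output(value);
    }
    buffer.clear();
  }
}

(And per Kenn's first reply, if the DoFn used processing time timers, a TestStream-based test would also need TestStream.Builder#advanceProcessingTime to move that clock forward; the event time timer above fires as the watermark advances.)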



Re: Regarding Beam Slack Channel

2017-11-21 Thread Andrew Jones
Me too, please :)

On Tue, 21 Nov 2017, at 16:19, Dariusz Aniszewski wrote:
> Hello
> 
> Can someone please add me to the Beam slack channel?
> 
> Thanks.


Re: KafkaIO and Avro

2017-10-24 Thread Andrew Jones
Thanks Eugene, that worked perfectly!

Full final code at
https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java.
Thanks,
Andrew
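
For anyone following along in the archives, one way to apply Eugene's suggestion below - extract the read into an explicitly typed local so the raw (Class) cast doesn't erase the type of the whole expression - looks roughly like this. A sketch, not the exact code from the repo above; the @SuppressWarnings is optional:

// The raw (Class) cast makes the whole chained expression raw, so assign it
// to an explicitly typed variable first (unchecked warning, but it compiles).
@SuppressWarnings({"unchecked", "rawtypes"})
KafkaIO.Read<String, Envelope> read = KafkaIO.<String, Envelope>read()
    .withBootstrapServers("kafka:9092")
    .withTopic("dbserver1.inventory.customers")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
        AvroCoder.of(Envelope.class));

p.apply(read.withoutMetadata())        // PCollection<KV<String, Envelope>>
    .apply(Values.<Envelope>create()); // now compiles, since the element type is back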


On Fri, Oct 20, 2017, at 05:10 PM, Eugene Kirpichov wrote:
> This is due to Java doing type erasure in any expression that involves
> a raw type. This will compile if you extract the result of
> .apply(KafkaIO.read()...) into a local variable.
> 
> On Fri, Oct 20, 2017, 1:51 AM Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>> Thanks Eugene. That does compile, although the rest of the pipeline
>> doesn't seem happy.
>> 
>> The next line is:
>> 
>> .apply(Values.create())
>> 
>> But that now doesn't compile with the following error:
>> 
>> /usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17] cannot find symbol
>>   symbol:   method apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
>>   location: interface org.apache.beam.sdk.values.POutput
>> 
>> Don't really understand what's wrong here. It works fine when using
>> the EnvelopeKafkaAvroDeserializer as suggested by Tim.
>> 
>> Thanks,
>> Andrew
>> 
>> On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
>>> Thanks Eugene 
>>> 
>>> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote:
>>>> Ah, nice. It works. 
>>>> 
>>>> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> The following compiles fine:
>>>>> 
>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>     .withBootstrapServers("kafka:9092")
>>>>>     .withTopic("dbserver1.inventory.customers")
>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>     .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>>>>> 
>>>>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>> Same for me. It does not look like there is an annotation to
>>>>>> suppress the error.
>>>>>> 
>>>>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>> Hi Eugene,
>>>>>>> 
>>>>>>> I understood that was where Andrew started and reported this.  I
>>>>>>> tried and saw the same as him.
>>>>>>> 
>>>>>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>> cannot be converted to org.apache.kafka.common.serialization.Deserializer<...Envelope>
>>>>>>> 
>>>>>>> similarly with (Class<...>) KafkaAvroDeserializer.class
>>>>>>> 
>>>>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>> I don't think extending the class is necessary. Not sure I
>>>>>>>> understand why a simple type casting for
>>>>>>>> withDeserializerAndCoder doesn't work? Have you tried this?
>>>>>>>> 
>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>   .withValueDeserializerAndCoder((Deserializer)KafkaAvroDeserializer.class,
>>>>>>>>   AvroCoder.of(Envelope.class))
>>>>>>>> 
>>>>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>> Hi Raghu
>>>>>>>>> 
>>>>>>>>> I tried that but with KafkaAvroDeserializer already
>>>>>>>>> implementing Deserializer I couldn't get it to work...
>>>>>>>>> I didn't spend too much time t

Re: KafkaIO and Avro

2017-10-20 Thread Andrew Jones
Thanks Eugene. That does compile, although the rest of the pipeline
doesn't seem happy.
The next line is:

.apply(Values.create())

But that now doesn't compile with the following error:

/usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17] cannot find symbol
  symbol:   method apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
  location: interface org.apache.beam.sdk.values.POutput

Don't really understand what's wrong here. It works fine when using the
EnvelopeKafkaAvroDeserializer as suggested by Tim.
Thanks,
Andrew


On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
> Thanks Eugene 
> 
> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote:
>> Ah, nice. It works. 
>> 
>> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>> The following compiles fine:
>>> 
>>> p.apply(KafkaIO.<String, Envelope>read()
>>>     .withBootstrapServers("kafka:9092")
>>>     .withTopic("dbserver1.inventory.customers")
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>>> 
>>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>>>> Same for me. It does not look like there is an annotation to
>>>> suppress the error.
>>>> 
>>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>> Hi Eugene,
>>>>> 
>>>>> I understood that was where Andrew started and reported this.  I
>>>>> tried and saw the same as him.
>>>>> 
>>>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>> cannot be converted to org.apache.kafka.common.serialization.Deserializer<...Envelope>
>>>>> 
>>>>> similarly with (Class<...>) KafkaAvroDeserializer.class
>>>>> 
>>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>> I don't think extending the class is necessary. Not sure I
>>>>>> understand why a simple type casting for withDeserializerAndCoder
>>>>>> doesn't work? Have you tried this?
>>>>>> 
>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>   .withValueDeserializerAndCoder((Deserializer)KafkaAvroDeserializer.class,
>>>>>>   AvroCoder.of(Envelope.class))
>>>>>> 
>>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>> Hi Raghu
>>>>>>> 
>>>>>>> I tried that but with KafkaAvroDeserializer already implementing
>>>>>>> Deserializer I couldn't get it to work... I didn't spend
>>>>>>> too much time though and agree something like that would be
>>>>>>> cleaner.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Tim 
>>>>>>> 
>>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>>>>> Thanks Tim. 
>>>>>>>> 
>>>>>>>> How about extending KafkaAvroDeserializer rather than
>>>>>>>> AbstractKafkaAvroDeserializer?
>>>>>>>> 
>>>>>>>> TypedKafkaAvroDeserializer class below is useful, but not
>>>>>>>> directly usable by the yet. It needs to store the actual type
>>>>>>>> in Kafka consumer config to retrieve at run time.
>>>>>>>> Even without storing the class, it is still useful. It
>>>>>>>> simplifies user code:
>>>>>>>> 
>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>> TypedKafkaAvroDeserializer {}
>>>>>>>> 
>>>>>>>> This should be part of same package as KafkaAvroDeserializer

Re: KafkaIO and Avro

2017-10-19 Thread Andrew Jones
Thanks Tim, that works!

Full code is:

public class EnvelopeKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
        implements Deserializer<Envelope> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public Envelope deserialize(String s, byte[] bytes) {
        return (Envelope) this.deserialize(bytes);
    }

    @Override
    public void close() {}
}

Nicer than my solution, so I think that's the one I'm going to go with for now.
Thanks,
Andrew
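
For completeness, a sketch of how the deserializer above plugs into the read. The topic, bootstrap servers and registry URL are the ones used elsewhere in this thread; untested as written here:

p.apply(KafkaIO.<String, Envelope>read()
        .withBootstrapServers("kafka:9092")
        .withTopic("dbserver1.inventory.customers")
        .withKeyDeserializer(StringDeserializer.class)
        // The typed subclass means no raw cast is needed, and the coder tells
        // Beam how to encode Envelope between pipeline steps.
        .withValueDeserializerAndCoder(EnvelopeKafkaAvroDeserializer.class,
            AvroCoder.of(Envelope.class))
        // The Confluent deserializer still needs the registry URL in the consumer config.
        .updateConsumerProperties(
            Collections.<String, Object>singletonMap("schema.registry.url", "http://registry:8081"))
        .withoutMetadata())
    .apply(Values.<Envelope>create());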


On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
> Hi Andrew,
> 
> I also saw the same behaviour.  
> 
> It's not pretty but perhaps try this? It was my last idea, I ran out of
> time to try...
> 
> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
> public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer {
> 
>   ...
> 
>   public Envelope deserialize(String s, byte[] bytes) {
>     return (Envelope) this.deserialize(bytes);
>   }
> 
>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>     return (Envelope) this.deserialize(bytes, readerSchema);
>   }
> 
>   ...
> }
> 
> Tim
> 
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>> Using Object doesn't work unfortunately. I get an 'Unable to
>> automatically infer a Coder' error at runtime.
>> 
>> This is the code:
>> 
>> p.apply(KafkaIO.<String, Object>read()
>> .withValueDeserializer(KafkaAvroDeserializer.class)
>> 
>> It compiles, but at runtime:
>> 
>> Caused by: java.lang.RuntimeException: Unable to automatically infer
>> a Coder for the Kafka Deserializer class
>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder
>> registered for type class java.lang.Object
>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>> 
>> So far the only thing I've got working is this, where I use the
>> ByteArrayDeserializer and then parse Avro myself:
>> 
>> private static KafkaAvroDecoder avroDecoder;
>> static {
>>     final Properties props = new Properties();
>>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>     VerifiableProperties vProps = new VerifiableProperties(props);
>>     avroDecoder = new KafkaAvroDecoder(vProps);
>> }
>> 
>> public static void main(String[] args) {
>> 
>>     PipelineOptions options = PipelineOptionsFactory.create();
>>     Pipeline p = Pipeline.create(options);
>> 
>>     p.apply(KafkaIO.<byte[], byte[]>read()
>>             .withBootstrapServers("kafka:9092")
>>             .withTopic("dbserver1.inventory.customers")
>>             .withKeyDeserializer(ByteArrayDeserializer.class)
>>             .withValueDeserializer(ByteArrayDeserializer.class)
>>             .withoutMetadata())
>>         .apply(Values.<byte[]>create())
>>         .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>                 Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>                 c.output(data);
>>             }
>>         }))
>> 
>> Thanks,
>> Andrew
>> 
>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> It seems that KafkaAvroDeserializer implements
>>>> Deserializer<Object>, though I suppose with proper configuration
>>>> that Object will at run-time be your desired type. Have you tried
>>>> adding som

KafkaIO and Avro

2017-10-18 Thread Andrew Jones
Hi,

I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
it should be as simple as:

p.apply(KafkaIO.read()
  .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
  AvroCoder.of(Envelope.class))

Where Envelope is the name of the Avro class. However, that does not
compile and I get the following error:

incompatible types:
java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
cannot be converted to java.lang.Class<...>

I've tried a number of variations on this theme but haven't yet worked
it out and am starting to run out of ideas...

Has anyone successfully read Avro data from Kafka?

The code I'm using can be found at
https://github.com/andrewrjones/debezium-kafka-beam-example and a full
environment can be created with Docker.

Thanks,
Andrew