Re: Third Apache Beam meet up in London
Great, and thanks for organizing, guys!

On 27 February 2018 at 03:22, Matthias Baetens wrote:
> Hi all,
>
> We are very excited to announce the third Apache Beam meetup in London on
> *the 5th of March*.
>
> Tyler Akidau (Google) will be talking about Streaming SQL, Victor Kotai
> (Qubit) will talk about putting a Beam pipeline in production and how to
> monitor it, and I (Datatonic) will do a talk on how to leverage Beam for a
> Machine Learning use case.
>
> More info and RSVP: http://bit.ly/3rdApacheBeamMeetupLondon
>
> We'll do our best to record and share the session again as well.
>
> Best regards,
> Matthias
Re: Running code before pipeline starts
Thanks Lukasz, went with the side input approach and it worked perfectly!

On Wed, 28 Feb 2018, at 18:28, Lukasz Cwik wrote:
> You should use a side input and not an empty PCollection that you
> flatten.
>
> Since
>   ReadA --> Flatten --> ParDo
>   ReadB -/
> can be equivalently executed as:
>   ReadA --> ParDo
>   ReadB --> ParDo
>
> Make sure you access the side input in case a runner evaluates the
> side input lazily.
>
> So your pipeline would look like:
>   Create --> ParDo(DoAction) --> View.asSingleton() named X
>   ... --> ParDo(ProcessElements).withSideInput(X) --> ...
>
> An alternative would be to use CoGroupByKey to join the two streams,
> since it is not possible to split the execution like I showed with
> Flatten. It is wasteful to add the CoGroupByKey, but it is a lot less
> wasteful if you convert a preceding GroupByKey in your pipeline into a
> CoGroupByKey joining the two streams.
>
> On Wed, Feb 28, 2018 at 8:58 AM, Andrew Jones wrote:
>> Hi,
>>
>> What is the best way to run code before the pipeline starts?
>> Anything in the `main` function doesn't get called when the pipeline
>> is run on Dataflow via a template - only the pipeline. If you're
>> familiar with Spark, then I'm thinking of code that might be run in
>> the driver.
>>
>> Alternatively, is there a way I can run part of a pipeline first,
>> then run another part once it's completed? Not sure that makes
>> sense, so to illustrate with a poor attempt at an ASCII diagram, if
>> I have something like this:
>>
>>      events
>>       /  \
>>      |  group by key
>>      |    |
>>      |  do some action
>>      |   /
>>      |  /
>>   once action is complete,
>>   process all original elements
>>
>> I can presumably achieve this by having `do some action` either
>> generate an empty side input or an empty PCollection, which I can
>> then use to create a PCollectionList along with the original and
>> pass to Flatten.pCollections() before continuing. Not sure if that's
>> the best way to do it though.
>>
>> Thanks,
>> Andrew
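A minimal Java sketch of the side-input pattern Lukasz describes follows. The transform names, the String element types, and the existence of a `pipeline` (Pipeline) and an `events` PCollection<String> are assumptions for illustration, not code from the thread.

    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // Run the one-off action first and publish a dummy result as a singleton side input.
    PCollectionView<String> actionDone = pipeline
        .apply("Seed", Create.of("seed"))
        .apply("DoAction", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // ... perform the action that must complete before the main processing ...
            c.output("done");
          }
        }))
        .apply("ActionDone", View.asSingleton());

    // Read the side input inside the main ParDo so that a runner which evaluates
    // side inputs lazily still runs the action before elements are processed.
    PCollection<String> processed = events
        .apply("ProcessElements", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.sideInput(actionDone); // access it, per Lukasz's advice
            // ... process c.element() ...
            c.output(c.element());
          }
        }).withSideInputs(actionDone));

Actually calling c.sideInput(actionDone) in the DoFn, rather than only attaching the view with withSideInputs, is what guarantees the action has run even on runners that evaluate side inputs lazily.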
Error running 2.3.0 on Dataflow
Hi,

I've tried to upgrade a Beam job to 2.3.0 and deploy it on Dataflow, and I'm getting the following error:

2018-03-01 10:52:35 INFO PackageUtil:316 - Uploading 169 files from PipelineOptions.filesToStage to staging location to prepare for execution.
Exception in thread "main" java.lang.RuntimeException: Error while staging packages
        at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:396)
        at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:272)
        at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:76)
        at org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:64)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:661)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at com.gocardless.data.beam.GCSToBigQuery.main(GCSToBigQuery.java:47)
Caused by: java.io.IOException: Error executing batch GCS request
        at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:610)
        at org.apache.beam.sdk.util.GcsUtil.getObjects(GcsUtil.java:341)
        at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchNonGlobs(GcsFileSystem.java:216)
        at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:85)
        at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:123)
        at org.apache.beam.sdk.io.FileSystems.matchSingleFileSpec(FileSystems.java:188)
        at org.apache.beam.runners.dataflow.util.PackageUtil.alreadyStaged(PackageUtil.java:159)
        at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackageSynchronously(PackageUtil.java:183)
        at org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stagePackage$1(PackageUtil.java:173)
        at org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
        at org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
        at org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.google.api.client.http.HttpResponseException: 404 Not Found
Not Found
        at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:500)
        at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:479)
        at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
        at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:602)
        ... 14 more

It looks like it happens when staging files, but I haven't changed the staging location (or anything else) - just the Beam version. I've tried a couple of things I can think of, like adding a slash to the end of the staging path, and deleting the directory to see if it gets recreated (it didn't), but no luck.
The error occurs both when running the job directly and when uploading a template.

Thanks,
Andrew
Re: BigQueryIO streaming inserts - poor performance with multiple tables
Hi Cham,

Thanks, I have emailed the dataflow-feedback email address with the details.

Best regards,
Josh

On Thu, Mar 1, 2018 at 12:26 AM, Chamikara Jayalath wrote:
> Could be a DataflowRunner specific issue. Would you mind reporting this
> with corresponding Dataflow job IDs to either the Dataflow Stack Overflow
> channel [1] or dataflow-feedb...@google.com?
>
> I suspect Dataflow split writing to multiple tables across multiple workers,
> which may be keeping all workers busy, but we have to look at the job to
> confirm.
>
> Thanks,
> Cham
>
> [1] https://stackoverflow.com/questions/tagged/google-cloud-dataflow
>
> On Tue, Feb 27, 2018 at 11:56 PM Josh wrote:
>
>> Hi all,
>>
>> We are using BigQueryIO.write() to stream data into BigQuery, and are
>> seeing very poor performance in terms of the number of writes per second
>> per worker.
>>
>> We are currently using *32* x *n1-standard-4* workers to stream ~15,000
>> writes/sec to BigQuery. Each worker has ~90% CPU utilisation. Strangely,
>> the number of workers and worker CPU utilisation remain constant at ~90%
>> even when the rate of input fluctuates down to below 10,000 writes/sec.
>> The job always keeps up with the stream (no backlog).
>>
>> I've seen BigQueryIO benchmarks which show ~20k writes/sec being achieved
>> with a single node when streaming data into a *single* BQ table... So
>> my theory is that writing to multiple tables is somehow causing the
>> performance issue. Our writes are spread (unevenly) across 200+ tables.
>> The job itself does very little processing, and looking at the Dataflow
>> metrics, pretty much all of the wall time is spent in the *StreamingWrite*
>> step of BigQueryIO. The Beam version is 2.2.0.
>>
>> Our code looks like this:
>>
>> stream.apply(BigQueryIO.write()
>>     .to(new ToDestination())
>>     .withFormatFunction(new FormatForBigQuery())
>>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>
>> where ToDestination is a:
>>
>> SerializableFunction<ValueInSingleWindow<T>, TableDestination>
>>
>> which returns a:
>>
>> new TableDestination(tableName, "")
>>
>> where tableName looks like "myproject:dataset.tablename$20180228"
>>
>> Has anyone else seen this kind of poor performance when streaming writes
>> to multiple BQ tables? Is there anything here that sounds wrong, or any
>> optimisations we can make?
>>
>> Thanks for any advice!
>>
>> Josh
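For reference, a dynamic-destination function of the kind described above could look roughly like the sketch below. This is only an illustration under assumptions: the TableRow element type, the "event_type" routing field, and the project/dataset names are placeholders, not Josh's actual code.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.format.DateTimeFormat;

    // Routes each element to a date-partitioned table such as
    // "myproject:dataset.tablename$20180228".
    class ToDestination
        implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
      @Override
      public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        // Hypothetical routing field; the real pipeline derives the table name from its own data.
        String table = (String) input.getValue().get("event_type");
        // Partition decorator taken from the element timestamp, e.g. "20180228".
        String day = DateTimeFormat.forPattern("yyyyMMdd").print(input.getTimestamp());
        return new TableDestination("myproject:dataset." + table + "$" + day, "");
      }
    }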