Re: Breaking change for FileIO WriteDynamic in Beam 2.34?
Without the full logs it's hard to say, but I've definitely seen that error in the past when the worker disks are full. ApplianceShuffleWriter needs to extract a native library to a temp location, and if the disk is full that extraction fails, resulting in the NoClassDefFoundError.

On Wed, Apr 6, 2022 at 12:46 PM Chamikara Jayalath wrote:

> I'm not aware of a breaking change along these lines off the top of my
> head. Sounds like the classes required for Dataflow shuffle are missing
> somehow. Unless someone here can point to something, you might have to
> contact Google Cloud support so they can look at your job.
>
> Thanks,
> Cham
>
> On Wed, Apr 6, 2022 at 9:39 AM Ahmet Altay wrote:
>
>> /cc @Chamikara Jayalath
>>
>> On Tue, Apr 5, 2022 at 11:29 PM Siyu Lin wrote:
>>
>>> Hi Beam community,
>>>
>>> We have a batch pipeline which does not run regularly. Recently we
>>> upgraded to Beam 2.36, and this broke the FileIO writeDynamic step.
>>>
>>> We are using the Dataflow runner, and when there are multiple workers
>>> the errors look like this:
>>>
>>> Error message from worker: java.lang.NoClassDefFoundError: Could not
>>> initialize class
>>> org.apache.beam.runners.dataflow.worker.ApplianceShuffleWriter
>>> org.apache.beam.runners.dataflow.worker.ShuffleSink.writer(ShuffleSink.java:348)
>>> org.apache.beam.runners.dataflow.worker.SizeReportingSinkWrapper.writer(SizeReportingSinkWrapper.java:46)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation.initializeWriter(WriteOperation.java:71)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation.start(WriteOperation.java:78)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> java.lang.Thread.run(Thread.java:748)
>>>
>>> However, when there is only a single worker, the error looks like this:
>>>
>>> The job failed because a work item has failed 4 times. Look in
>>> previous log entries for the cause of each one of the 4 failures. For
>>> more information, see
>>> https://cloud.google.com/dataflow/docs/guides/common-errors. The work
>>> item was attempted on these workers: xxx Root cause: The worker lost
>>> contact with the service.
>>>
>>> The error guidance suggested upgrading the machine type.
>>>
>>> These errors happen when using SDK 2.34+. When I switched to SDK 2.33,
>>> everything worked without any issues. I tried SDKs 2.34, 2.35, and
>>> 2.36, and all of them hit the same issue.
>>>
>>> Context: the code simply reads from BigQuery with a fixed table of
>>> 4,034 records, applies some transforms, and writes to GCS with
>>> FileIO.writeDynamic. All tests were performed with the same machine
>>> type and the same number of workers.
>>>
>>> Does anyone know if there are any breaking changes in this SDK /
>>> Dataflow runner?
>>>
>>> Thanks so much!
>>> Siyu
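For reference, a dynamic write along the lines Siyu describes typically looks something like the sketch below. This is not the original job's code: the project, table, bucket, and "category" field are placeholders.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Contextful;

public class DynamicWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(BigQueryIO.readTableRows().from("my-project:my_dataset.my_table"))
        .apply(FileIO.<String, TableRow>writeDynamic()
            // Route each record to a destination keyed by a field value.
            .by(row -> (String) row.get("category"))
            .withDestinationCoder(StringUtf8Coder.of())
            // Serialize each record as one line of text.
            .via(Contextful.fn(TableRow::toString), TextIO.sink())
            .to("gs://my-bucket/output")
            .withNaming(category -> FileIO.Write.defaultNaming("part-" + category, ".json")));
    p.run();
  }
}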
Re: [Question] infer schema from proto in java?
Correct, SqlTransform works well without explicit conversion. The Beam SQL Walkthrough page was a bit misleading: it says "Before applying a SQL query to a PCollection, the data in the collection must be in Row format" and shows examples of how to achieve it.

Thank you!
Gyorgy

On Wed, Apr 6, 2022 at 5:47 PM Brian Hulette wrote:

> Thanks Reuven!
>
> Gyorgy - please also note that we'd like it if users didn't actually have
> to interact with Rows directly. Beam should automatically convert to Row
> under the hood when you apply a schema-aware transform (e.g. SqlTransform,
> or anything in org.apache.beam.sdk.schemas.transforms [1]) to the
> PCollection. Why is it that you need to convert to Row?
>
> Brian
>
> [1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/package-summary.html
>
> On Tue, Apr 5, 2022 at 10:13 PM Reuven Lax wrote:
>
>> You can apply Convert.toRows().
>>
>> On Tue, Apr 5, 2022 at 10:02 PM Balogh, György wrote:
>>
>>> Hi Brian,
>>> Thank you, it worked; now I have a schema on my PCollection. The next
>>> step is still not clear: I'd like to convert this to a PCollection<Row>
>>> to be able to query it with SQL. The doc has an example of how to
>>> assemble the Row by hand, but I assume there should be a way to do this
>>> automatically.
>>> Thank you,
>>> Gyorgy
>>>
>>> On Tue, Apr 5, 2022 at 10:53 PM Brian Hulette wrote:
>>>
>>>> Hi Gyorgy,
>>>>
>>>> You should be able to register ProtoMessageSchema [1] as the
>>>> SchemaProvider for your protobuf type, something like:
>>>>
>>>> SchemaRegistry.createDefault().registerSchemaProvider(XYZ.class, new ProtoMessageSchema())
>>>>
>>>> This is similar to annotating XYZ with
>>>> @DefaultSchema(ProtoMessageSchema.class), which of course doesn't work
>>>> in this case since you don't control the class.
>>>>
>>>> Adding @Reuven Lax in case he has a better solution.
>>>>
>>>> Brian
>>>>
>>>> [1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.html
>>>>
>>>> On Tue, Apr 5, 2022 at 12:03 PM Balogh, György wrote:
>>>>
>>>>> Hi,
>>>>> I'm using the Java Beam SDK.
>>>>> I have a PCollection<XYZ> where XYZ is a class generated from a
>>>>> proto2 file with protoc.
>>>>> Is it possible to infer a schema and have a schema-aware PCollection
>>>>> from this?
>>>>> Thank you,
>>>>> Gyorgy

--

György Balogh
CEO
E gyorgy.bal...@ultinous.com
M +36 30 270 8342
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com
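Putting Brian's and Reuven's answers together, the end-to-end pattern looks roughly like the sketch below. XYZ stands in for the protoc-generated class from the thread, the source and the "score" field are placeholders, and the registration happens before the PCollection is created so the inferred schema gets attached to it.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

Pipeline p = Pipeline.create();

// Register the provider before building the source transform, so Beam
// infers a schema for XYZ and attaches it to the PCollection.
p.getSchemaRegistry().registerSchemaProvider(XYZ.class, new ProtoMessageSchema());

PCollection<XYZ> events = p.apply(/* your source of XYZ messages */);

// No explicit Convert.toRows() needed: SqlTransform converts under the hood.
PCollection<Row> result =
    events.apply(SqlTransform.query("SELECT * FROM PCOLLECTION WHERE score > 0.5"));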
Go SDK - Docker image for Dataflow
Hi,

I'm wondering if it is possible to create a custom Docker image with a compiled Go binary that can be deployed to Dataflow. Currently I'm starting it as below:

myjob --runner dataflow --output gs://prog-test-bucket/out --project *** \
  --region europe-west1 --temp_location gs://prog-test-bucket/tmp/ \
  --staging_location gs://prog-test-bucket/binaries/ \
  --worker_harness_container_image=apache/beam_go_sdk:latest

But this is not a clean way of starting jobs. I'd prefer to have it in a more organized way, deploying the worker as a built container image. Can I make use of the apache/beam_go_sdk image on hub.docker.com (https://hub.docker.com/r/apache/beam_go_sdk), build a custom image with my binary, and then deploy it in GCP?

I will appreciate any suggestions/help. Thanks

--
Paweł
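A sketch of the approach the question is gesturing at, assuming the standard custom-container flow: base the image on the released Go SDK image (which ships the boot entrypoint Dataflow expects), push it to a registry the project can pull from, and pass it to the job. The image name and paths below are placeholders, and note that, as far as I understand, the job binary itself is still staged via --staging_location rather than baked into the image; the custom image is for runtime dependencies.

# Dockerfile
FROM apache/beam_go_sdk:2.36.0
# Add anything the worker needs at runtime (certs, data files, native libs).
COPY extra-deps/ /opt/extra-deps/

# Build, push, and point the pipeline at the image:
docker build -t gcr.io/my-project/beam-go-worker:v1 .
docker push gcr.io/my-project/beam-go-worker:v1

myjob --runner dataflow ... \
  --worker_harness_container_image=gcr.io/my-project/beam-go-worker:v1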
[Question] proper type for beam udf for complex types
Hi,

I'd like to use BeamSqlUdf-based UDFs with complex (potentially nested) types, and it is not clear how to specify the parameter type to achieve this. E.g. I have a rectangle with x, y, width, height fields and want to operate on it. I'm getting this error:

No match found for function signature iou(<RecordType(INTEGER x, INTEGER y, INTEGER width, INTEGER height)>) -> [Help 1]

So the argument has type RecordType(INTEGER x, INTEGER y, INTEGER width, INTEGER height); how do I specify this type in my UDF?

Thank you,
Gyorgy

--

György Balogh
CEO
E gyorgy.bal...@ultinous.com
M +36 30 270 8342
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com
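The registration mechanics, at least, look like the sketch below. Whether Calcite will resolve a Row argument against the nested RecordType is exactly the open question here, so treat the Row-typed parameters as an assumption rather than a confirmed answer; "rect1"/"rect2" are placeholder column names.

import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.Row;

/** Intersection-over-union of two rectangle rows with fields x, y, width, height. */
public class IouFn implements BeamSqlUdf {
  public static Double eval(Row a, Row b) {
    // Intersection box: max of the top-left corners, min of the bottom-right corners.
    int ix1 = Math.max(a.getInt32("x"), b.getInt32("x"));
    int iy1 = Math.max(a.getInt32("y"), b.getInt32("y"));
    int ix2 = Math.min(a.getInt32("x") + a.getInt32("width"),
                       b.getInt32("x") + b.getInt32("width"));
    int iy2 = Math.min(a.getInt32("y") + a.getInt32("height"),
                       b.getInt32("y") + b.getInt32("height"));
    int inter = Math.max(0, ix2 - ix1) * Math.max(0, iy2 - iy1);
    int union = a.getInt32("width") * a.getInt32("height")
        + b.getInt32("width") * b.getInt32("height") - inter;
    return union == 0 ? 0.0 : (double) inter / union;
  }
}

// Registered on the query:
SqlTransform.query("SELECT iou(rect1, rect2) FROM PCOLLECTION")
    .registerUdf("iou", IouFn.class);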
Breaking change for FileIO WriteDynamic in Beam 2.34?
Hi Beam community,

We have a batch pipeline which does not run regularly. Recently we upgraded to Beam 2.36, and this broke the FileIO writeDynamic step.

We are using the Dataflow runner, and when there are multiple workers the errors look like this:

Error message from worker: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.runners.dataflow.worker.ApplianceShuffleWriter
org.apache.beam.runners.dataflow.worker.ShuffleSink.writer(ShuffleSink.java:348)
org.apache.beam.runners.dataflow.worker.SizeReportingSinkWrapper.writer(SizeReportingSinkWrapper.java:46)
org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation.initializeWriter(WriteOperation.java:71)
org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation.start(WriteOperation.java:78)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

However, when there is only a single worker, the error looks like this:

The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: xxx Root cause: The worker lost contact with the service.

The error guidance suggested upgrading the machine type.

These errors happen when using SDK 2.34+. When I switched to SDK 2.33, everything worked without any issues. I tried SDKs 2.34, 2.35, and 2.36, and all of them hit the same issue.

Context: the code simply reads from BigQuery with a fixed table of 4,034 records, applies some transforms, and writes to GCS with FileIO.writeDynamic. All tests were performed with the same machine type and the same number of workers.

Does anyone know if there are any breaking changes in this SDK / Dataflow runner?

Thanks so much!
Siyu