The Dataflow runner retries failed work items up to four times, so batches that failed due to contention were most likely written successfully on a subsequent retry. The Datastore sink performs dynamic throttling, but work-item failures due to contention are still possible. As long as those work items complete during a later retry, the job will succeed, which is why the pipeline as a whole did not fail.
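For illustration, the retry behavior described above can be sketched as a generic exponential-backoff loop around a commit. This is a hedged sketch only: the class, method names, and limits here are invented for the example and are not the actual `DatastoreWriterFn` or runner code.

```java
import java.util.concurrent.Callable;

// Hedged sketch of retry-with-backoff, roughly the pattern applied to
// batch commits that fail with ABORTED ("too much contention").
// All names and limits here are illustrative assumptions.
public class ContentionRetrySketch {

    /**
     * Runs {@code op}, retrying up to {@code maxAttempts} total attempts,
     * sleeping with exponential backoff starting at {@code baseDelayMillis}.
     */
    public static <T> T retryWithBackoff(Callable<T> op, int maxAttempts,
                                         long baseDelayMillis) throws Exception {
        long delay = baseDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e;             // out of retries: the work item fails
                }
                Thread.sleep(delay);     // back off before retrying
                delay *= 2;              // double the delay each attempt
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate a commit that hits contention twice, then succeeds.
        int[] attempts = {0};
        int written = retryWithBackoff(() -> {
            if (attempts[0]++ < 2) {
                throw new RuntimeException(
                    "too much contention on these datastore entities");
            }
            return 500; // batch of 500 entities committed
        }, 4, 1);
        System.out.println("committed " + written
            + " entities after " + attempts[0] + " attempts");
    }
}
```

In this sketch a batch that fails transiently is not lost: it is re-attempted until the retry budget is exhausted, at which point the exception propagates and the work item (and eventually the job) fails.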
- Cham

On Mon, Jan 1, 2018 at 12:22 AM Joshua Fox <[email protected]> wrote:

> I am occasionally getting these. My pipeline is a simple backup, which
> reads from one Datastore and writes to another. Clearly such basic
> functionality should not be hitting up against Datastore's limits.
>
> This is especially problematic as puts are batched, so that one failure
> causes the loss of 500 Entities.
>
> On the other hand, for some reason this did not cause the execution of the
> pipeline to fail; I am not sure why.
>
> Ideas on what is happening here?
>
> (a5f346d765a99e3e): java.lang.RuntimeException:
> org.apache.beam.sdk.util.UserCodeException:
> com.google.datastore.v1.client.DatastoreException: too much contention on
> these datastore entities. please try again., code=ABORTED
>     at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
>     at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
>     at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:54)
>     at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:37)
>     at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:117)
>     at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:74)
>     at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:113)
>     at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
>     at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
>     at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
>     at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
>     at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
>     at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:330)
>     at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:302)
>     at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:251)
>     at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
>     at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
>     at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> com.google.datastore.v1.client.DatastoreException: too much contention on
> these datastore entities. please try again., code=ABORTED
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
>     at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
>     at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
>     at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
>     at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
>     at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at com.freightos.backup.datastore.beam.EntityDoFn.processElement(EntityDoFn.java:51)
>     at com.freightos.backup.datastore.beam.EntityDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
>     at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
>     at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read$ReadFn.processElement(DatastoreV1.java:919)
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
>     at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
>     at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
>     at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
>     at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
>     at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:84)
>     at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>     at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
>     at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
>     at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
>     at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:180)
>     ... 21 more
> Caused by: com.google.datastore.v1.client.DatastoreException: too much
> contention on these datastore entities. please try again., code=ABORTED
>     at com.google.datastore.v1.client.RemoteRpc.makeException(RemoteRpc.java:226)
>     at com.google.datastore.v1.client.RemoteRpc.makeException(RemoteRpc.java:275)
>     at com.google.datastore.v1.client.RemoteRpc.call(RemoteRpc.java:186)
>     at com.google.datastore.v1.client.Datastore.commit(Datastore.java:87)
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn.flushBatch(DatastoreV1.java:1326)
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn.processElement(DatastoreV1.java:1284)
