Hmm, I don't see anything obviously wrong with the code, assuming TableReference and TableSchema both implement a hashCode() consistent with equals() [I'm not 100% sure - could you check that?]... I'm hoping the saved heap dumps will shed some light.
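
To be concrete about the hashCode()/equals() point: whatever object ends up being used as the destination key needs hashCode() derived from the same fields that equals() compares, otherwise destinations that should be "the same" can end up as distinct keys, which is the kind of memory blow-up I mentioned earlier. A minimal sketch of that contract (the DestinationKey class and its fields are made up for illustration - this is not your actual case class):

import java.util.Objects;

// Hypothetical stand-in for whatever your DynamicDestinations#getDestination returns.
// Comparing serialized forms (table spec + schema JSON) sidesteps any doubt about
// whether TableReference/TableSchema themselves honor the equals/hashCode contract.
public class DestinationKey {
  private final String tableSpec;   // e.g. "project:dataset.table"
  private final String schemaJson;  // the TableSchema serialized to JSON

  public DestinationKey(String tableSpec, String schemaJson) {
    this.tableSpec = tableSpec;
    this.schemaJson = schemaJson;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof DestinationKey)) {
      return false;
    }
    DestinationKey that = (DestinationKey) o;
    return tableSpec.equals(that.tableSpec) && schemaJson.equals(that.schemaJson);
  }

  @Override
  public int hashCode() {
    // Derived from exactly the fields compared in equals().
    return Objects.hash(tableSpec, schemaJson);
  }
}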
On Thu, May 17, 2018 at 12:25 AM Jose Ignacio Honrado Benítez <jihonra...@gmail.com> wrote:

> Thanks Eugene,
>
> I will try "saveHeapDumpsToGcsPath" to see if there are any heap dumps that I am missing.
>
> I am using DynamicDestinations, indeed. You can find the code here: https://pastebin.com/yrmXFQLH
>
> Note: DocumentWithSchema is a case class that contains the document JSON along with its TableReference and TableSchema.
>
> Thanks!
>
> On Mon, May 14, 2018 at 23:16, Eugene Kirpichov (<kirpic...@google.com>) wrote:
>
>> Thanks. This definitely looks like a memory usage issue to me. I recommend you try running with --saveHeapDumpsToGcsPath=gs://... and see whether the workers leave any heap dumps there - perhaps there were some OOMs you missed? If there are heap dumps, you can use any regular Java memory profiler on them.
>>
>> Are you using BigQuery with dynamic destinations? If so, could you provide the code of your DynamicDestinations class? Some implementation mistakes there could cause memory blow-up.
>>
>> If that doesn't help, then you'll need to contact Google Cloud Support with the details of your job and they'll be able to take a deeper look (possibly by delegating to our engineers, but it has to be done through that channel).
>>
>> On Sun, May 13, 2018 at 8:37 AM Jose Ignacio Honrado Benítez <jihonra...@gmail.com> wrote:
>>
>>> Thanks for your response, Eugene.
>>>
>>> I don't see any more error traces than the ones shown in the previous mail in the "worker" category, but checking the "jvm-gc" log category I can see a lot of these traces:
>>>
>>> I 5498.913: [GC (Allocation Failure) 2018-05-09T17:42:59.661+0000: 5498.913: [DefNew: 708352K->708352K(796864K), 0.0000218 secs]2018-05-09T17:42:59.661+0000: 5498.913: [Tenured: 1501362K->1501296K(1770880K), 0.2897763 secs] 2209714K->1501296K(2567744K), [Metaspace: 55314K->55314K(1099776K)], 0.2899388 secs] [Times: user=0.28 sys=0.00, real=0.29 secs]
>>> I 5194.235: [GC (Allocation Failure) 2018-05-09T17:42:59.771+0000: 5194.235: [DefNew: 708352K->708352K(796864K), 0.0000178 secs]2018-05-09T17:42:59.771+0000: 5194.235: [Tenured: 1500881K->1500654K(1770880K), 0.2393131 secs] 2209233K->1500654K(2567744K), [Metaspace: 55285K->55285K(1099776K)], 0.2394395 secs] [Times: user=0.24 sys=0.00, real=0.23 secs]
>>> I 5139.192: [GC (Allocation Failure) 2018-05-09T17:42:59.822+0000: 5139.192: [DefNew: 708352K->708352K(796864K), 0.0000217 secs]2018-05-09T17:42:59.822+0000: 5139.192: [Tenured: 1498930K->1498951K(1770880K), 0.2942997 secs] 2207282K->1498951K(2567744K), [Metaspace: 55159K->55159K(1099776K)], 0.2944375 secs] [Times: user=0.24 sys=0.00, real=0.30 secs]
>>> I 5500.783: [GC (Allocation Failure) 2018-05-09T17:42:59.898+0000: 5500.784: [DefNew: 708352K->708352K(796864K), 0.0000236 secs]2018-05-09T17:42:59.898+0000: 5500.784: [Tenured: 1497597K->1497532K(1770880K), 0.2757528 secs] 2205949K->1497532K(2567744K), [Metaspace: 55396K->55396K(1099776K)], 0.2759211 secs] [Times: user=0.27 sys=0.00, real=0.28 secs]
>>> I 5498.810: [GC (Allocation Failure) 2018-05-09T17:42:59.925+0000: 5498.810: [DefNew: 708373K->105K(796864K), 0.0033594 secs] 2210244K->1501976K(2567744K), 0.0034659 secs] [Times: user=0.00 sys=0.00, real=0.00 secs]
>>> I 5588.759: [GC (Allocation Failure) 2018-05-09T17:43:00.157+0000: 5588.759: [DefNew: 708352K->708352K(796864K), 0.0000197 secs]2018-05-09T17:43:00.157+0000: 5588.759: [Tenured: 1499557K->1499664K(1770880K), 0.2670111 secs] 2207909K->1499664K(2567744K), [Metaspace: 55453K->55453K(1099776K)], 0.2671502 secs] [Times: user=0.27 sys=0.00, real=0.27 secs]
>>> I 5196.180: [GC (Allocation Failure) 2018-05-09T17:43:00.406+0000: 5196.180: [DefNew: 708352K->708352K(796864K), 0.0000222 secs]2018-05-09T17:43:00.406+0000: 5196.180: [Tenured: 1497267K->1497287K(1770880K), 0.3027764 secs] 2205619K->1497287K(2567744K), [Metaspace: 55551K->55551K(1099776K)], 0.3029513 secs] [Times: user=0.30 sys=0.00, real=0.30 secs]
>>> I 5497.697: [GC (Allocation Failure) 2018-05-09T17:43:00.423+0000: 5497.697: [DefNew: 708352K->708352K(796864K), 0.0000236 secs]2018-05-09T17:43:00.423+0000: 5497.697: [Tenured: 1498944K->1499048K(1770880K), 0.2855379 secs] 2207296K->1499048K(2567744K), [Metaspace: 55191K->55191K(1099776K)], 0.2856974 secs] [Times: user=0.28 sys=0.00, real=0.29 secs]
>>> I 5198.002: [GC (Allocation Failure) 2018-05-09T17:43:00.459+0000: 5198.002: [DefNew: 708352K->708352K(796864K), 0.0000302 secs]2018-05-09T17:43:00.459+0000: 5198.002: [Tenured: 1500191K->1500212K(1770880K), 0.6133605 secs] 2208543K->1500212K(2567744K), [Metaspace: 55221K->55221K(1099776K)], 0.6135733 secs] [Times: user=0.30 sys=0.00, real=0.61 secs]
>>>
>>> I have been using default machines (1 CPU), which seemed to perform better (I suppose the job is so IO-intensive that more cores are not really an advantage). After your comment about possible OOM errors / GC thrashing, I tried bigger machines, including "n1-standard-16" and even "highmem" ones, but I got the same result (the job fails) and the same GC traces:
>>>
>>> I 8614.584: [GC (Allocation Failure) [PSYoungGen: 5802484K->1351K(6474240K)] 7381198K->1580064K(9390592K), 0.0221600 secs] [Times: user=0.11 sys=0.00, real=0.02 secs]
>>> I 7884.633: [GC (Allocation Failure) [PSYoungGen: 5756774K->1078K(6446080K)] 7291919K->1536222K(9606656K), 0.0151960 secs] [Times: user=0.06 sys=0.00, real=0.01 secs]
>>> I 8621.025: [GC (Allocation Failure) [PSYoungGen: 7205575K->1076K(7205888K)] 10217436K->3012945K(12062208K), 0.0327636 secs] [Times: user=0.14 sys=0.00, real=0.03 secs]
>>> I 8626.593: [GC (Allocation Failure) [PSYoungGen: 7205606K->1319K(7205888K)] 10245689K->3041403K(11931136K), 0.0189765 secs] [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I 8627.637: [GC (Allocation Failure) [PSYoungGen: 5932871K->1125K(6520320K)] 7511584K->1579839K(9436672K), 0.0328444 secs] [Times: user=0.11 sys=0.00, real=0.03 secs]
>>> I 7897.210: [GC (Allocation Failure) [PSYoungGen: 5871670K->1365K(6493184K)] 7406814K->1536510K(9653760K), 0.0190718 secs] [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I 8631.514: [GC (Allocation Failure) [PSYoungGen: 7205428K->1092K(7205888K)] 10217297K->3012961K(12062208K), 0.0182693 secs] [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I 8640.762: [GC (Allocation Failure) [PSYoungGen: 5932645K->1238K(6541824K)] 7511359K->1579951K(9458176K), 0.0189788 secs] [Times: user=0.05 sys=0.00, real=0.02 secs]
>>> I 7910.384: [GC (Allocation Failure) [PSYoungGen: 5871957K->1398K(6505472K)] 7407102K->1536542K(9666048K), 0.0104685 secs] [Times: user=0.03 sys=0.00, real=0.01 secs]
>>> I 8641.725: [GC (Allocation Failure) [PSYoungGen: 7205444K->1158K(7205888K)] 10217313K->3013035K(12062208K), 0.0207742 secs] [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I 8643.024: [GC (Allocation Failure) [PSYoungGen: 7205671K->1256K(7205888K)] 10245755K->3041348K(11931136K), 0.0193853 secs] [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I 8651.814: [GC (Allocation Failure) [PSYoungGen: 7205510K->1141K(7205888K)] 10217387K->3013026K(12062208K), 0.0251953 secs] [Times: user=0.05 sys=0.01, real=0.02 secs]
>>> I 8653.762: [GC (Allocation Failure) [PSYoungGen: 6063830K->1286K(6586880K)] 7642543K->1579999K(9503232K), 0.0356122 secs] [Times: user=0.12 sys=0.00, real=0.04 secs]
>>>
>>> After this, I remembered that we had tuned the "gcsUploadBufferSizeBytes" property for another Dataflow job in the past, so I passed "--gcsUploadBufferSizeBytes=4194304". With that, the job raised no errors and completed successfully, but it took very long to finish: more than 4 hours for a little over 4 million input documents that exploded into 120 million. The read rate of the input step kept decreasing - it started at a good speed and then slowed down continuously, so it processed most of the input documents in, I would say, an hour, and the rest in the remaining time.
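>>>
>>> For reference, this is roughly how we set that option (a sketch, not our actual launcher code; it assumes the standard Beam GcsOptions interface and the flag name above):
>>>
>>> import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>
>>> public class UploadBufferSketch {
>>>   public static void main(String[] args) {
>>>     // Equivalent to passing --gcsUploadBufferSizeBytes=4194304 on the command line:
>>>     // cap each in-flight GCS upload buffer at 4 MiB, so that keeping many
>>>     // per-destination writers open at once costs less memory.
>>>     GcsOptions options =
>>>         PipelineOptionsFactory.fromArgs("--gcsUploadBufferSizeBytes=4194304")
>>>             .as(GcsOptions.class);
>>>     System.out.println(options.getGcsUploadBufferSizeBytes());
>>>   }
>>> }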
>>>
>>> Is there anything we can do to improve this weird behaviour?
>>>
>>> Cheers
>>>
>>> 2018-05-11 22:22 GMT+02:00 Eugene Kirpichov <kirpic...@google.com>:
>>>
>>>> Do you see any other exceptions in Stackdriver, e.g. (guess) OOM errors? "Worker lost contact with the service" is commonly caused by OOM crashes, and GC thrashing could also cause network issues of the sort you're seeing.
>>>>
>>>> On Fri, May 11, 2018 at 2:54 AM Jose Ignacio Honrado Benítez <jihonra...@gmail.com> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I'm running a pipeline in Google Cloud Dataflow that reads from GCS and writes into several BQ tables. This pipeline has been used successfully to load several TB of data from GCS to BQ.
>>>>>
>>>>> In one particular execution, I am loading documents from GCS that are "exploded" into several documents (multiplied by a factor of between 1 and 25, depending on the document type) in one of the steps, in order to insert them into several BQ tables. Example: I have one document of "type a" that is exploded into three documents: "type a.1", "type a.2" and "type a.3".
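>>>>>
>>>>> In rough sketch form, that fan-out step is a DoFn that emits one record per target table for each input document (this is not the actual code from the pastebin; the sub-type lookup below is invented):
>>>>>
>>>>> import java.util.Arrays;
>>>>> import java.util.List;
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>> import org.apache.beam.sdk.values.KV;
>>>>>
>>>>> // Simplified stand-in for the explode step: each input JSON document is
>>>>> // emitted once per sub-type (between 1 and 25 of them), keyed by sub-type.
>>>>> class ExplodeDocumentFn extends DoFn<String, KV<String, String>> {
>>>>>   @ProcessElement
>>>>>   public void processElement(ProcessContext c) {
>>>>>     String documentJson = c.element();
>>>>>     for (String subType : subTypesFor(documentJson)) {
>>>>>       c.output(KV.of(subType, documentJson));
>>>>>     }
>>>>>   }
>>>>>
>>>>>   // Invented helper: the real mapping depends on the document type.
>>>>>   private List<String> subTypesFor(String documentJson) {
>>>>>     return Arrays.asList("a.1", "a.2", "a.3");
>>>>>   }
>>>>> }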
>>>>>
>>>>> I have also performed successful executions of this kind several times, but in this specific case I am getting the following errors:
>>>>>
>>>>> java.io.IOException: Failed to close some writers
>>>>>   at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.finishBundle(WriteBundlesToFiles.java:245)
>>>>>   Suppressed: java.io.IOException: java.io.IOException: insufficient data written
>>>>>     at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:431)
>>>>>     at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:289)
>>>>>     at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.close(TableRowWriter.java:81)
>>>>>     at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.finishBundle(WriteBundlesToFiles.java:239)
>>>>>     at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeFinishBundle(Unknown Source)
>>>>>     at org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:187)
>>>>>     at com.google.cloud.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:405)
>>>>>     at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:55)
>>>>>     at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>>>>>     at com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:383)
>>>>>     at com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:355)
>>>>>     at com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:286)
>>>>>     at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
>>>>>     at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
>>>>>     at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>   Caused by: java.io.IOException: insufficient data written
>>>>>     at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.close(HttpURLConnection.java:3501)
>>>>>     at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:81)
>>>>>     at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
>>>>>     at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
>>>>>     at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
>>>>>     at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
>>>>>     at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
>>>>>     at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
>>>>>     at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
>>>>>     at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
>>>>>     at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
>>>>>     ... 4 more
>>>>>
>>>>> and at the end the job fails with:
>>>>>
>>>>> Workflow failed. Causes: ..., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
>>>>>   xxx-d-05090856-5x3e-harness-86r6,
>>>>>   xxx-d-05090856-5x3e-harness-f296,
>>>>>   xxx-d-05090856-5x3e-harness-x5tz,
>>>>>   xxx-d-05090856-5x3e-harness-g362
>>>>>
>>>>> The only difference between this failed execution and the previous successful ones is that it has a lot more input documents, but I don't understand why that would be an issue, as I have executed jobs with much more input in terms of GBs.
>>>>>
>>>>> I found this topic on Stack Overflow: https://stackoverflow.com/questions/48285232/dataflow-batch-job-fails-with-failed-to-close-some-writers but the proposed solution does not work for me, as I am already setting the maxNumWorkers argument to 180.
>>>>>
>>>>> Any idea of what could be happening?
>>>>>
>>>>> If it helps, this is the job id on Google's Dataflow: 2018-05-11_01_09_21-2559209214303012602
>>>>>
>>>>> Cheers