Hmm, I don't see anything obviously wrong with the code, assuming
TableReference and TableSchema both implement a hashCode() consistent with
equals() [I'm not 100% sure - could you check that?]... I'm hoping the saved
heap dumps will shed some light.
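
As a quick sanity check, something along these lines should tell you whether
two logically identical destinations agree on equals()/hashCode() - the same
check applies to TableSchema. Just a sketch; the project, dataset and table
IDs are placeholders:

import com.google.api.services.bigquery.model.TableReference;

public class DestinationHashCheck {
  public static void main(String[] args) {
    // Two references describing the same table should compare equal and share
    // a hash code; if they don't, per-destination maps inside BigQueryIO can
    // treat every element as a new destination and grow without bound.
    TableReference a = new TableReference()
        .setProjectId("my-project").setDatasetId("my_dataset").setTableId("type_a_1");
    TableReference b = new TableReference()
        .setProjectId("my-project").setDatasetId("my_dataset").setTableId("type_a_1");
    System.out.println("equals: " + a.equals(b));
    System.out.println("hashCode match: " + (a.hashCode() == b.hashCode()));
  }
}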

On Thu, May 17, 2018 at 12:25 AM Jose Ignacio Honrado Benítez <
jihonra...@gmail.com> wrote:

> Thanks Eugene,
>
> I will try "saveHeapDumpsToGcsPath" to see if there are any heap dumps
> that I am missing.
>
> I am using DynamicDestinations, indeed. You can find the code here:
> https://pastebin.com/yrmXFQLH
>
> Note: DocumentWithSchema is a case class that contains the document JSON
> along with its TableReference and TableSchema.
>
> Thanks!
>
> On Mon, May 14, 2018 at 23:16, Eugene Kirpichov (<kirpic...@google.com>)
> wrote:
>
>> Thanks. This definitely looks like a memory usage issue to me; I
>> recommend you try running with --saveHeapDumpsToGcsPath=gs://... and see if
>> workers leave any heap dumps over there - perhaps there were some OOMs you
>> missed? If there are heap dumps, you can use any regular Java memory
>> profiler on them.
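>>
>> (For reference, this can also be set programmatically - I believe these
>> options live on DataflowPipelineDebugOptions, but please double-check the
>> method names against your SDK version; the bucket path is a placeholder:)
>>
>> import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>
>> public class EnableHeapDumps {
>>   public static void main(String[] args) {
>>     DataflowPipelineDebugOptions debug =
>>         PipelineOptionsFactory.fromArgs(args).withValidation()
>>             .as(DataflowPipelineDebugOptions.class);
>>     // Equivalent of --dumpHeapOnOOM=true --saveHeapDumpsToGcsPath=gs://my-bucket/heap-dumps
>>     debug.setDumpHeapOnOOM(true);
>>     debug.setSaveHeapDumpsToGcsPath("gs://my-bucket/heap-dumps");
>>   }
>> }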
>>
>> Are you using BigQuery with dynamic destinations? If so, could you provide
>> the code of your DynamicDestinations class? Some implementation mistakes
>> there could cause memory blow-up.
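>>
>> To make the concern concrete, the pattern I'd expect to see is roughly the
>> following: a DynamicDestinations keyed on a plain String table spec, which
>> has well-defined equals()/hashCode(), with schemas carried as JSON strings.
>> This is only a sketch, not a claim about what your code does -
>> DocumentWithSchema is your type, its accessor name is assumed, and the
>> schema map is an assumption on my part:
>>
>> import java.util.Map;
>> import com.google.api.services.bigquery.model.TableReference;
>> import com.google.api.services.bigquery.model.TableSchema;
>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
>> import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
>> import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
>> import org.apache.beam.sdk.values.ValueInSingleWindow;
>>
>> class DocumentDestinations extends DynamicDestinations<DocumentWithSchema, String> {
>>   // Table schemas as JSON, keyed by "project:dataset.table"; kept as Strings
>>   // so the class stays serializable (TableSchema itself isn't, as far as I
>>   // remember).
>>   private final Map<String, String> schemaJsonByTable;
>>
>>   DocumentDestinations(Map<String, String> schemaJsonByTable) {
>>     this.schemaJsonByTable = schemaJsonByTable;
>>   }
>>
>>   @Override
>>   public String getDestination(ValueInSingleWindow<DocumentWithSchema> element) {
>>     // Accessor name assumed; the point is to return a small value type with
>>     // stable equals()/hashCode(), not a fresh TableSchema per element.
>>     TableReference ref = element.getValue().getTableReference();
>>     return ref.getProjectId() + ":" + ref.getDatasetId() + "." + ref.getTableId();
>>   }
>>
>>   @Override
>>   public TableDestination getTable(String tableSpec) {
>>     return new TableDestination(tableSpec, "destination for " + tableSpec);
>>   }
>>
>>   @Override
>>   public TableSchema getSchema(String tableSpec) {
>>     return BigQueryHelpers.fromJsonString(schemaJsonByTable.get(tableSpec), TableSchema.class);
>>   }
>> }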
>>
>> If that doesn't help, then you'll need to contact Google Cloud Support
>> with the details of your job and they'll be able to take a deeper look
>> (possibly by delegating to our engineers, but it has to be done through
>> that channel).
>>
>> On Sun, May 13, 2018 at 8:37 AM Jose Ignacio Honrado Benítez <
>> jihonra...@gmail.com> wrote:
>>
>>> Thanks for your response Eugene.
>>>
>>> I don't see any more error traces than the ones shown in the previous mail
>>> in the "worker" category, but checking the "jvm-gc" logs category I can see
>>> a lot of traces like these:
>>>
>>> I  5498.913: [GC (Allocation Failure) 2018-05-09T17:42:59.661+0000: 
>>> 5498.913: [DefNew: 708352K->708352K(796864K), 0.0000218 
>>> secs]2018-05-09T17:42:59.661+0000: 5498.913: [Tenured: 
>>> 1501362K->1501296K(1770880K), 0.2897763 secs] 2209714K->1501296K(2567744K), 
>>> [Metaspace: 55314K->55314K(1099776K)], 0.2899388 secs] [Times: user=0.28 
>>> sys=0.00, real=0.29 secs]
>>> I  5194.235: [GC (Allocation Failure) 2018-05-09T17:42:59.771+0000: 
>>> 5194.235: [DefNew: 708352K->708352K(796864K), 0.0000178 
>>> secs]2018-05-09T17:42:59.771+0000: 5194.235: [Tenured: 
>>> 1500881K->1500654K(1770880K), 0.2393131 secs] 2209233K->1500654K(2567744K), 
>>> [Metaspace: 55285K->55285K(1099776K)], 0.2394395 secs] [Times: user=0.24 
>>> sys=0.00, real=0.23 secs]
>>> I  5139.192: [GC (Allocation Failure) 2018-05-09T17:42:59.822+0000: 
>>> 5139.192: [DefNew: 708352K->708352K(796864K), 0.0000217 
>>> secs]2018-05-09T17:42:59.822+0000: 5139.192: [Tenured: 
>>> 1498930K->1498951K(1770880K), 0.2942997 secs] 2207282K->1498951K(2567744K), 
>>> [Metaspace: 55159K->55159K(1099776K)], 0.2944375 secs] [Times: user=0.24 
>>> sys=0.00, real=0.30 secs]
>>> I  5500.783: [GC (Allocation Failure) 2018-05-09T17:42:59.898+0000: 
>>> 5500.784: [DefNew: 708352K->708352K(796864K), 0.0000236 
>>> secs]2018-05-09T17:42:59.898+0000: 5500.784: [Tenured: 
>>> 1497597K->1497532K(1770880K), 0.2757528 secs] 2205949K->1497532K(2567744K), 
>>> [Metaspace: 55396K->55396K(1099776K)], 0.2759211 secs] [Times: user=0.27 
>>> sys=0.00, real=0.28 secs]
>>> I  5498.810: [GC (Allocation Failure) 2018-05-09T17:42:59.925+0000: 
>>> 5498.810: [DefNew: 708373K->105K(796864K), 0.0033594 secs] 
>>> 2210244K->1501976K(2567744K), 0.0034659 secs] [Times: user=0.00 sys=0.00, 
>>> real=0.00 secs]
>>> I  5588.759: [GC (Allocation Failure) 2018-05-09T17:43:00.157+0000: 
>>> 5588.759: [DefNew: 708352K->708352K(796864K), 0.0000197 
>>> secs]2018-05-09T17:43:00.157+0000: 5588.759: [Tenured: 
>>> 1499557K->1499664K(1770880K), 0.2670111 secs] 2207909K->1499664K(2567744K), 
>>> [Metaspace: 55453K->55453K(1099776K)], 0.2671502 secs] [Times: user=0.27 
>>> sys=0.00, real=0.27 secs]
>>> I  5196.180: [GC (Allocation Failure) 2018-05-09T17:43:00.406+0000: 
>>> 5196.180: [DefNew: 708352K->708352K(796864K), 0.0000222 
>>> secs]2018-05-09T17:43:00.406+0000: 5196.180: [Tenured: 
>>> 1497267K->1497287K(1770880K), 0.3027764 secs] 2205619K->1497287K(2567744K), 
>>> [Metaspace: 55551K->55551K(1099776K)], 0.3029513 secs] [Times: user=0.30 
>>> sys=0.00, real=0.30 secs]
>>> I  5497.697: [GC (Allocation Failure) 2018-05-09T17:43:00.423+0000: 
>>> 5497.697: [DefNew: 708352K->708352K(796864K), 0.0000236 
>>> secs]2018-05-09T17:43:00.423+0000: 5497.697: [Tenured: 
>>> 1498944K->1499048K(1770880K), 0.2855379 secs] 2207296K->1499048K(2567744K), 
>>> [Metaspace: 55191K->55191K(1099776K)], 0.2856974 secs] [Times: user=0.28 
>>> sys=0.00, real=0.29 secs]
>>> I  5198.002: [GC (Allocation Failure) 2018-05-09T17:43:00.459+0000: 
>>> 5198.002: [DefNew: 708352K->708352K(796864K), 0.0000302 
>>> secs]2018-05-09T17:43:00.459+0000: 5198.002: [Tenured: 
>>> 1500191K->1500212K(1770880K), 0.6133605 secs] 2208543K->1500212K(2567744K), 
>>> [Metaspace: 55221K->55221K(1099776K)], 0.6135733 secs] [Times: user=0.30 
>>> sys=0.00, real=0.61 secs]
>>>
>>> I have been using default machines (1 CPU), which seemed to perform
>>> better (I suppose the job is so I/O-intensive that more cores are not really
>>> an advantage). After your response about possible OOM errors / GC thrashing
>>> I tried using bigger machines, including "n1-standard-16" and even "highmem"
>>> ones, but I got the same results (the job fails) and the same GC traces:
>>>
>>> I  8614.584: [GC (Allocation Failure) [PSYoungGen: 
>>> 5802484K->1351K(6474240K)] 7381198K->1580064K(9390592K), 0.0221600 secs] 
>>> [Times: user=0.11 sys=0.00, real=0.02 secs]
>>> I  7884.633: [GC (Allocation Failure) [PSYoungGen: 
>>> 5756774K->1078K(6446080K)] 7291919K->1536222K(9606656K), 0.0151960 secs] 
>>> [Times: user=0.06 sys=0.00, real=0.01 secs]
>>> I  8621.025: [GC (Allocation Failure) [PSYoungGen: 
>>> 7205575K->1076K(7205888K)] 10217436K->3012945K(12062208K), 0.0327636 secs] 
>>> [Times: user=0.14 sys=0.00, real=0.03 secs]
>>> I  8626.593: [GC (Allocation Failure) [PSYoungGen: 
>>> 7205606K->1319K(7205888K)] 10245689K->3041403K(11931136K), 0.0189765 secs] 
>>> [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I  8627.637: [GC (Allocation Failure) [PSYoungGen: 
>>> 5932871K->1125K(6520320K)] 7511584K->1579839K(9436672K), 0.0328444 secs] 
>>> [Times: user=0.11 sys=0.00, real=0.03 secs]
>>> I  7897.210: [GC (Allocation Failure) [PSYoungGen: 
>>> 5871670K->1365K(6493184K)] 7406814K->1536510K(9653760K), 0.0190718 secs] 
>>> [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I  8631.514: [GC (Allocation Failure) [PSYoungGen: 
>>> 7205428K->1092K(7205888K)] 10217297K->3012961K(12062208K), 0.0182693 secs] 
>>> [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I  8640.762: [GC (Allocation Failure) [PSYoungGen: 
>>> 5932645K->1238K(6541824K)] 7511359K->1579951K(9458176K), 0.0189788 secs] 
>>> [Times: user=0.05 sys=0.00, real=0.02 secs]
>>> I  7910.384: [GC (Allocation Failure) [PSYoungGen: 
>>> 5871957K->1398K(6505472K)] 7407102K->1536542K(9666048K), 0.0104685 secs] 
>>> [Times: user=0.03 sys=0.00, real=0.01 secs]
>>> I  8641.725: [GC (Allocation Failure) [PSYoungGen: 
>>> 7205444K->1158K(7205888K)] 10217313K->3013035K(12062208K), 0.0207742 secs] 
>>> [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I  8643.024: [GC (Allocation Failure) [PSYoungGen: 
>>> 7205671K->1256K(7205888K)] 10245755K->3041348K(11931136K), 0.0193853 secs] 
>>> [Times: user=0.06 sys=0.00, real=0.02 secs]
>>> I  8651.814: [GC (Allocation Failure) [PSYoungGen: 
>>> 7205510K->1141K(7205888K)] 10217387K->3013026K(12062208K), 0.0251953 secs] 
>>> [Times: user=0.05 sys=0.01, real=0.02 secs]
>>> I  8653.762: [GC (Allocation Failure) [PSYoungGen: 
>>> 6063830K->1286K(6586880K)] 7642543K->1579999K(9503232K), 0.0356122 secs] 
>>> [Times: user=0.12 sys=0.00, real=0.04 secs]
>>>
>>> After this, I remembered that we had tuned the "gcsUploadBufferSizeBytes"
>>> property for another Dataflow job in the past, so I passed
>>> "--gcsUploadBufferSizeBytes=4194304". As a result the job raised no errors
>>> and completed successfully, but it took very long to finish: more than
>>> 4 hours for a little over 4 million input documents that exploded to
>>> 120 million, while the read rate of the input step was constantly
>>> decreasing (it started at a good speed and then kept slowing down, so it
>>> processed most of the input documents in, I would say, an hour, and the
>>> rest in the remaining time).
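>>>
>>> For completeness, this is roughly how we pass it. Programmatically it maps
>>> to Beam's GcsOptions; the comment reflects my understanding of why it
>>> helps, so please correct me if that is wrong:
>>>
>>> import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>
>>> public class UploadBufferExample {
>>>   public static void main(String[] args) {
>>>     GcsOptions options =
>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(GcsOptions.class);
>>>     // 4 MiB per GCS upload buffer instead of the much larger default. Each
>>>     // open per-destination file writer holds one of these buffers, so with
>>>     // many dynamic destinations per bundle the buffers alone can dominate
>>>     // the worker heap.
>>>     options.setGcsUploadBufferSizeBytes(4 * 1024 * 1024);
>>>   }
>>> }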
>>>
>>> Is there anything we can do to improve this weird behaviour?
>>>
>>> Cheers
>>>
>>> 2018-05-11 22:22 GMT+02:00 Eugene Kirpichov <kirpic...@google.com>:
>>>
>>>> Do you see any other exceptions in Stackdriver, e.g. (guess) OOM
>>>> errors? "worker lost contact with the service" is commonly caused by OOM
>>>> crashes, and GC thrashing could also cause network issues of the sort that
>>>> you're seeing.
>>>>
>>>> On Fri, May 11, 2018 at 2:54 AM Jose Ignacio Honrado Benítez <
>>>> jihonra...@gmail.com> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I'm running a pipeline in Google Cloud Dataflow that reads from GCS
>>>>> and writes into several BQ tables. This pipeline has been successfully 
>>>>> used
>>>>> to load several TB of data from GCS to BQ.
>>>>>
>>>>> In one particular execution, I am loading documents from GCS that are
>>>>> "exploded" into several documents (each is multiplied by a factor of
>>>>> between 1 and 25 depending on the document type) in one of the steps, to
>>>>> insert them into several BQ tables. Example: one document of "type a" is
>>>>> exploded into three documents: "type a.1", "type a.2" and "type a.3".
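>>>>>
>>>>> The explode step has roughly this shape (names and subtypes below are
>>>>> made up for illustration; the real code derives them from the document
>>>>> type):
>>>>>
>>>>> import java.util.Arrays;
>>>>> import java.util.List;
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>
>>>>> class ExplodeDoFn extends DoFn<String, String> {
>>>>>   // Each input JSON document is emitted once per derived subtype, giving
>>>>>   // a fan-out of between 1 and 25 depending on the document type.
>>>>>   private static final List<String> SUBTYPES = Arrays.asList("a.1", "a.2", "a.3");
>>>>>
>>>>>   @ProcessElement
>>>>>   public void processElement(ProcessContext c) {
>>>>>     for (String subtype : SUBTYPES) {
>>>>>       // The subtype also determines the target BQ table downstream.
>>>>>       c.output(subtype + ":" + c.element());
>>>>>     }
>>>>>   }
>>>>> }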
>>>>>
>>>>> I have also performed several successful executions of this kind, but in
>>>>> this specific case I am getting the following errors:
>>>>>
>>>>> java.io.IOException: Failed to close some writers at
>>>>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.finishBundle(WriteBundlesToFiles.java:245)
>>>>> Suppressed: java.io.IOException: java.io.IOException: insufficient data
>>>>> written at
>>>>> com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:431)
>>>>> at
>>>>> com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:289)
>>>>> at
>>>>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.close(TableRowWriter.java:81)
>>>>> at
>>>>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.finishBundle(WriteBundlesToFiles.java:239)
>>>>> at
>>>>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeFinishBundle(Unknown
>>>>> Source) at
>>>>> org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:187)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:405)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:55)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:383)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:355)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:286)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
>>>>> at
>>>>> com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException:
>>>>> insufficient data written at
>>>>> sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.close(HttpURLConnection.java:3501)
>>>>> at
>>>>> com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:81)
>>>>> at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981) at
>>>>> com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
>>>>> at
>>>>> com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
>>>>> at
>>>>> com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
>>>>> at
>>>>> com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
>>>>> at
>>>>> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
>>>>> at
>>>>> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
>>>>> at
>>>>> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
>>>>> at
>>>>> com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
>>>>> ... 4 more
>>>>>
>>>>> and at the end the job fails with:
>>>>>
>>>>> Workflow failed. Causes: ..., A work item was attempted 4 times
>>>>> without success. Each time the worker eventually lost contact with the
>>>>> service. The work item was attempted on:
>>>>>   xxx-d-05090856-5x3e-harness-86r6,
>>>>>   xxx-d-05090856-5x3e-harness-f296,
>>>>>   xxx-d-05090856-5x3e-harness-x5tz,
>>>>>   xxx-d-05090856-5x3e-harness-g362
>>>>>
>>>>> The only difference between this execution that failed and the
>>>>> previous ones that succeeded is that the former has a lot more input
>>>>> documents than the latter, but I don't understand why that would be an
>>>>> issue, as I have executed jobs with much more input in terms of GBs.
>>>>>
>>>>> I found this question on StackOverflow:
>>>>> https://stackoverflow.com/questions/48285232/dataflow-batch-job-fails-with-failed-to-close-some-writers
>>>>> but the proposed solution does not work for me, as I am already setting up
>>>>> to 180 workers via the maxNumWorkers argument.
>>>>>
>>>>> Any idea of what could be happening?
>>>>>
>>>>> If it helps, this is the job id on Google's
>>>>> Dataflow: 2018-05-11_01_09_21-2559209214303012602
>>>>>
>>>>> Cheers
>>>>>
>>>>
>>>
