[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707067#comment-17707067
 ] 

ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------

cnauroth commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1153857994


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestCommitterSupport.java:
##########
@@ -224,6 +231,23 @@ public static ManifestSuccessData createManifestOutcome(
     return outcome;
   }
 
+  /**
+   * Add heap information to IOStatisticSetters gauges, with a stage in front 
of every key.
+   * @param ioStatisticsSetters map to update
+   * @param stage stage
+   */
+  public static void addHeapInformation(IOStatisticsSetters 
ioStatisticsSetters,
+      String stage) {
+    // force a gc. bit of bad form but it makes for better numbers
+    System.gc();

Review Comment:
   This triggered a SpotBugs warning. Do you think the forced GC should go behind a 
config flag, default off, and turned on in the tests?



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java:
##########
@@ -222,6 +223,33 @@ public synchronized Map<String, MeanStatistic> 
meanStatistics() {
     return meanStatistics;
   }
 
+  @Override
+  public synchronized void setCounter(final String key, final long value) {
+    counters().put(key, value);
+  }
+
+  @Override
+  public synchronized void setGauge(final String key, final long value) {
+    gauges().put(key, value);
+
+  }
+
+  @Override
+  public synchronized void setMaximum(final String key, final long value) {
+    maximums().put(key, value);
+
+  }
+
+  @Override
+  public synchronized void setMinimum(final String key, final long value) {
+    minimums().put(key, value);
+  }
+
+  @Override
+  public void setMeanStatistic(final String key, final MeanStatistic value) {
+

Review Comment:
   `meanStatistics().put(key, value);`?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java:
##########
@@ -63,6 +81,10 @@ public void setup() throws Exception {
         .isGreaterThan(0);
   }
 
+  public long heapSize() {
+       return Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory();

Review Comment:
   Nitpick: some indentation issues here.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java:
##########
@@ -134,8 +169,26 @@ public void testSaveThenLoadManyManifests() throws 
Throwable {
 
     // and skipping the rename stage (which is going to fail),
     // go straight to cleanup
-    new CleanupJobStage(getJobStageConfig()).apply(
+    new CleanupJobStage(stageConfig).apply(
         new CleanupJobStage.Arguments("", true, true, false));
+    addHeapInformation(heapInfo, "cleanup");
+
+    ManifestSuccessData success = createManifestOutcome(stageConfig, 
OP_STAGE_JOB_COMMIT);
+    success.snapshotIOStatistics(getStageStatistics());
+    success.getIOStatistics().aggregate(heapInfo);
+
+    Configuration conf = getConfiguration();
+    enableManifestCommitter(conf);
+    String reportDir = conf.getTrimmed(OPT_SUMMARY_REPORT_DIR, "");
+    Path reportDirPath = new Path(reportDir);
+    Path path = new Path(reportDirPath,
+        createJobSummaryFilename("TestLoadManifestsStage"));
+    final FileSystem summaryFS = path.getFileSystem(conf);
+    success.save(summaryFS, path, true);
+    LOG.info("Saved summary to {}", path);
+    ManifestPrinter showManifest = new ManifestPrinter();
+        ManifestSuccessData manifestSuccessData =

Review Comment:
   Nitpick: some indentation issues here.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java:
##########
@@ -756,66 +756,75 @@ private void testConcurrentCommitTaskWithSubDir(int 
version)
     conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
 
-    conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+    final String fileImpl = "fs.file.impl";
+    final String fileImplClassname = "org.apache.hadoop.fs.LocalFileSystem";
+    conf.setClass(fileImpl, RLFS.class, FileSystem.class);
     FileSystem.closeAll();
 
-    final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
-    final FileOutputCommitter amCommitter =
-        new FileOutputCommitter(outDir, jContext);
-    amCommitter.setupJob(jContext);
-
-    final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
-    taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
-    taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
-
-    final TextOutputFormat[] tof = new TextOutputFormat[2];
-    for (int i = 0; i < tof.length; i++) {
-      tof[i] = new TextOutputFormat() {
-        @Override
-        public Path getDefaultWorkFile(TaskAttemptContext context,
-            String extension) throws IOException {
-          final FileOutputCommitter foc = (FileOutputCommitter)
-              getOutputCommitter(context);
-          return new Path(new Path(foc.getWorkPath(), SUB_DIR),
-              getUniqueFile(context, getOutputName(context), extension));
-        }
-      };
-    }
-
-    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
     try {
-      for (int i = 0; i < taCtx.length; i++) {
-        final int taskIdx = i;
-        executor.submit(new Callable<Void>() {
+      final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+      final FileOutputCommitter amCommitter =
+          new FileOutputCommitter(outDir, jContext);
+      amCommitter.setupJob(jContext);
+
+      final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
+      taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
+      taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
+
+      final TextOutputFormat[] tof = new TextOutputFormat[2];
+      for (int i = 0; i < tof.length; i++) {
+        tof[i] = new TextOutputFormat() {
           @Override
-          public Void call() throws IOException, InterruptedException {
-            final OutputCommitter outputCommitter =
-                tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
-            outputCommitter.setupTask(taCtx[taskIdx]);
-            final RecordWriter rw =
-                tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
-            writeOutput(rw, taCtx[taskIdx]);
-            outputCommitter.commitTask(taCtx[taskIdx]);
-            return null;
+          public Path getDefaultWorkFile(TaskAttemptContext context,
+              String extension) throws IOException {
+            final FileOutputCommitter foc = (FileOutputCommitter)
+                getOutputCommitter(context);
+            return new Path(new Path(foc.getWorkPath(), SUB_DIR),
+                getUniqueFile(context, getOutputName(context), extension));
           }
-        });
+        };
       }
-    } finally {
-      executor.shutdown();
-      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-        LOG.info("Awaiting thread termination!");
+
+      final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
+      try {
+        for (int i = 0; i < taCtx.length; i++) {
+          final int taskIdx = i;
+          executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException, InterruptedException {
+              final OutputCommitter outputCommitter =
+                  tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
+              outputCommitter.setupTask(taCtx[taskIdx]);
+              final RecordWriter rw =
+                  tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
+              writeOutput(rw, taCtx[taskIdx]);
+              outputCommitter.commitTask(taCtx[taskIdx]);
+              return null;
+            }
+          });
+        }
+      } finally {
+        executor.shutdown();
+        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          LOG.info("Awaiting thread termination!");
+        }
       }
-    }
 
-    amCommitter.commitJob(jContext);
-    final RawLocalFileSystem lfs = new RawLocalFileSystem();
-    lfs.setConf(conf);
-    assertFalse("Must not end up with sub_dir/sub_dir",
-        lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
+      amCommitter.commitJob(jContext);
+      final RawLocalFileSystem lfs = new RawLocalFileSystem();
+      lfs.setConf(conf);
+      assertFalse("Must not end up with sub_dir/sub_dir",
+          lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
 
-    // validate output
-    validateContent(OUT_SUB_DIR);
-    FileUtil.fullyDelete(new File(outDir.toString()));
+      // validate output
+      validateContent(OUT_SUB_DIR);
+      FileUtil.fullyDelete(new File(outDir.toString()));
+    } finally {
+      // needed to avoid this test contaminating others in the same JVM
+      FileSystem.closeAll();
+      conf.set(fileImpl, fileImplClassname);
+      conf.set(fileImpl, fileImplClassname);

Review Comment:
   Duplicated line?
   
   I wasn't sure why we need to set the conf here in the finally block. Did 
something mutate it after line 761, and now we need to restore it?





> ManifestCommitter OOM on azure job
> ----------------------------------
>
>                 Key: MAPREDUCE-7435
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.3.5
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> I've got some reports of Spark jobs hitting OOM if the manifest committer is used 
> through abfs.
> Either the manifests are using too much memory, or something is not working 
> with Azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test to load 
> many, many manifests, each with lots of dirs and files, to see what breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those 
> structures have to include every etag of every block, so the manifest size is 
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at 
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at 
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at 
> org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown
>  Source)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown
>  Source)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown
>  Source)
> at 
> org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at 
> org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown
>  Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org

Reply via email to