[ https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713022#comment-17713022 ]

ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------

steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1168511215


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java:
##########
@@ -756,66 +756,75 @@ private void testConcurrentCommitTaskWithSubDir(int version)
     conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
 
-    conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+    final String fileImpl = "fs.file.impl";
+    final String fileImplClassname = "org.apache.hadoop.fs.LocalFileSystem";
+    conf.setClass(fileImpl, RLFS.class, FileSystem.class);
     FileSystem.closeAll();
 
-    final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
-    final FileOutputCommitter amCommitter =
-        new FileOutputCommitter(outDir, jContext);
-    amCommitter.setupJob(jContext);
-
-    final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
-    taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
-    taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
-
-    final TextOutputFormat[] tof = new TextOutputFormat[2];
-    for (int i = 0; i < tof.length; i++) {
-      tof[i] = new TextOutputFormat() {
-        @Override
-        public Path getDefaultWorkFile(TaskAttemptContext context,
-            String extension) throws IOException {
-          final FileOutputCommitter foc = (FileOutputCommitter)
-              getOutputCommitter(context);
-          return new Path(new Path(foc.getWorkPath(), SUB_DIR),
-              getUniqueFile(context, getOutputName(context), extension));
-        }
-      };
-    }
-
-    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
     try {
-      for (int i = 0; i < taCtx.length; i++) {
-        final int taskIdx = i;
-        executor.submit(new Callable<Void>() {
+      final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+      final FileOutputCommitter amCommitter =
+          new FileOutputCommitter(outDir, jContext);
+      amCommitter.setupJob(jContext);
+
+      final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
+      taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
+      taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
+
+      final TextOutputFormat[] tof = new TextOutputFormat[2];
+      for (int i = 0; i < tof.length; i++) {
+        tof[i] = new TextOutputFormat() {
           @Override
-          public Void call() throws IOException, InterruptedException {
-            final OutputCommitter outputCommitter =
-                tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
-            outputCommitter.setupTask(taCtx[taskIdx]);
-            final RecordWriter rw =
-                tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
-            writeOutput(rw, taCtx[taskIdx]);
-            outputCommitter.commitTask(taCtx[taskIdx]);
-            return null;
+          public Path getDefaultWorkFile(TaskAttemptContext context,
+              String extension) throws IOException {
+            final FileOutputCommitter foc = (FileOutputCommitter)
+                getOutputCommitter(context);
+            return new Path(new Path(foc.getWorkPath(), SUB_DIR),
+                getUniqueFile(context, getOutputName(context), extension));
           }
-        });
+        };
       }
-    } finally {
-      executor.shutdown();
-      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-        LOG.info("Awaiting thread termination!");
+
+      final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
+      try {
+        for (int i = 0; i < taCtx.length; i++) {
+          final int taskIdx = i;
+          executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException, InterruptedException {
+              final OutputCommitter outputCommitter =
+                  tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
+              outputCommitter.setupTask(taCtx[taskIdx]);
+              final RecordWriter rw =
+                  tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
+              writeOutput(rw, taCtx[taskIdx]);
+              outputCommitter.commitTask(taCtx[taskIdx]);
+              return null;
+            }
+          });
+        }
+      } finally {
+        executor.shutdown();
+        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          LOG.info("Awaiting thread termination!");
+        }
       }
-    }
 
-    amCommitter.commitJob(jContext);
-    final RawLocalFileSystem lfs = new RawLocalFileSystem();
-    lfs.setConf(conf);
-    assertFalse("Must not end up with sub_dir/sub_dir",
-        lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
+      amCommitter.commitJob(jContext);
+      final RawLocalFileSystem lfs = new RawLocalFileSystem();
+      lfs.setConf(conf);
+      assertFalse("Must not end up with sub_dir/sub_dir",
+          lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
 
-    // validate output
-    validateContent(OUT_SUB_DIR);
-    FileUtil.fullyDelete(new File(outDir.toString()));
+      // validate output
+      validateContent(OUT_SUB_DIR);
+      FileUtil.fullyDelete(new File(outDir.toString()));
+    } finally {
+      // needed to avoid this test contaminating others in the same JVM
+      FileSystem.closeAll();
+      conf.set(fileImpl, fileImplClassname);
+      conf.set(fileImpl, fileImplClassname);

Review Comment:
   duplicate. cut
   the reason this is in is not so much because of any change in the PR as
   that it surfaced a condition which was already there: this test changes
   the default "file" fs, and in some test runs that wasn't being reset, so
   other tests were failing later for no obvious reason.
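   For anyone reading along, a minimal sketch of the reset pattern the diff
   adds; names are taken from the hunk above, with a stand-in RLFS class, so
   this is illustrative rather than the committed test:

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RawLocalFileSystem;

public class FsImplResetSketch {
  // stand-in for the test's RLFS subclass of RawLocalFileSystem
  static class RLFS extends RawLocalFileSystem { }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    final String fileImpl = "fs.file.impl";
    final String fileImplClassname = "org.apache.hadoop.fs.LocalFileSystem";
    conf.setClass(fileImpl, RLFS.class, FileSystem.class); // swap in test fs
    FileSystem.closeAll(); // drop cached "file" filesystem instances
    try {
      // ... test body runs against RLFS here ...
    } finally {
      // restore the stock local fs so the change cannot leak to other tests
      FileSystem.closeAll();
      conf.set(fileImpl, fileImplClassname);
    }
  }
}
{code}

   The finally block matters because FileSystem instances are cached per JVM;
   without closeAll() plus the conf reset, the substituted class can survive
   into later tests in the same JVM.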





> ManifestCommitter OOM on azure job
> ----------------------------------
>
>                 Key: MAPREDUCE-7435
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.3.5
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> I've got some reports of Spark jobs OOMing when the manifest committer is
> used through ABFS.
> Either the manifests are using too much memory, or something is not working
> with Azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test which
> loads many, many manifests, each with lots of dirs and files, to see what
> breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those
> structures have to include the etag of every block, so the manifest size is
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown Source)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown Source)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown Source)
> at org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}
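
To make the O(blocks) vs O(files + dirs) comparison in the description
concrete, here is a back-of-envelope sketch; every number in it is an
illustrative assumption, not a measurement from the reported jobs:

{code}
// Rough arithmetic for the committer-data-size claim above.
// All values are illustrative assumptions, not measured figures.
public class ManifestSizeSketch {
  public static void main(String[] args) {
    long files = 100_000;          // files written by the job
    long dirs = 5_000;             // destination directories
    long blockSize = 128L << 20;   // 128 MiB block size
    long avgFileSize = 1L << 30;   // 1 GiB average file size
    long bytesPerEntry = 200;      // assumed serialized size of one entry

    // s3a-style committer: one etag entry per uploaded block -> O(blocks)
    long blocks = files * (avgFileSize / blockSize);   // 800,000 blocks
    long s3aBytes = blocks * bytesPerEntry;

    // manifest committer: one entry per file and per dir -> O(files + dirs)
    long manifestBytes = (files + dirs) * bytesPerEntry;

    System.out.printf("O(blocks):       ~%d MB%n", s3aBytes >> 20);      // ~152 MB
    System.out.printf("O(files + dirs): ~%d MB%n", manifestBytes >> 20); // ~20 MB
  }
}
{code}

Under these assumptions the per-file manifest data is roughly an order of
magnitude smaller, which is why the OOM points at how the manifests are
loaded (many at once, through the ABFS input stream) rather than their
asymptotic size alone.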


