[ https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713022#comment-17713022 ]
ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------

steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1168511215


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java:
##########
@@ -756,66 +756,75 @@ private void testConcurrentCommitTaskWithSubDir(int version)
     conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
-    conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+    final String fileImpl = "fs.file.impl";
+    final String fileImplClassname = "org.apache.hadoop.fs.LocalFileSystem";
+    conf.setClass(fileImpl, RLFS.class, FileSystem.class);
     FileSystem.closeAll();
-    final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
-    final FileOutputCommitter amCommitter =
-        new FileOutputCommitter(outDir, jContext);
-    amCommitter.setupJob(jContext);
-
-    final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
-    taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
-    taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
-
-    final TextOutputFormat[] tof = new TextOutputFormat[2];
-    for (int i = 0; i < tof.length; i++) {
-      tof[i] = new TextOutputFormat() {
-        @Override
-        public Path getDefaultWorkFile(TaskAttemptContext context,
-            String extension) throws IOException {
-          final FileOutputCommitter foc = (FileOutputCommitter)
-              getOutputCommitter(context);
-          return new Path(new Path(foc.getWorkPath(), SUB_DIR),
-              getUniqueFile(context, getOutputName(context), extension));
-        }
-      };
-    }
-
-    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
     try {
-      for (int i = 0; i < taCtx.length; i++) {
-        final int taskIdx = i;
-        executor.submit(new Callable<Void>() {
+      final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+      final FileOutputCommitter amCommitter =
+          new FileOutputCommitter(outDir, jContext);
+      amCommitter.setupJob(jContext);
+
+      final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
+      taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
+      taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
+
+      final TextOutputFormat[] tof = new TextOutputFormat[2];
+      for (int i = 0; i < tof.length; i++) {
+        tof[i] = new TextOutputFormat() {
           @Override
-          public Void call() throws IOException, InterruptedException {
-            final OutputCommitter outputCommitter =
-                tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
-            outputCommitter.setupTask(taCtx[taskIdx]);
-            final RecordWriter rw =
-                tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
-            writeOutput(rw, taCtx[taskIdx]);
-            outputCommitter.commitTask(taCtx[taskIdx]);
-            return null;
+          public Path getDefaultWorkFile(TaskAttemptContext context,
+              String extension) throws IOException {
+            final FileOutputCommitter foc = (FileOutputCommitter)
+                getOutputCommitter(context);
+            return new Path(new Path(foc.getWorkPath(), SUB_DIR),
+                getUniqueFile(context, getOutputName(context), extension));
           }
-        });
+        };
       }
-    } finally {
-      executor.shutdown();
-      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-        LOG.info("Awaiting thread termination!");
+
+      final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
+      try {
+        for (int i = 0; i < taCtx.length; i++) {
+          final int taskIdx = i;
+          executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException, InterruptedException {
+              final OutputCommitter outputCommitter =
+                  tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
+              outputCommitter.setupTask(taCtx[taskIdx]);
+              final RecordWriter rw =
+                  tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
+              writeOutput(rw, taCtx[taskIdx]);
+              outputCommitter.commitTask(taCtx[taskIdx]);
+              return null;
+            }
+          });
+        }
+      } finally {
+        executor.shutdown();
+        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          LOG.info("Awaiting thread termination!");
+        }
       }
-    }
-    amCommitter.commitJob(jContext);
-    final RawLocalFileSystem lfs = new RawLocalFileSystem();
-    lfs.setConf(conf);
-    assertFalse("Must not end up with sub_dir/sub_dir",
-        lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
+      amCommitter.commitJob(jContext);
+      final RawLocalFileSystem lfs = new RawLocalFileSystem();
+      lfs.setConf(conf);
+      assertFalse("Must not end up with sub_dir/sub_dir",
+          lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));

-    // validate output
-    validateContent(OUT_SUB_DIR);
-    FileUtil.fullyDelete(new File(outDir.toString()));
+      // validate output
+      validateContent(OUT_SUB_DIR);
+      FileUtil.fullyDelete(new File(outDir.toString()));
+    } finally {
+      // needed to avoid this test contaminating others in the same JVM
+      FileSystem.closeAll();
+      conf.set(fileImpl, fileImplClassname);
+      conf.set(fileImpl, fileImplClassname);

Review Comment:
   duplicate. cut

   The reason it is in is not so much because of any change in the PR as that the PR surfaced a condition which was already there: this test changes the default "file" fs, and in some test runs that wasn't being reset, so other tests were failing later for no obvious reason.
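To spell out that contamination mode: fs.file.impl decides which implementation FileSystem.get() hands out for "file:" URIs from the JVM-wide filesystem cache, so a rebound value leaks into every later test in the same process unless it is restored. A minimal sketch of the save/override/restore pattern the new finally block applies, where TestFs is a hypothetical stand-in for the RLFS class in the real test:

{code:java}
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RawLocalFileSystem;

public class FsOverrideRestoreSketch {

  private static final String FILE_IMPL = "fs.file.impl";

  /** Hypothetical stand-in for the RLFS test filesystem. */
  public static class TestFs extends RawLocalFileSystem {
  }

  public static void runWithOverriddenFileFs(Configuration conf)
      throws Exception {
    // remember what "file" was bound to before the override
    final String previous =
        conf.get(FILE_IMPL, "org.apache.hadoop.fs.LocalFileSystem");
    conf.setClass(FILE_IMPL, TestFs.class, FileSystem.class);
    // drop cached instances so the override is actually picked up
    FileSystem.closeAll();
    try {
      // test body: "file:" lookups in this JVM now return TestFs
      FileSystem fs = FileSystem.get(URI.create("file:///"), conf);
      // ... exercise fs ...
    } finally {
      // close cached instances of the override and restore the binding,
      // so later tests in the same JVM see the normal local filesystem
      FileSystem.closeAll();
      conf.set(FILE_IMPL, previous);
    }
  }
}
{code}

Restoring the value observed before the override, rather than a hardcoded classname, keeps the pattern correct even when a suite runs with a non-default "file:" implementation; the second FileSystem.closeAll() evicts cached TestFs instances so they cannot be handed out after the test finishes.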
> ManifestCommitter OOM on azure job
> ----------------------------------
>
>                 Key: MAPREDUCE-7435
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.3.5
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> I've got some reports of spark jobs OOMing if the manifest committer is used through abfs.
> Either the manifests are using too much memory, or something is not working with azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test which loads many, many manifests, each with lots of dirs and files, to see what breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those manifests have to include the etag of every block, so their size is O(blocks); the new committer's manifests are O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown Source)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown Source)
> at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown Source)
> at org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}
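As a back-of-envelope illustration of the "load many, many manifests" test the description calls for, here is a standalone sketch of that kind of scale probe. The Entry class is a hypothetical stand-in for the committer's real file/dir entry types, and every sizing constant is an assumption chosen for illustration; the sketch only demonstrates how heap use grows with files + dirs when all manifests are held live at once.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class ManifestScaleSketch {

  /** Hypothetical stand-in for one file/dir rename entry in a manifest. */
  static final class Entry {
    final String source;
    final String dest;
    final long size;

    Entry(String source, String dest, long size) {
      this.source = source;
      this.dest = dest;
      this.size = size;
    }
  }

  public static void main(String[] args) {
    final int manifests = 1_000;          // assumed task count
    final int entriesPerManifest = 5_000; // assumed files + dirs per task
    final List<List<Entry>> all = new ArrayList<>(manifests);

    final Runtime rt = Runtime.getRuntime();
    final long before = rt.totalMemory() - rt.freeMemory();

    // Build every manifest and keep them all reachable, as a job commit
    // holding every task manifest in memory at once would.
    for (int m = 0; m < manifests; m++) {
      final List<Entry> manifest = new ArrayList<>(entriesPerManifest);
      for (int e = 0; e < entriesPerManifest; e++) {
        manifest.add(new Entry(
            "taskattempt_" + m + "/part-" + e,
            "dest/dir" + (e % 100) + "/part-" + e,
            1024L * e));
      }
      all.add(manifest);
    }

    // Crude measurement: no explicit GC, so treat the figure as a
    // rough upper bound rather than a precise footprint.
    final long after = rt.totalMemory() - rt.freeMemory();
    final long entries = (long) manifests * entriesPerManifest;
    System.out.printf("~%,d bytes for %,d entries (~%,d bytes/entry)%n",
        after - before, entries, (after - before) / entries);
  }
}
{code}

Scaling manifests and entriesPerManifest up until the heap gives out makes the O(files + dirs) cost visible without involving abfs at all, which helps separate "the manifests are too big" from "the azure stream is holding too much buffer memory".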