[ https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707067#comment-17707067 ]
ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------

cnauroth commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1153857994


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestCommitterSupport.java:
##########
@@ -224,6 +231,23 @@ public static ManifestSuccessData createManifestOutcome(
     return outcome;
   }

+  /**
+   * Add heap information to IOStatisticsSetters gauges, with a stage in front of every key.
+   * @param ioStatisticsSetters map to update
+   * @param stage stage
+   */
+  public static void addHeapInformation(IOStatisticsSetters ioStatisticsSetters,
+      String stage) {
+    // force a gc. bit of bad form but it makes for better numbers
+    System.gc();

Review Comment:
   This triggered a Spotbugs warning. Do you think the forced GC should go behind a config flag, default off, and turned on in the tests?

##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java:
##########
@@ -222,6 +223,33 @@ public synchronized Map<String, MeanStatistic> meanStatistics() {
     return meanStatistics;
   }

+  @Override
+  public synchronized void setCounter(final String key, final long value) {
+    counters().put(key, value);
+  }
+
+  @Override
+  public synchronized void setGauge(final String key, final long value) {
+    gauges().put(key, value);
+
+  }
+
+  @Override
+  public synchronized void setMaximum(final String key, final long value) {
+    maximums().put(key, value);
+
+  }
+
+  @Override
+  public synchronized void setMinimum(final String key, final long value) {
+    minimums().put(key, value);
+  }
+
+  @Override
+  public void setMeanStatistic(final String key, final MeanStatistic value) {
+

Review Comment:
   `meanStatistics().put(key, value);`?

##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java:
##########
@@ -63,6 +81,10 @@ public void setup() throws Exception {
         .isGreaterThan(0);
   }

+  public long heapSize() {
+    return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();

Review Comment:
   Nitpick: some indentation issues here.
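On the forced-GC review comment above: a minimal sketch of one way to gate the `System.gc()` call, default off and enabled only from tests. The option name, the extra `forceGC` parameter, and the gauge key format are illustrative assumptions, not part of the PR:

{code}
// Sketch only: a hypothetical opt-in flag for the forced GC;
// neither the option name nor the extra parameter is in the PR.
public static final String OPT_FORCE_GC =
    "mapreduce.manifest.committer.test.force.gc";
public static final boolean OPT_FORCE_GC_DEFAULT = false;

public static void addHeapInformation(IOStatisticsSetters ioStatisticsSetters,
    String stage,
    boolean forceGC) {
  if (forceGC) {
    // opt-in only, so production code paths never hit the
    // explicit GC that Spotbugs flags (DM_GC)
    System.gc();
  }
  final Runtime runtime = Runtime.getRuntime();
  // record used heap as a gauge, with the stage in front of the key
  ioStatisticsSetters.setGauge(stage + ".heap.used",
      runtime.totalMemory() - runtime.freeMemory());
}
{code}

Callers wanting the old behaviour would read the flag from their configuration and pass it through; with the default false, the Spotbugs-flagged path stays dormant outside tests.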
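On the empty `setMeanStatistic` override flagged above: a sketch of the one-line body the reviewer suggests, with `synchronized` assumed to match the sibling setters:

{code}
@Override
public synchronized void setMeanStatistic(final String key,
    final MeanStatistic value) {
  // mirror the other setters: store the supplied value in the mean map
  meanStatistics().put(key, value);
}
{code}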
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java:
##########
@@ -134,8 +169,26 @@ public void testSaveThenLoadManyManifests() throws Throwable {

     // and skipping the rename stage (which is going to fail),
     // go straight to cleanup
-    new CleanupJobStage(getJobStageConfig()).apply(
+    new CleanupJobStage(stageConfig).apply(
         new CleanupJobStage.Arguments("", true, true, false));
+    addHeapInformation(heapInfo, "cleanup");
+
+    ManifestSuccessData success = createManifestOutcome(stageConfig, OP_STAGE_JOB_COMMIT);
+    success.snapshotIOStatistics(getStageStatistics());
+    success.getIOStatistics().aggregate(heapInfo);
+
+    Configuration conf = getConfiguration();
+    enableManifestCommitter(conf);
+    String reportDir = conf.getTrimmed(OPT_SUMMARY_REPORT_DIR, "");
+    Path reportDirPath = new Path(reportDir);
+    Path path = new Path(reportDirPath,
+        createJobSummaryFilename("TestLoadManifestsStage"));
+    final FileSystem summaryFS = path.getFileSystem(conf);
+    success.save(summaryFS, path, true);
+    LOG.info("Saved summary to {}", path);
+    ManifestPrinter showManifest = new ManifestPrinter();
+    ManifestSuccessData manifestSuccessData =

Review Comment:
   Nitpick: some indentation issues here.

##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java:
##########
@@ -756,66 +756,75 @@ private void testConcurrentCommitTaskWithSubDir(int version)
     conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);

-    conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+    final String fileImpl = "fs.file.impl";
+    final String fileImplClassname = "org.apache.hadoop.fs.LocalFileSystem";
+    conf.setClass(fileImpl, RLFS.class, FileSystem.class);
     FileSystem.closeAll();

-    final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
-    final FileOutputCommitter amCommitter =
-        new FileOutputCommitter(outDir, jContext);
-    amCommitter.setupJob(jContext);
-
-    final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
-    taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
-    taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
-
-    final TextOutputFormat[] tof = new TextOutputFormat[2];
-    for (int i = 0; i < tof.length; i++) {
-      tof[i] = new TextOutputFormat() {
-        @Override
-        public Path getDefaultWorkFile(TaskAttemptContext context,
-            String extension) throws IOException {
-          final FileOutputCommitter foc = (FileOutputCommitter)
-              getOutputCommitter(context);
-          return new Path(new Path(foc.getWorkPath(), SUB_DIR),
-              getUniqueFile(context, getOutputName(context), extension));
-        }
-      };
-    }
-
-    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
     try {
-      for (int i = 0; i < taCtx.length; i++) {
-        final int taskIdx = i;
-        executor.submit(new Callable<Void>() {
+      final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+      final FileOutputCommitter amCommitter =
+          new FileOutputCommitter(outDir, jContext);
+      amCommitter.setupJob(jContext);
+
+      final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
+      taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
+      taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
+
+      final TextOutputFormat[] tof = new TextOutputFormat[2];
+      for (int i = 0; i < tof.length; i++) {
+        tof[i] = new TextOutputFormat() {
           @Override
-          public Void call() throws IOException, InterruptedException {
-            final OutputCommitter outputCommitter =
-                tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
-            outputCommitter.setupTask(taCtx[taskIdx]);
-            final RecordWriter rw =
-                tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
-            writeOutput(rw, taCtx[taskIdx]);
-            outputCommitter.commitTask(taCtx[taskIdx]);
-            return null;
+          public Path getDefaultWorkFile(TaskAttemptContext context,
+              String extension) throws IOException {
+            final FileOutputCommitter foc = (FileOutputCommitter)
+                getOutputCommitter(context);
+            return new Path(new Path(foc.getWorkPath(), SUB_DIR),
+                getUniqueFile(context, getOutputName(context), extension));
           }
-        });
+        };
       }
-    } finally {
-      executor.shutdown();
-      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-        LOG.info("Awaiting thread termination!");
+
+      final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
+      try {
+        for (int i = 0; i < taCtx.length; i++) {
+          final int taskIdx = i;
+          executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException, InterruptedException {
+              final OutputCommitter outputCommitter =
+                  tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
+              outputCommitter.setupTask(taCtx[taskIdx]);
+              final RecordWriter rw =
+                  tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
+              writeOutput(rw, taCtx[taskIdx]);
+              outputCommitter.commitTask(taCtx[taskIdx]);
+              return null;
+            }
+          });
+        }
+      } finally {
+        executor.shutdown();
+        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          LOG.info("Awaiting thread termination!");
+        }
       }
-    }

-    amCommitter.commitJob(jContext);
-    final RawLocalFileSystem lfs = new RawLocalFileSystem();
-    lfs.setConf(conf);
-    assertFalse("Must not end up with sub_dir/sub_dir",
-        lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
+      amCommitter.commitJob(jContext);
+      final RawLocalFileSystem lfs = new RawLocalFileSystem();
+      lfs.setConf(conf);
+      assertFalse("Must not end up with sub_dir/sub_dir",
+          lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));

-    // validate output
-    validateContent(OUT_SUB_DIR);
-    FileUtil.fullyDelete(new File(outDir.toString()));
+      // validate output
+      validateContent(OUT_SUB_DIR);
+      FileUtil.fullyDelete(new File(outDir.toString()));
+    } finally {
+      // needed to avoid this test contaminating others in the same JVM
+      FileSystem.closeAll();
+      conf.set(fileImpl, fileImplClassname);
+      conf.set(fileImpl, fileImplClassname);

Review Comment:
   Duplicated line? I wasn't sure why we need to set the conf here in the finally block. Did something mutate it after line 761, and now we need to restore it?
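On the "Duplicated line?" comment: if the intent of the finally block is to restore a key mutated for the test, capturing the original value first makes that explicit and removes the duplicate set(). A self-contained sketch; the helper name and shape are illustrative, not from the PR:

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

/** Illustrative only: save/restore a Configuration key around a test body. */
public final class ConfRestoreSketch {

  public static void withTemporaryFsImpl(Configuration conf,
      Class<? extends FileSystem> testFsClass,
      Runnable testBody) throws Exception {
    final String key = "fs.file.impl";
    // capture the original value before mutating it (may be null if unset)
    final String original = conf.get(key);
    conf.setClass(key, testFsClass, FileSystem.class);
    FileSystem.closeAll();
    try {
      testBody.run();
    } finally {
      // close cached filesystems so later tests in the same JVM
      // get a clean slate, then restore the key exactly once
      FileSystem.closeAll();
      if (original != null) {
        conf.set(key, original);
      } else {
        conf.unset(key);
      }
    }
  }
}
{code}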
> ManifestCommitter OOM on azure job
> ----------------------------------
>
>                 Key: MAPREDUCE-7435
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.3.5
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> I've got some reports of Spark jobs OOMing when the manifest committer is used through abfs.
> Either the manifests are using too much memory, or something is not working with azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test that loads many, many manifests, each with lots of dirs and files, to see what breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those structures have to include the etag of every block, so the manifest size is O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
>   at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
>   at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
>   at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
>   at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
>   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
>   at org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
>   at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown Source)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown Source)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown Source)
>   at org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
>   at org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown Source)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> {code}
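On the "load many, many manifests" test proposed in the description: a self-contained sketch of the measurement idea using plain JDK types. The entry shape and the counts are stand-ins; a real test would build or load `TaskManifest` instances instead:

{code}
import java.util.ArrayList;
import java.util.List;

/** Illustrative heap probe; entry layout and counts are stand-ins. */
public final class ManifestHeapProbe {

  private static long usedHeap() {
    final Runtime rt = Runtime.getRuntime();
    return rt.totalMemory() - rt.freeMemory();
  }

  public static void main(String[] args) {
    final int manifests = 1_000;
    final int filesPerManifest = 1_000;
    System.gc();  // settle the heap for a cleaner baseline (test-only)
    final long before = usedHeap();
    // each entry stands in for one (source, destination) rename pair;
    // the total models the committer's O(files + dirs) footprint
    final List<String[]> entries = new ArrayList<>();
    for (int m = 0; m < manifests; m++) {
      for (int f = 0; f < filesPerManifest; f++) {
        entries.add(new String[] {
            "job/task_" + m + "/part-" + f,
            "dest/part-" + m + "-" + f});
      }
    }
    final long after = usedHeap();
    System.out.printf("%,d entries ~ %,d bytes of heap%n",
        entries.size(), after - before);
  }
}
{code}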