[
https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713022#comment-17713022
]
ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------
steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1168511215
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java:
##########
@@ -756,66 +756,75 @@ private void testConcurrentCommitTaskWithSubDir(int
version)
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
- conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+ final String fileImpl = "fs.file.impl";
+ final String fileImplClassname = "org.apache.hadoop.fs.LocalFileSystem";
+ conf.setClass(fileImpl, RLFS.class, FileSystem.class);
FileSystem.closeAll();
- final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
- final FileOutputCommitter amCommitter =
- new FileOutputCommitter(outDir, jContext);
- amCommitter.setupJob(jContext);
-
- final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
- taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
- taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
-
- final TextOutputFormat[] tof = new TextOutputFormat[2];
- for (int i = 0; i < tof.length; i++) {
- tof[i] = new TextOutputFormat() {
- @Override
- public Path getDefaultWorkFile(TaskAttemptContext context,
- String extension) throws IOException {
- final FileOutputCommitter foc = (FileOutputCommitter)
- getOutputCommitter(context);
- return new Path(new Path(foc.getWorkPath(), SUB_DIR),
- getUniqueFile(context, getOutputName(context), extension));
- }
- };
- }
-
- final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
try {
- for (int i = 0; i < taCtx.length; i++) {
- final int taskIdx = i;
- executor.submit(new Callable<Void>() {
+ final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ final FileOutputCommitter amCommitter =
+ new FileOutputCommitter(outDir, jContext);
+ amCommitter.setupJob(jContext);
+
+ final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
+ taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
+ taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
+
+ final TextOutputFormat[] tof = new TextOutputFormat[2];
+ for (int i = 0; i < tof.length; i++) {
+ tof[i] = new TextOutputFormat() {
@Override
- public Void call() throws IOException, InterruptedException {
- final OutputCommitter outputCommitter =
- tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
- outputCommitter.setupTask(taCtx[taskIdx]);
- final RecordWriter rw =
- tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
- writeOutput(rw, taCtx[taskIdx]);
- outputCommitter.commitTask(taCtx[taskIdx]);
- return null;
+ public Path getDefaultWorkFile(TaskAttemptContext context,
+ String extension) throws IOException {
+ final FileOutputCommitter foc = (FileOutputCommitter)
+ getOutputCommitter(context);
+ return new Path(new Path(foc.getWorkPath(), SUB_DIR),
+ getUniqueFile(context, getOutputName(context), extension));
}
- });
+ };
}
- } finally {
- executor.shutdown();
- while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
- LOG.info("Awaiting thread termination!");
+
+ final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
+ try {
+ for (int i = 0; i < taCtx.length; i++) {
+ final int taskIdx = i;
+ executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException, InterruptedException {
+ final OutputCommitter outputCommitter =
+ tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
+ outputCommitter.setupTask(taCtx[taskIdx]);
+ final RecordWriter rw =
+ tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
+ writeOutput(rw, taCtx[taskIdx]);
+ outputCommitter.commitTask(taCtx[taskIdx]);
+ return null;
+ }
+ });
+ }
+ } finally {
+ executor.shutdown();
+ while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+ LOG.info("Awaiting thread termination!");
+ }
}
- }
- amCommitter.commitJob(jContext);
- final RawLocalFileSystem lfs = new RawLocalFileSystem();
- lfs.setConf(conf);
- assertFalse("Must not end up with sub_dir/sub_dir",
- lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
+ amCommitter.commitJob(jContext);
+ final RawLocalFileSystem lfs = new RawLocalFileSystem();
+ lfs.setConf(conf);
+ assertFalse("Must not end up with sub_dir/sub_dir",
+ lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
- // validate output
- validateContent(OUT_SUB_DIR);
- FileUtil.fullyDelete(new File(outDir.toString()));
+ // validate output
+ validateContent(OUT_SUB_DIR);
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ } finally {
+ // needed to avoid this test contaminating others in the same JVM
+ FileSystem.closeAll();
+ conf.set(fileImpl, fileImplClassname);
+ conf.set(fileImpl, fileImplClassname);
Review Comment:
Duplicate line; cut it.
The reason this cleanup is in is not so much because of any change in the PR,
as that the PR surfaced a condition which was already there: this test changes
the default "file" fs, and in some test runs that wasn't being reset, so other
tests were failing later for no obvious reason.
> ManifestCommitter OOM on azure job
> ----------------------------------
>
> Key: MAPREDUCE-7435
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: client
> Affects Versions: 3.3.5
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> I've got some reports of Spark jobs hitting OOM when the manifest committer is
> used through abfs.
> Either the manifests are using too much memory, or something is not working
> with Azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test to load
> many, many manifests, each with lots of dirs and files, to see what breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those
> structures have to include every etag of every block, so the manifest size is
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at
> org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown
> Source)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown
> Source)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown
> Source)
> at
> org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at
> org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown
> Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]