[ https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728456#comment-17728456 ]
ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------

steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1213540062


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/TestEntryFileIO.java:
##########

@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
+import static org.apache.hadoop.util.functional.RemoteIterators.rangeExcludingIterator;
+
+/**
+ * Test {@link EntryFileIO}.
+ */
+public class TestEntryFileIO extends AbstractManifestCommitterTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestEntryFileIO.class);
+
+  /**
+   * Entry to save.
+   */
+  public static final FileEntry ENTRY = new FileEntry("source", "dest", 100, "etag");
+
+  /**
+   * Entry file instance.
+   */
+  private EntryFileIO entryFileIO;
+
+  /**
+   * Path to a test entry file.
+   */
+  private File entryFile;
+
+  /**
+   * Create an entry file during setup.
+   */
+  @Before
+  public void setup() throws Exception {
+    entryFileIO = new EntryFileIO(new Configuration());
+    createEntryFile();
+  }
+
+  /**
+   * Teardown deletes any entry file.
+   * @throws Exception on any failure
+   */
+  @After
+  public void teardown() throws Exception {
+    Thread.currentThread().setName("teardown");
+    if (getEntryFile() != null) {
+      getEntryFile().delete();
+    }
+  }
+
+  /**
+   * Create a temp entry file and set the entryFile field to it.
+   * @throws IOException creation failure
+   */
+  private void createEntryFile() throws IOException {
+    setEntryFile(File.createTempFile("entry", ".seq"));
+  }
+
+  /**
+   * Reference to any temp file created.
+   */
+  private File getEntryFile() {
+    return entryFile;
+  }
+
+  private void setEntryFile(File entryFile) {
+    this.entryFile = entryFile;
+  }
+
+  /**
+   * Create a file with one entry, then read it back
+   * via all the mechanisms available.
+   */
+  @Test
+  public void testCreateWriteReadFileOneEntry() throws Throwable {
+
+    final FileEntry source = ENTRY;
+
+    // do an explicit close to help isolate any failure.
+    SequenceFile.Writer writer = createWriter();
+    writer.append(NullWritable.get(), source);
+    writer.flush();
+    writer.close();
+
+    FileEntry readBack = new FileEntry();
+    try (SequenceFile.Reader reader = readEntryFile()) {
+      reader.next(NullWritable.get(), readBack);
+    }
+    Assertions.assertThat(readBack)
+        .describedAs("entry read back from sequence file")
+        .isEqualTo(source);
+
+    // now use the iterator to access it.
+    final RemoteIterator<FileEntry> it =
+        iterateOverEntryFile();
+    List<FileEntry> files = new ArrayList<>();
+    foreach(it, files::add);
+    Assertions.assertThat(files)
+        .describedAs("iteration over the entry file")
+        .hasSize(1)
+        .element(0)
+        .isEqualTo(source);
+    final EntryFileIO.EntryIterator et = (EntryFileIO.EntryIterator) it;
+    Assertions.assertThat(et)
+        .describedAs("entry iterator %s", et)
+        .matches(p -> p.isClosed())
+        .extracting(p -> p.getCount())
+        .isEqualTo(1);
+  }
+
+  /**
+   * Create a writer.
+   * @return a writer
+   * @throws IOException failure to create the file.
+   */
+  private SequenceFile.Writer createWriter() throws IOException {
+    return entryFileIO.createWriter(getEntryFile());
+  }
+
+  /**
+   * Create an iterator over the records in the (non empty) entry file.
+   * @return an iterator over entries.
+   * @throws IOException failure to open the file
+   */
+  private RemoteIterator<FileEntry> iterateOverEntryFile() throws IOException {
+    return entryFileIO.iterateOver(readEntryFile());
+  }
+
+  /**
+   * Create a reader for the (non empty) entry file.
+   * @return a reader.
+   * @throws IOException failure to open the file
+   */
+  private SequenceFile.Reader readEntryFile() throws IOException {
+    assertEntryFileNonEmpty();
+
+    return entryFileIO.createReader(getEntryFile());
+  }
+
+  /**
+   * Create an empty file; iterating over it must find no entries.
+   */
+  @Test
+  public void testCreateEmptyFile() throws Throwable {
+
+    final File file = getEntryFile();
+
+    entryFileIO.createWriter(file).close();
+
+    // now use the iterator to access it.
+    List<FileEntry> files = new ArrayList<>();
+    Assertions.assertThat(foreach(iterateOverEntryFile(), files::add))
+        .isEqualTo(0);

Review Comment:
   added a description
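For context on the class under test: per the diff above, EntryFileIO writes FileEntry records to a local sequence file and reads them back through a RemoteIterator, so entry lists can be spilled to disk and streamed rather than held on the heap. A minimal sketch of that round trip follows; it assumes only the EntryFileIO methods visible in the test, and the class name, file name, and entry values are illustrative.

{code}
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;

import static org.apache.hadoop.util.functional.RemoteIterators.foreach;

// Illustrative sketch only; EntryFileRoundTrip is not a class in the PR.
public class EntryFileRoundTrip {
  public static void main(String[] args) throws Exception {
    EntryFileIO entryFileIO = new EntryFileIO(new Configuration());
    File entryFile = File.createTempFile("entry", ".seq");

    // spill entries to the local sequence file instead of keeping them on the heap
    try (SequenceFile.Writer writer = entryFileIO.createWriter(entryFile)) {
      writer.append(NullWritable.get(), new FileEntry("source", "dest", 100, "etag"));
    }

    // stream the entries back; the iterator closes its reader once exhausted,
    // as testCreateWriteReadFileOneEntry() asserts via isClosed()
    RemoteIterator<FileEntry> entries =
        entryFileIO.iterateOver(entryFileIO.createReader(entryFile));
    long count = foreach(entries, e -> System.out.println("loaded " + e));
    System.out.println(count + " entries read");
    entryFile.delete();
  }
}
{code}

The self-closing iterator matters here: callers can stream entries straight through foreach() without managing the reader's lifecycle themselves.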
> ManifestCommitter OOM on azure job
> ----------------------------------
>
>                 Key: MAPREDUCE-7435
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.3.5
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> I've got some reports of spark jobs OOMing if the manifest committer is used through abfs.
> Either the manifests are using too much memory, or something is not working with azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test to load many, many manifests, each with lots of dirs and files, to see what breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those structures have to include the etag of every block, so the manifest size is O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
>   at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
>   at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
>   at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
>   at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
>   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
>   at org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
>   at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown Source)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown Source)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown Source)
>   at org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
>   at org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown Source)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> {code}
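To see why O(files + dirs) still matters once every entry is aggregated in memory at job commit, here is a back-of-envelope estimate; all three numbers are assumptions chosen for illustration, not measurements from the reported jobs.

{code}
// Illustrative heap estimate only; bytesPerEntry is an assumed figure, not measured.
public class ManifestHeapEstimate {
  public static void main(String[] args) {
    long files = 10_000_000L;    // files committed by a large job (assumed)
    long dirs = 100_000L;        // destination directories (assumed)
    long bytesPerEntry = 200L;   // assumed in-heap cost of one entry (paths + etag)
    long heapBytes = (files + dirs) * bytesPerEntry;
    System.out.printf("entries held in memory: ~%,d MB%n", heapBytes >> 20);
    // prints ~1,926 MB: roughly 2 GB before any other job state is counted
  }
}
{code}

At that scale, spilling entries to a local sequence file and streaming them back, as the EntryFileIO test above exercises, keeps the commit stage's heap use roughly constant instead of proportional to the number of files.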