[ https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728455#comment-17728455 ]
ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------

steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1213535965


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########

@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write an entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry}.
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A writer thread takes lists of entries to save from a queue
+   * and serializes them via the writer to the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * These threads will block when the queue capacity is reached.
+   * This is quite a complex process, with the main troublespots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * Stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active?
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * Count of file entries saved; updated only in the writer thread
+     * but read from others, hence atomic.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the count
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return true if the entries were queued for writing
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));
+          LOG.debug("Queued {}", entries.size());
+          return true;
+        } catch (InterruptedException e) {
+          // restore the interrupt flag; the caller sees the failure
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      } else {
+        LOG.debug("Queue inactive; discarding {} entries", entries.size());
+        return false;
+      }
+    }
+
+    /**
+     * Queue and process entries until done.
+     * @return count of entries written.
+     * @throws UncheckedIOException on write failure
+     */
+    private int processor() {
+      Thread.currentThread().setName("EntryIOWriter");
+      try {
+        while (!stop.get()) {
+          final QueueEntry queueEntry = queue.take();

Review Comment:
   fixed to 10 minutes, returning false. Caller gets to react (which it will do by raising an IOE, but giving anything raised by the writer thread priority).
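The queue/writer API above is compact, but its intended call pattern is easy to miss in a diff. Below is a minimal round-trip sketch built only from the signatures visible in this review; the temp-file handling, queue capacity, and error handling are illustrative assumptions, and the `EntryWriter.close()` semantics are taken from the Closeable contract since the diff is truncated before that method appears:

{code}
import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;

public class EntryFileIORoundTrip {

  /** Spill entries to a local sequence file, then stream them back. */
  static void roundTrip(Configuration conf, List<FileEntry> loadedEntries)
      throws IOException {
    EntryFileIO entryFileIO = new EntryFileIO(conf);
    File entryFile = File.createTempFile("entries", ".seq");

    // A single writer thread drains a bounded queue (capacity 16 here).
    EntryFileIO.EntryWriter entryWriter =
        entryFileIO.launchEntryWriter(entryFileIO.createWriter(entryFile), 16);
    try {
      // Loader threads hand over lists of entries; a false return means
      // the writer has stopped or this thread was interrupted.
      if (!entryWriter.enqueue(loadedEntries)) {
        IOException failure = entryWriter.getFailure();
        throw failure != null
            ? failure
            : new IOException("failed to queue entries");
      }
    } finally {
      entryWriter.close();  // stop the processor thread
    }

    // Stream entries back without holding them all in memory at once.
    RemoteIterator<FileEntry> entries =
        entryFileIO.iterateOver(entryFileIO.createReader(entryFile));
    while (entries.hasNext()) {
      FileEntry entry = entries.next();
      // ... per-entry rename/validation work would go here ...
    }
  }
}
{code}

The read side is the point of the design: the commit phase can iterate over entries from a local file rather than keeping every FileEntry of every manifest on the heap, which is what the OOM report below is about.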
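The timed hand-off the review comment describes is the standard java.util.concurrent.BlockingQueue idiom: block for a bounded time instead of forever, and let the caller turn a timeout into an error. This is a generic sketch of that pattern, not the patch's code; the ten-minute bound comes from the comment, and everything else (class, names, capacity) is illustrative:

{code}
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Timed hand-off between producer threads and a single writer thread. */
class TimedHandoff<T> {

  private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(16);

  /** Failure recorded by the writer thread, if any. */
  private final AtomicReference<IOException> failure = new AtomicReference<>();

  /** Producer side: block for at most ten minutes; false means timeout. */
  boolean enqueue(T item) throws InterruptedException {
    return queue.offer(item, 10, TimeUnit.MINUTES);
  }

  /**
   * Caller reaction to a failed enqueue: raise an IOE, giving anything
   * already raised by the writer thread priority.
   */
  void failEnqueue() throws IOException {
    IOException writerFailure = failure.get();
    throw writerFailure != null
        ? writerFailure
        : new IOException("timed out queueing entries for writing");
  }

  /** Consumer side: a timed take(); null means nothing arrived in time. */
  T nextOrNull() throws InterruptedException {
    return queue.poll(10, TimeUnit.MINUTES);
  }
}
{code}

The bound matters because an unbounded take()/put() can leave a job hung if the other side fails silently; with a timeout, a stuck commit surfaces as an IOException instead.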
> ManifestCommitter OOM on azure job
> ----------------------------------
>
>                 Key: MAPREDUCE-7435
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.3.5
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> I've got some reports of spark jobs OOMing if the manifest committer is used
> through abfs.
> Either the manifests are using too much memory, or something is not working
> with azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test to load
> many, many manifests, each with lots of dirs and files, to see what breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those
> structures have to include every etag of every block, so the manifest size is
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
>   at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
>   at org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
>   at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
>   at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
>   at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
>   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
>   at org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
>   at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown Source)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown Source)
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
>   at org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown Source)
>   at org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
>   at org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown Source)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)