[ https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716673#comment-17716673 ]
ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------
steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1177721536
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write an entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry}.
+ */
+public class EntryFileIO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ EntryFileIO.class);
+
+ /** Configuration used to load filesystems. */
+ private final Configuration conf;
+
+ /**
+ * Constructor.
+ * @param conf Configuration used to load filesystems
+ */
+ public EntryFileIO(final Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Create a writer to a local file.
+ * @param file file
+ * @return the writer
+ * @throws IOException failure to create the file
+ */
+ public SequenceFile.Writer createWriter(File file) throws IOException {
+ return createWriter(toPath(file));
+ }
+
+ /**
+ * Create a writer to a file on any FS.
+ * @param path path to write to.
+ * @return the writer
+ * @throws IOException failure to create the file
+ */
+ public SequenceFile.Writer createWriter(Path path) throws IOException {
+ return SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(path),
+ SequenceFile.Writer.keyClass(NullWritable.class),
+ SequenceFile.Writer.valueClass(FileEntry.class));
+ }
+
+
+ /**
+ * Create a reader for a local file; reads are sequential.
+ * @param file file
+ * @return the reader
+ * @throws IOException failure to open
+ */
+ public SequenceFile.Reader createReader(File file) throws IOException {
+ return createReader(toPath(file));
+ }
+
+ /**
+ * Create a reader for a file on any FS; reads are sequential.
+ * @param path path
+ * @return the reader
+ * @throws IOException failure to open
+ */
+ public SequenceFile.Reader createReader(Path path) throws IOException {
+ return new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(path));
+ }
+
+ /**
+ * Iterator to retrieve file entries from the sequence file.
+ * Closeable: cast and invoke to close the reader.
+ * @param reader reader
+ * @return iterator
+ */
+ public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+ return new EntryIterator(reader);
+ }
+
+ /**
+ * Create and start an entry writer.
+ * @param writer writer
+ * @param capacity queue capacity
+ * @return the writer.
+ */
+ public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
+ final EntryWriter ew = new EntryWriter(writer, capacity);
+ ew.start();
+ return ew;
+ }
+
+ /**
+ * Write a sequence of entries to the writer.
+ * @param writer writer
+ * @param entries entries
+ * @param close close the stream afterwards
+ * @return number of entries written
+ * @throws IOException write failure.
+ */
+ public static int write(SequenceFile.Writer writer,
+ Collection<FileEntry> entries,
+ boolean close)
+ throws IOException {
+ try {
+ for (FileEntry entry : entries) {
+ writer.append(NullWritable.get(), entry);
+ }
+ writer.flush();
+ } finally {
+ if (close) {
+ writer.close();
+ }
+ }
+ return entries.size();
+ }
+
+
+ /**
+ * Given a file, create a Path.
+ * @param file file
+ * @return path to the file
+ */
+ public static Path toPath(final File file) {
+ return new Path(file.toURI());
+ }
+
+
+ /**
+ * Actions in the queue.
+ */
+ private enum Actions {
+ /** Write the supplied list of entries. */
+ write,
+ /** Stop the processor thread. */
+ stop
+ }
+
+ /**
+ * What gets queued: an action and a list of entries.
+ */
+ private static final class QueueEntry {
+
+ private final Actions action;
+
+ private final List<FileEntry> entries;
+
+ private QueueEntry(final Actions action, List<FileEntry> entries) {
+ this.action = action;
+ this.entries = entries;
+ }
+
+ private QueueEntry(final Actions action) {
+ this(action, null);
+ }
+ }
+
+ /**
+ * A writer thread takes lists of entries to save from a queue;
+ * these are serialized via the writer to the output stream.
+ * Other threads can queue the file entry lists from loaded manifests
+ * for them to be written.
+ * These threads will be blocked when the queue capacity is reached.
+ * This is quite a complex process, with the main trouble spots in the code
+ * being:
+ * - managing the shutdown
+ * - failing safely on write failures, restarting all blocked writers in the process
+ */
+ public static final class EntryWriter implements Closeable {
+
+ /**
+ * The destination of the output.
+ */
+ private final SequenceFile.Writer writer;
+
+ /**
+ * Blocking queue of actions.
+ */
+ private final BlockingQueue<QueueEntry> queue;
+
+ /**
+ * stop flag.
+ */
+ private final AtomicBoolean stop = new AtomicBoolean(false);
+
+ /**
+ * Is the processor thread active.
+ */
+ private final AtomicBoolean active = new AtomicBoolean(false);
+
+ /**
+ * Executor of writes.
+ */
+ private ExecutorService executor;
+
+ /**
+ * Future invoked.
+ */
+ private Future<Integer> future;
+
+ /**
+ * Count of file entries saved; updated only in the writer thread
+ * but read from others, so it is atomic.
+ */
+ private final AtomicInteger count = new AtomicInteger();
+
+ /**
+ * Any failure caught on the writer thread; this should be
+ * raised within the task/job thread as it implies that the
+ * entire write has failed.
+ */
+ private final AtomicReference<IOException> failure = new AtomicReference<>();
+
+ /**
+ * Create.
+ * @param writer writer
+ * @param capacity capacity.
+ */
+ private EntryWriter(SequenceFile.Writer writer, int capacity) {
+ checkState(capacity > 0, "invalid queue capacity %s", capacity);
+ this.writer = requireNonNull(writer);
+ this.queue = new ArrayBlockingQueue<>(capacity);
+ }
+
+ /**
+ * Is the writer active?
+ * @return true if the processor thread is live
+ */
+ public boolean isActive() {
+ return active.get();
+ }
+
+ /**
+ * Get count of files processed.
+ * @return the count
+ */
+ public int getCount() {
+ return count.get();
+ }
+
+ /**
+ * Any failure.
+ * @return any IOException caught when writing the output
+ */
+ public IOException getFailure() {
+ return failure.get();
+ }
+
+ /**
+ * Start the thread.
+ */
+ private void start() {
+ checkState(executor == null, "already started");
+ active.set(true);
+ executor = HadoopExecutors.newSingleThreadExecutor();
+ future = executor.submit(this::processor);
+ LOG.debug("Started entry writer {}", this);
+ }
+
+ /**
+ * Add a list of entries to the queue.
+ * @param entries entries.
+ * @return true if the entries were queued; false if the writer is inactive or the put was interrupted.
+ */
+ public boolean enqueue(List<FileEntry> entries) {
+ if (entries.isEmpty()) {
+ LOG.debug("ignoring enqueue of empty list");
+ // exit fast, but return true.
+ return true;
+ }
+ if (active.get()) {
+ try {
+ queue.put(new QueueEntry(Actions.write, entries));
+ LOG.debug("Queued {}", entries.size());
+ return true;
+ } catch (InterruptedException e) {
+ // restore the interrupt status rather than clearing it
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ } else {
+ LOG.debug("Queue inactive; discarding {} entries", entries.size());
+ return false;
+ }
+ }
+
+ /**
+ * Take entries off the queue and process them until stopped.
+ * @return count of entries written.
+ * @throws UncheckedIOException on write failure
+ */
+ private int processor() {
+ Thread.currentThread().setName("EntryIOWriter");
+ try {
+ while (!stop.get()) {
+ final QueueEntry queueEntry = queue.take();
Review Comment:
I don't know how long we should wait here. It assumes that, yes, the caller will eventually stop the write.
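
For illustration only, a minimal sketch of one way to bound that wait: use poll() with a timeout instead of take(), so the processor thread periodically re-checks the stop flag even if the caller never sends a stop action. The class, queue element type, and 30-second timeout below are hypothetical stand-ins, not the PR's code.

{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the PR's processor loop, for illustration only. */
public class BoundedWaitProcessor {

  private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
  private final AtomicBoolean stop = new AtomicBoolean(false);

  int processor() throws InterruptedException {
    int written = 0;
    while (!stop.get()) {
      // poll() with a timeout instead of take(): the thread wakes up periodically
      // and can re-check the stop flag even if no stop action ever arrives.
      String entry = queue.poll(30, TimeUnit.SECONDS);
      if (entry == null) {
        continue; // idle poll; could also count these and give up after a limit
      }
      written++; // stand-in for appending the entry to the SequenceFile
    }
    return written;
  }
}
{code}

The trade-off is picking the timeout: too short wastes wakeups, too long delays noticing an abandoned writer.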
> ManifestCommitter OOM on azure job
> ----------------------------------
>
> Key: MAPREDUCE-7435
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: client
> Affects Versions: 3.3.5
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> I've got some reports of Spark jobs OOMing if the manifest committer is used
> through abfs.
> Either the manifests are using too much memory, or something is not working
> with azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test to load
> many, many manifests, each with lots of dirs and files, to see what breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those
> structures have to include every etag of every block, so the manifest size is
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at
> org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown
> Source)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown
> Source)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown
> Source)
> at
> org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at
> org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown
> Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}
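
For context, a hedged usage sketch of the EntryFileIO API from the diff above, showing how spooling entries to a local SequenceFile is meant to keep memory bounded: entries are enqueued as manifests load, then streamed back for processing. The loadNextManifestEntries() helper, the temp file, and the queue capacity of 100 are hypothetical placeholders, not the committer's real stage code.

{code}
import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;

/** Hypothetical sketch of spooling file entries to disk and reading them back. */
public class EntryFileSpoolSketch {

  public void spoolAndReplay(Configuration conf) throws IOException {
    EntryFileIO entryFileIO = new EntryFileIO(conf);
    File spool = File.createTempFile("manifest-entries", ".seq");

    // Phase 1: spool entries to local disk instead of accumulating them on the heap.
    EntryFileIO.EntryWriter entryWriter =
        entryFileIO.launchEntryWriter(entryFileIO.createWriter(spool), 100);
    try {
      List<FileEntry> entries;
      while ((entries = loadNextManifestEntries()) != null) { // hypothetical loader
        if (!entryWriter.enqueue(entries)) {
          // the writer thread is inactive or failed; rethrow its failure if any
          IOException failure = entryWriter.getFailure();
          if (failure != null) {
            throw failure;
          }
          break;
        }
      }
    } finally {
      entryWriter.close(); // stops the writer thread and closes the SequenceFile
    }

    // Phase 2: stream the entries back with bounded memory.
    try (SequenceFile.Reader reader = entryFileIO.createReader(spool)) {
      RemoteIterator<FileEntry> entryIterator = entryFileIO.iterateOver(reader);
      while (entryIterator.hasNext()) {
        FileEntry entry = entryIterator.next();
        // rename/commit the entry here
      }
    }
  }

  /** Hypothetical placeholder: one loaded manifest's entries, or null when done. */
  private List<FileEntry> loadNextManifestEntries() {
    return null;
  }
}
{code}

The sketch only shows the API shape; how the committer's stages actually wire this into manifest loading and the rename phase is up to the PR itself.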