[
https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716155#comment-17716155
]
ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------
mehakmeet commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1176027704
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java:
##########
@@ -67,98 +71,118 @@ protected CommitJobStage.Result executeStage(
getJobId(),
storeSupportsResilientCommit());
- boolean createMarker = arguments.isCreateMarker();
+ // once the manifest has been loaded, a temp file needs to be
+ // deleted; so track teh value.
Review Comment:
nit: "the"
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java:
##########
@@ -67,98 +71,118 @@ protected CommitJobStage.Result executeStage(
getJobId(),
storeSupportsResilientCommit());
- boolean createMarker = arguments.isCreateMarker();
+ // once the manifest has been loaded, a temp file needs to be
+ // deleted; so track teh value.
+ LoadedManifestData loadedManifestData = null;
+
+ try {
+ boolean createMarker = arguments.isCreateMarker();
+ IOStatisticsSnapshot heapInfo = new IOStatisticsSnapshot();
+ addHeapInformation(heapInfo, "setup");
+ // load the manifests
+ final StageConfig stageConfig = getStageConfig();
+    LoadManifestsStage.Result result = new LoadManifestsStage(stageConfig).apply(
Review Comment:
Suggestion: we could include a duration tracker to record the time taken to load manifests in the final stats.
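If it helps, a minimal sketch of that idea, assuming the stage exposes an `IOStatisticsStore` (a `DurationTrackerFactory`) through `getIOStatistics()` and using `"load_manifests"` as an illustrative statistic key:

```java
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;

// Hedged sketch only: wrap the manifest load so its elapsed time is
// recorded against an illustrative "load_manifests" statistic.
LoadManifestsStage.Result result = trackDuration(
    getIOStatistics(),       // assumed to return an IOStatisticsStore
    "load_manifests",        // statistic name; illustrative
    () -> new LoadManifestsStage(stageConfig).apply(
        /* existing stage arguments */));
```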
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ EntryFileIO.class);
+
+ /** Configuration used to load filesystems. */
+ private final Configuration conf;
+
+ /**
+ * Constructor.
+ * @param conf Configuration used to load filesystems
+ */
+ public EntryFileIO(final Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Create a writer to a local file.
+ * @param file file
+ * @return the writer
+ * @throws IOException failure to create the file
+ */
+ public SequenceFile.Writer createWriter(File file) throws IOException {
+ return createWriter(toPath(file));
+ }
+
+ /**
+ * Create a writer to a file on any FS.
+ * @param path path to write to.
+ * @return the writer
+ * @throws IOException failure to create the file
+ */
+ public SequenceFile.Writer createWriter(Path path) throws IOException {
+ return SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(path),
+ SequenceFile.Writer.keyClass(NullWritable.class),
+ SequenceFile.Writer.valueClass(FileEntry.class));
+ }
+
+
+ /**
+ * Reader is created with sequential reads.
+ * @param file file
+ * @return the reader
+ * @throws IOException failure to open
+ */
+ public SequenceFile.Reader createReader(File file) throws IOException {
+ return createReader(toPath(file));
+ }
+
+ /**
+ * Reader is created with sequential reads.
+ * @param path path
+ * @return the reader
+ * @throws IOException failure to open
+ */
+ public SequenceFile.Reader createReader(Path path) throws IOException {
+ return new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(path));
+ }
+
+ /**
+ * Iterator to retrieve file entries from the sequence file.
+ * Closeable: cast and invoke to close the reader.
+ * @param reader reader;
+ * @return iterator
+ */
+ public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+ return new EntryIterator(reader);
+ }
+
+ /**
+ * Create and start an entry writer.
+ * @param writer writer
+ * @param capacity queue capacity
+ * @return the writer.
+ */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
+ final EntryWriter ew = new EntryWriter(writer, capacity);
+ ew.start();
+ return ew;
+ }
+
+ /**
+ * Write a sequence of entries to the writer.
+ * @param writer writer
+ * @param entries entries
+ * @param close close the stream afterwards
+ * @return number of entries written
+ * @throws IOException write failure.
+ */
+ public static int write(SequenceFile.Writer writer,
+ Collection<FileEntry> entries,
+ boolean close)
+ throws IOException {
+ try {
+ for (FileEntry entry : entries) {
+ writer.append(NullWritable.get(), entry);
+ }
+ writer.flush();
+ } finally {
+ if (close) {
+ writer.close();
+ }
+ }
+ return entries.size();
+ }
+
+
+ /**
+ * Given a file, create a Path.
+ * @param file file
+ * @return path to the file
+ */
+ public static Path toPath(final File file) {
+ return new Path(file.toURI());
+ }
+
+
+ /**
+ * Actions in the queue.
+ */
+ private enum Actions {
+ /** Write the supplied list of entries. */
+ write,
+ /** Stop the processor thread. */
+ stop
+ }
+
+ /**
+ * What gets queued: an action and a list of entries.
+ */
+ private static final class QueueEntry {
+
+ private final Actions action;
+
+ private final List<FileEntry> entries;
+
+ private QueueEntry(final Actions action, List<FileEntry> entries) {
+ this.action = action;
+ this.entries = entries;
+ }
+
+ private QueueEntry(final Actions action) {
+ this(action, null);
+ }
+ }
+
+ /**
+ * A Writer thread takes reads from a queue containing
+ * list of entries to save; these are serialized via the writer to
+ * the output stream.
+ * Other threads can queue the file entry lists from loaded manifests
+ * for them to be written.
+ * The these threads will be blocked when the queue capacity is reached.
+ * This is quite a complex process, with the main troublespots in the code
+ * being:
+ * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the process
+ */
+ public static final class EntryWriter implements Closeable {
+
+ /**
+ * The destination of the output.
+ */
+ private final SequenceFile.Writer writer;
+
+ /**
+ * Blocking queue of actions.
+ */
+ private final BlockingQueue<QueueEntry> queue;
+
+ /**
+ * stop flag.
+ */
+ private final AtomicBoolean stop = new AtomicBoolean(false);
+
+ /**
+ * Is the processor thread active.
+ */
+ private final AtomicBoolean active = new AtomicBoolean(false);
+
+ /**
+ * Executor of writes.
+ */
+ private ExecutorService executor;
+
+ /**
+ * Future invoked.
+ */
+ private Future<Integer> future;
+
+ /**
+ * count of file entries saved; only updated in one thread
+ * so volatile.
+ */
+ private final AtomicInteger count = new AtomicInteger();
+
+ /**
+ * Any failure caught on the writer thread; this should be
+ * raised within the task/job thread as it implies that the
+ * entire write has failed.
+ */
+    private final AtomicReference<IOException> failure = new AtomicReference<>();
+
+ /**
+ * Create.
+ * @param writer writer
+ * @param capacity capacity.
+ */
+ private EntryWriter(SequenceFile.Writer writer, int capacity) {
+ checkState(capacity > 0, "invalid queue capacity %s", capacity);
+ this.writer = requireNonNull(writer);
+ this.queue = new ArrayBlockingQueue<>(capacity);
+ }
+
+ /**
+ * Is the writer active?
+ * @return true if the processor thread is live
+ */
+ public boolean isActive() {
+ return active.get();
+ }
+
+ /**
+ * Get count of files processed.
+ * @return the count
+ */
+ public int getCount() {
+ return count.get();
+ }
+
+ /**
+ * Any failure.
+ * @return any IOException caught when writing the output
+ */
+ public IOException getFailure() {
+ return failure.get();
+ }
+
+ /**
+ * Start the thread.
+ */
+ private void start() {
+ checkState(executor == null, "already started");
+ active.set(true);
+ executor = HadoopExecutors.newSingleThreadExecutor();
+ future = executor.submit(this::processor);
+ LOG.debug("Started entry writer {}", this);
+ }
+
+ /**
+ * Add a list of entries to the queue.
+ * @param entries entries.
+ * @return whether the queue worked.
+ */
+ public boolean enqueue(List<FileEntry> entries) {
+ if (entries.isEmpty()) {
+ LOG.debug("ignoring enqueue of empty list");
+ // exit fast, but return true.
+ return true;
+ }
+ if (active.get()) {
+ try {
+ queue.put(new QueueEntry(Actions.write, entries));
Review Comment:
Suggestion: we could use `queue.offer(E e, long timeout, TimeUnit unit)` so that we wait for the queue to have capacity to add the entry while also having a timeout in case something goes wrong, then catch the interrupt and throw or swallow accordingly if the timeout is exceeded?
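For illustration, a hedged sketch of that variant against the `enqueue()` shown above; the 60-second timeout is an arbitrary example value:

```java
if (active.get()) {
  try {
    // wait for capacity, but give up after a bounded time rather than
    // blocking forever if the writer thread has hung
    if (!queue.offer(new QueueEntry(Actions.write, entries),
        60, TimeUnit.SECONDS)) {
      LOG.error("Timed out after 60s queueing {} entries", entries.size());
      return false;
    }
    LOG.debug("Queued {}", entries.size());
    return true;
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // preserve the interrupt status
    return false;
  }
}
```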
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+ /**
+ * Add a list of entries to the queue.
+ * @param entries entries.
+ * @return whether the queue worked.
+ */
+ public boolean enqueue(List<FileEntry> entries) {
+ if (entries.isEmpty()) {
+ LOG.debug("ignoring enqueue of empty list");
+ // exit fast, but return true.
+ return true;
+ }
+ if (active.get()) {
+ try {
+ queue.put(new QueueEntry(Actions.write, entries));
+ LOG.debug("Queued {}", entries.size());
+ return true;
+ } catch (InterruptedException e) {
+ Thread.interrupted();
Review Comment:
Log that something went wrong here.
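One possible form, reusing the method's surrounding context (restoring the interrupt flag rather than clearing it is a separate judgment call):

```java
} catch (InterruptedException e) {
  // surface the problem rather than failing silently
  LOG.warn("Interrupted while queueing {} entries", entries.size(), e);
  Thread.currentThread().interrupt();
  return false;
}
```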
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+ /**
+ * Add a list of entries to the queue.
+ * @param entries entries.
+ * @return whether the queue worked.
+ */
+ public boolean enqueue(List<FileEntry> entries) {
+ if (entries.isEmpty()) {
+ LOG.debug("ignoring enqueue of empty list");
+ // exit fast, but return true.
+ return true;
+ }
+ if (active.get()) {
+ try {
+ queue.put(new QueueEntry(Actions.write, entries));
+ LOG.debug("Queued {}", entries.size());
Review Comment:
Could we log some info about the entries that were queued?
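For example (illustrative only: the `FileEntry` accessor name is assumed, and the list is known to be non-empty at this point):

```java
// getDestPath() is a hypothetical accessor on FileEntry
LOG.debug("Queued {} entries; first dest: {}",
    entries.size(), entries.get(0).getDestPath());
```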
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java:
##########
@@ -274,8 +270,8 @@ private void createOneDirectory(final DirEntry dirEntry)
throws IOException {
* Try to efficiently and robustly create a directory in a method which is
* expected to be executed in parallel with operations creating
* peer directories.
- * @param path path to create
- * @return true if dir created/found
+ * @param dirEntry dir to create
+ * @return Outcome
Review Comment:
nit: Better javadocs for return, "State of the directory in the dir map" or
something?
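One way the javadoc might read with that addressed:

```java
/**
 * Try to efficiently and robustly create a directory in a method which is
 * expected to be executed in parallel with operations creating
 * peer directories.
 * @param dirEntry dir to create
 * @return the state of the directory as recorded in the dir map
 */
```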
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+ /**
+ * Add a list of entries to the queue.
+ * @param entries entries.
+ * @return whether the queue worked.
+ */
+ public boolean enqueue(List<FileEntry> entries) {
+ if (entries.isEmpty()) {
+ LOG.debug("ignoring enqueue of empty list");
+ // exit fast, but return true.
+ return true;
+ }
+ if (active.get()) {
+ try {
+ queue.put(new QueueEntry(Actions.write, entries));
+ LOG.debug("Queued {}", entries.size());
+ return true;
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ return false;
+ }
+ } else {
+ LOG.debug("Queue inactive; discarding {} entries", entries.size());
Review Comment:
Should we make this `LOG.error()`? How often do you think we would be coming here?
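If discarding entries here is genuinely unexpected, an error-level form might look like:

```java
} else {
  // an inactive queue means these entries are silently lost, so be loud
  LOG.error("Queue inactive; discarding {} entries", entries.size());
  return false;
}
```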
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+ /**
+ * A Writer thread takes reads from a queue containing
+ * list of entries to save; these are serialized via the writer to
+ * the output stream.
+ * Other threads can queue the file entry lists from loaded manifests
+ * for them to be written.
+ * The these threads will be blocked when the queue capacity is reached.
Review Comment:
typo: "Then"(First word)
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RemoteIterators.java:
##########
@@ -191,6 +191,37 @@ public static <S> RemoteIterator<S> closingRemoteIterator(
return new CloseRemoteIterator<>(iterator, toClose);
}
+ /**
+ * Wrap an iterator with one which adds a continuation probe.
+ * This allows work to exit fast without complicated breakout logic
+ * @param iterator source
+   * @param continueWork predicate which will trigger a fast halt if it returns false.
+ * @param <S> source type.
+ * @return a new iterator
+ */
+ public static <S> RemoteIterator<S> haltableRemoteIterator(
+ final RemoteIterator<S> iterator,
+ final CallableRaisingIOE<Boolean> continueWork) {
+ return new HaltableRemoteIterator<>(iterator, continueWork);
+ }
+
+ /**
+ * A remote iterator which simply counts up, stopping once the
+ * value is greater than the value of {@code excludedFinish}.
+ * This is primarily for tests or when submitting work into a TaskPool.
+ * equivalent to
+ * <pre>
+ * for(long l = start, l < finis; l++) yield l;
Review Comment:
typo: "l < excludedFinish"
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md:
##########
@@ -234,6 +239,36 @@ Caveats
`mapreduce.manifest.committer.io.rate` can help avoid this.
+### `mapreduce.manifest.committer.writer.queue.capacity`
+
+This is a secondary scale option.
+It controls the size of the queue for storing lists of files to rename from
+the manifests loaded from the target filesystem, manifests loaded
+from a pool of worker threads, and the single thread which saves
+the entries from each manifest to an intermediate file in the local filesystem.
+
+Once the queue is full, all manifest loading threads will block.
+
+```xml
+<property>
+ <name>mapreduce.manifest.committer.writer.queue.capacity</name>
+ <value>32</value>
+</property>
+```
+
+As the local filesystem is usually much faster to write to than any cloud store,
+this queue size should not be a limit on manifest load performance.
+
+It can help limit the amount of memory consumed during manifest load during
+job commit.
+The maximumum number of loaded manifests will be
Review Comment:
typo: "maximum"
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ EntryFileIO.class);
+
+ /** Configuration used to load filesystems. */
+ private final Configuration conf;
+
+ /**
+ * Constructor.
+ * @param conf Configuration used to load filesystems
+ */
+ public EntryFileIO(final Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Create a writer to a local file.
+ * @param file file
+ * @return the writer
+ * @throws IOException failure to create the file
+ */
+ public SequenceFile.Writer createWriter(File file) throws IOException {
+ return createWriter(toPath(file));
+ }
+
+ /**
+ * Create a writer to a file on any FS.
+ * @param path path to write to.
+ * @return the writer
+ * @throws IOException failure to create the file
+ */
+ public SequenceFile.Writer createWriter(Path path) throws IOException {
+ return SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(path),
+ SequenceFile.Writer.keyClass(NullWritable.class),
+ SequenceFile.Writer.valueClass(FileEntry.class));
+ }
+
+
+ /**
+ * Reader is created with sequential reads.
+ * @param file file
+ * @return the reader
+ * @throws IOException failure to open
+ */
+ public SequenceFile.Reader createReader(File file) throws IOException {
+ return createReader(toPath(file));
+ }
+
+ /**
+ * Reader is created with sequential reads.
+ * @param path path
+ * @return the reader
+ * @throws IOException failure to open
+ */
+ public SequenceFile.Reader createReader(Path path) throws IOException {
+ return new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(path));
+ }
+
+ /**
+ * Iterator to retrieve file entries from the sequence file.
+ * Closeable: cast and invoke to close the reader.
+ * @param reader reader;
+ * @return iterator
+ */
+ public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+ return new EntryIterator(reader);
+ }
+
+ /**
+ * Create and start an entry writer.
+ * @param writer writer
+ * @param capacity queue capacity
+ * @return the writer.
+ */
+ public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int
capacity) {
+ final EntryWriter ew = new EntryWriter(writer, capacity);
+ ew.start();
+ return ew;
+ }
+
+ /**
+ * Write a sequence of entries to the writer.
+ * @param writer writer
+ * @param entries entries
+ * @param close close the stream afterwards
+ * @return number of entries written
+ * @throws IOException write failure.
+ */
+ public static int write(SequenceFile.Writer writer,
+ Collection<FileEntry> entries,
+ boolean close)
+ throws IOException {
+ try {
+ for (FileEntry entry : entries) {
+ writer.append(NullWritable.get(), entry);
+ }
+ writer.flush();
+ } finally {
+ if (close) {
+ writer.close();
+ }
+ }
+ return entries.size();
+ }
+
+
+ /**
+ * Given a file, create a Path.
+ * @param file file
+ * @return path to the file
+ */
+ public static Path toPath(final File file) {
+ return new Path(file.toURI());
+ }
+
+
+ /**
+ * Actions in the queue.
+ */
+ private enum Actions {
+ /** Write the supplied list of entries. */
+ write,
+ /** Stop the processor thread. */
+ stop
+ }
+
+ /**
+ * What gets queued: an action and a list of entries.
+ */
+ private static final class QueueEntry {
+
+ private final Actions action;
+
+ private final List<FileEntry> entries;
+
+ private QueueEntry(final Actions action, List<FileEntry> entries) {
+ this.action = action;
+ this.entries = entries;
+ }
+
+ private QueueEntry(final Actions action) {
+ this(action, null);
+ }
+ }
+
+ /**
+ * A Writer thread takes reads from a queue containing
+ * list of entries to save; these are serialized via the writer to
+ * the output stream.
+ * Other threads can queue the file entry lists from loaded manifests
+ * for them to be written.
+ * The these threads will be blocked when the queue capacity is reached.
+ * This is quite a complex process, with the main troublespots in the code
+ * being:
+ * - managing the shutdown
+ * - failing safely on write failures, restarting all blocked writers in the
process
+ */
+ public static final class EntryWriter implements Closeable {
+
+ /**
+ * The destination of the output.
+ */
+ private final SequenceFile.Writer writer;
+
+ /**
+ * Blocking queue of actions.
+ */
+ private final BlockingQueue<QueueEntry> queue;
+
+ /**
+ * stop flag.
+ */
+ private final AtomicBoolean stop = new AtomicBoolean(false);
+
+ /**
+ * Is the processor thread active.
+ */
+ private final AtomicBoolean active = new AtomicBoolean(false);
+
+ /**
+ * Executor of writes.
+ */
+ private ExecutorService executor;
+
+ /**
+ * Future invoked.
+ */
+ private Future<Integer> future;
+
+ /**
+ * count of file entries saved; only updated in one thread
+ * so volatile.
+ */
+ private final AtomicInteger count = new AtomicInteger();
+
+ /**
+ * Any failure caught on the writer thread; this should be
+ * raised within the task/job thread as it implies that the
+ * entire write has failed.
+ */
+ private final AtomicReference<IOException> failure = new
AtomicReference<>();
+
+ /**
+ * Create.
+ * @param writer writer
+ * @param capacity capacity.
+ */
+ private EntryWriter(SequenceFile.Writer writer, int capacity) {
+ checkState(capacity > 0, "invalid queue capacity %s", capacity);
+ this.writer = requireNonNull(writer);
+ this.queue = new ArrayBlockingQueue<>(capacity);
+ }
+
+ /**
+ * Is the writer active?
+ * @return true if the processor thread is live
+ */
+ public boolean isActive() {
+ return active.get();
+ }
+
+ /**
+ * Get count of files processed.
+ * @return the count
+ */
+ public int getCount() {
+ return count.get();
+ }
+
+ /**
+ * Any failure.
+ * @return any IOException caught when writing the output
+ */
+ public IOException getFailure() {
+ return failure.get();
+ }
+
+ /**
+ * Start the thread.
+ */
+ private void start() {
+ checkState(executor == null, "already started");
+ active.set(true);
+ executor = HadoopExecutors.newSingleThreadExecutor();
+ future = executor.submit(this::processor);
+ LOG.debug("Started entry writer {}", this);
+ }
+
+ /**
+ * Add a list of entries to the queue.
+ * @param entries entries.
+ * @return true if the entries were accepted for writing.
+ */
+ public boolean enqueue(List<FileEntry> entries) {
+ if (entries.isEmpty()) {
+ LOG.debug("ignoring enqueue of empty list");
+ // exit fast, but return true.
+ return true;
+ }
+ if (active.get()) {
+ try {
+ queue.put(new QueueEntry(Actions.write, entries));
+ LOG.debug("Queued {}", entries.size());
+ return true;
+ } catch (InterruptedException e) {
+ // restore the interrupt status rather than swallowing it
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ } else {
+ LOG.debug("Queue inactive; discarding {} entries", entries.size());
+ return false;
+ }
+ }
+
+ /**
+ * Process queued entries until stopped.
+ * @return count of entries written.
+ * @throws UncheckedIOException on write failure
+ */
+ private int processor() {
+ Thread.currentThread().setName("EntryIOWriter");
+ try {
+ while (!stop.get()) {
+ final QueueEntry queueEntry = queue.take();
Review Comment:
Seems like we could wait indefinitely on this. How about `poll(long timeout, TimeUnit unit)`?
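A minimal sketch of what that bounded wait might look like, assuming a hypothetical `POLL_TIMEOUT_SECONDS` constant; a null return from `poll()` means nothing arrived within the timeout, so the loop re-checks the stop flag instead of blocking forever:
{code}
while (!stop.get()) {
  // bounded wait: poll() returns null if nothing arrives within the timeout
  final QueueEntry queueEntry =
      queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
  if (queueEntry == null) {
    // timed out: loop round and re-evaluate the stop flag
    continue;
  }
  // ...dispatch on queueEntry.action exactly as in the switch above...
}
{code}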
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write an entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry}.
+ */
+public class EntryFileIO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ EntryFileIO.class);
+
+ /** Configuration used to load filesystems. */
+ private final Configuration conf;
+
+ /**
+ * Constructor.
+ * @param conf Configuration used to load filesystems
+ */
+ public EntryFileIO(final Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Create a writer to a local file.
+ * @param file file
+ * @return the writer
+ * @throws IOException failure to create the file
+ */
+ public SequenceFile.Writer createWriter(File file) throws IOException {
+ return createWriter(toPath(file));
+ }
+
+ /**
+ * Create a writer to a file on any FS.
+ * @param path path to write to.
+ * @return the writer
+ * @throws IOException failure to create the file
+ */
+ public SequenceFile.Writer createWriter(Path path) throws IOException {
+ return SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(path),
+ SequenceFile.Writer.keyClass(NullWritable.class),
+ SequenceFile.Writer.valueClass(FileEntry.class));
+ }
+
+
+ /**
+ * Create a reader for sequential reads of the file.
+ * @param file file
+ * @return the reader
+ * @throws IOException failure to open
+ */
+ public SequenceFile.Reader createReader(File file) throws IOException {
+ return createReader(toPath(file));
+ }
+
+ /**
+ * Create a reader for sequential reads of the file.
+ * @param path path
+ * @return the reader
+ * @throws IOException failure to open
+ */
+ public SequenceFile.Reader createReader(Path path) throws IOException {
+ return new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(path));
+ }
+
+ /**
+ * Iterator to retrieve file entries from the sequence file.
+ * Closeable: cast and invoke to close the reader.
+ * @param reader reader
+ * @return iterator
+ */
+ public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+ return new EntryIterator(reader);
+ }
+
+ /**
+ * Create and start an entry writer.
+ * @param writer writer
+ * @param capacity queue capacity
+ * @return the writer.
+ */
+ public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
+ final EntryWriter ew = new EntryWriter(writer, capacity);
+ ew.start();
+ return ew;
+ }
+
+ /**
+ * Write a sequence of entries to the writer.
+ * @param writer writer
+ * @param entries entries
+ * @param close close the stream afterwards
+ * @return number of entries written
+ * @throws IOException write failure.
+ */
+ public static int write(SequenceFile.Writer writer,
+ Collection<FileEntry> entries,
+ boolean close)
+ throws IOException {
+ try {
+ for (FileEntry entry : entries) {
+ writer.append(NullWritable.get(), entry);
+ }
+ writer.flush();
+ } finally {
+ if (close) {
+ writer.close();
+ }
+ }
+ return entries.size();
+ }
+
+
+ /**
+ * Given a file, create a Path.
+ * @param file file
+ * @return path to the file
+ */
+ public static Path toPath(final File file) {
+ return new Path(file.toURI());
+ }
+
+
+ /**
+ * Actions in the queue.
+ */
+ private enum Actions {
+ /** Write the supplied list of entries. */
+ write,
+ /** Stop the processor thread. */
+ stop
+ }
+
+ /**
+ * What gets queued: an action and a list of entries.
+ */
+ private static final class QueueEntry {
+
+ private final Actions action;
+
+ private final List<FileEntry> entries;
+
+ private QueueEntry(final Actions action, List<FileEntry> entries) {
+ this.action = action;
+ this.entries = entries;
+ }
+
+ private QueueEntry(final Actions action) {
+ this(action, null);
+ }
+ }
+
+ /**
+ * A writer thread reads lists of entries to save from a queue;
+ * these are serialized via the writer to
+ * the output stream.
+ * Other threads can queue the file entry lists from loaded manifests
+ * for them to be written.
+ * These threads will block when the queue capacity is reached.
+ * This is quite a complex process, with the main trouble spots in the code
+ * being:
+ * - managing the shutdown
+ * - failing safely on write failures, restarting all blocked writers in the process
+ */
+ public static final class EntryWriter implements Closeable {
+
+ /**
+ * The destination of the output.
+ */
+ private final SequenceFile.Writer writer;
+
+ /**
+ * Blocking queue of actions.
+ */
+ private final BlockingQueue<QueueEntry> queue;
+
+ /**
+ * stop flag.
+ */
+ private final AtomicBoolean stop = new AtomicBoolean(false);
+
+ /**
+ * Is the processor thread active.
+ */
+ private final AtomicBoolean active = new AtomicBoolean(false);
+
+ /**
+ * Executor of writes.
+ */
+ private ExecutorService executor;
+
+ /**
+ * Future of the processor thread, used to await its completion.
+ */
+ private Future<Integer> future;
+
+ /**
+ * count of file entries saved; updated only in the writer thread
+ * but read from others, hence atomic.
+ */
+ private final AtomicInteger count = new AtomicInteger();
+
+ /**
+ * Any failure caught on the writer thread; this should be
+ * raised within the task/job thread as it implies that the
+ * entire write has failed.
+ */
+ private final AtomicReference<IOException> failure = new AtomicReference<>();
+
+ /**
+ * Create.
+ * @param writer writer
+ * @param capacity capacity.
+ */
+ private EntryWriter(SequenceFile.Writer writer, int capacity) {
+ checkState(capacity > 0, "invalid queue capacity %s", capacity);
+ this.writer = requireNonNull(writer);
+ this.queue = new ArrayBlockingQueue<>(capacity);
+ }
+
+ /**
+ * Is the writer active?
+ * @return true if the processor thread is live
+ */
+ public boolean isActive() {
+ return active.get();
+ }
+
+ /**
+ * Get count of files processed.
+ * @return the count
+ */
+ public int getCount() {
+ return count.get();
+ }
+
+ /**
+ * Any failure.
+ * @return any IOException caught when writing the output
+ */
+ public IOException getFailure() {
+ return failure.get();
+ }
+
+ /**
+ * Start the thread.
+ */
+ private void start() {
+ checkState(executor == null, "already started");
+ active.set(true);
+ executor = HadoopExecutors.newSingleThreadExecutor();
+ future = executor.submit(this::processor);
+ LOG.debug("Started entry writer {}", this);
+ }
+
+ /**
+ * Add a list of entries to the queue.
+ * @param entries entries.
+ * @return true if the entries were accepted for writing.
+ */
+ public boolean enqueue(List<FileEntry> entries) {
+ if (entries.isEmpty()) {
+ LOG.debug("ignoring enqueue of empty list");
+ // exit fast, but return true.
+ return true;
+ }
+ if (active.get()) {
+ try {
+ queue.put(new QueueEntry(Actions.write, entries));
+ LOG.debug("Queued {}", entries.size());
+ return true;
+ } catch (InterruptedException e) {
+ // restore the interrupt status rather than swallowing it
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ } else {
+ LOG.debug("Queue inactive; discarding {} entries", entries.size());
+ return false;
+ }
+ }
+
+ /**
+ * Process queued entries until stopped.
+ * @return count of entries written.
+ * @throws UncheckedIOException on write failure
+ */
+ private int processor() {
+ Thread.currentThread().setName("EntryIOWriter");
+ try {
+ while (!stop.get()) {
+ final QueueEntry queueEntry = queue.take();
+ switch (queueEntry.action) {
+
+ case stop: // stop the operation
+ LOG.debug("Stop processing");
+ stop.set(true);
+ break;
+
+ case write: // write data
+ default: // here to shut compiler up
+ // write
+ final List<FileEntry> entries = queueEntry.entries;
+ LOG.debug("Adding block of {} entries", entries.size());
+ for (FileEntry entry : entries) {
+ append(entry);
+ }
+ break;
+ }
+ }
+ } catch (IOException e) {
+ LOG.debug("Write failure", e);
+ failure.set(e);
+ throw new UncheckedIOException(e);
+ } catch (InterruptedException e) {
+ // being stopped implicitly
+ LOG.debug("interrupted", e);
+ } finally {
+ stop.set(true);
+ active.set(false);
+ // clear the queue, so that any blocked producers wake up, whatever the failure mode.
+ queue.clear();
+ }
+ return count.get();
+ }
+
+ /**
+ * Write one entry.
+ * @param entry entry to write
+ * @throws IOException on write failure
+ */
+ private void append(FileEntry entry) throws IOException {
+ writer.append(NullWritable.get(), entry);
+
+ final int c = count.incrementAndGet();
+ LOG.trace("Added entry #{}: {}", c, entry);
+ }
+
+ /**
+ * Close: stop accepting new writes, wait for queued writes to complete.
+ * @throws IOException failure closing the writer, or any IOException
+ * raised by the future which was not caught and stored for later.
+ */
+ @Override
+ public void close() throws IOException {
+
+ // declare as inactive.
+ // this stops queueing more data, but leaves
+ // the worker thread still polling and writing.
+ if (!active.getAndSet(false)) {
+ // already stopped
+ return;
+ }
+ LOG.debug("Shutting down writer");
+ // signal queue closure by queuing a stop option.
+ // this is added after the queued blocks,
+ // all of which will be written first.
+ try {
Review Comment:
LOG something like "Tasks left in queue = `capacity` - `queue.remainingCapacity()`" for better logging. We could do something similar when enqueueing as well, but it seems most apt for close().
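A minimal sketch of the suggested line; since `capacity` is not retained as a field of EntryWriter, `queue.size()` (equivalent to `capacity - queue.remainingCapacity()` for an ArrayBlockingQueue) may be the simpler form:
{code}
LOG.debug("Shutting down writer; tasks left in queue: {}",
    queue.size());
{code}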
> ManifestCommitter OOM on azure job
> ----------------------------------
>
> Key: MAPREDUCE-7435
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: client
> Affects Versions: 3.3.5
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> I've got some reports of Spark jobs OOMing if the manifest committer is used
> through abfs.
> Either the manifests are using too much memory, or something is not working
> with Azure stream memory use (or both).
> Before proposing a solution, the first step should be to write a test to load
> many, many manifests, each with lots of dirs and files, to see what breaks.
> Note: we did have OOM issues with the s3a committer on teragen, but those
> structures have to include the etag of every block, so the manifest size is
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at
> org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown
> Source)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown
> Source)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown
> Source)
> at
> org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at
> org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown
> Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}
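A minimal sketch of such a scale test against the EntryFileIO API in the PR above; `manifestCount`, `filesPerManifest` and the `syntheticEntries()` helper (and the FileEntry constructor it would call) are illustrative assumptions, not part of the patch:
{code}
// stream a large number of synthetic entries through EntryFileIO,
// then read them back via the iterator, so memory stays O(1)
// in the number of entries rather than O(files).
EntryFileIO entryFileIO = new EntryFileIO(new Configuration());
File entryFile = File.createTempFile("entries", ".seq");
EntryFileIO.EntryWriter entryWriter =
    entryFileIO.launchEntryWriter(entryFileIO.createWriter(entryFile), 16);
try {
  for (int m = 0; m < manifestCount; m++) {
    // syntheticEntries() is a hypothetical generator of FileEntry lists
    entryWriter.enqueue(syntheticEntries(m, filesPerManifest));
  }
} finally {
  entryWriter.close();
}
// read back, counting entries without accumulating them in memory
long read = 0;
RemoteIterator<FileEntry> entries =
    entryFileIO.iterateOver(entryFileIO.createReader(entryFile));
while (entries.hasNext()) {
  entries.next();
  read++;
}
{code}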
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]