[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716671#comment-17716671
 ] 

ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------

steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1177718895


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write an entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry}.
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * Stores the configuration; it is passed on when this class
+   * creates SequenceFile readers and writers.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * The file is converted to a Path and the work is handed to
+   * {@link #createWriter(Path)}.
+   * @param file local file to write to
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    final Path destination = toPath(file);
+    return createWriter(destination);
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * The writer is set up with NullWritable as the key class and
+   * FileEntry as the value class.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    final SequenceFile.Writer.Option destination =
+        SequenceFile.Writer.file(path);
+    final SequenceFile.Writer.Option keys =
+        SequenceFile.Writer.keyClass(NullWritable.class);
+    final SequenceFile.Writer.Option values =
+        SequenceFile.Writer.valueClass(FileEntry.class);
+    return SequenceFile.createWriter(conf, destination, keys, values);
+  }
+
+
+  /**
+   * Open a reader of a local file.
+   * The file is converted to a Path and the work is handed to
+   * {@link #createReader(Path)}.
+   * @param file local file to read
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    final Path source = toPath(file);
+    return createReader(source);
+  }
+
+  /**
+   * Open a reader of the file at the given path.
+   * NOTE(review): earlier documentation described the reader as being
+   * created with sequential reads; the only option set here is the
+   * file to open, so confirm where that policy comes from.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    final SequenceFile.Reader.Option source =
+        SequenceFile.Reader.file(path);
+    return new SequenceFile.Reader(conf, source);
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * The reader is wrapped in an EntryIterator.
+   * Closeable: cast the returned iterator to Closeable and invoke
+   * close() to close the reader.
+   * @param reader reader to iterate over
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int 
capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer, then flush it.
+   * If {@code close} is set the writer is closed whether or not the
+   * write succeeded. When both the write and the close fail, the write
+   * failure is the exception raised and the close failure is attached
+   * to it as a suppressed exception, so the root cause is not lost.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    // failure raised while appending/flushing, if any.
+    Exception writeFailure = null;
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } catch (IOException | RuntimeException e) {
+      // remember the failure so that the close below cannot mask it.
+      writeFailure = e;
+      throw e;
+    } finally {
+      if (close) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          if (writeFailure == null) {
+            // the close is the only failure: report it.
+            throw e;
+          }
+          // keep the write failure as the primary exception.
+          writeFailure.addSuppressed(e);
+        }
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * The path is built from the URI returned by {@code file.toURI()}.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   * Every QueueEntry carries exactly one of these values.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   * Both fields are final; the list is null when the entry is
+   * created from an action alone.
+   */
+  private static final class QueueEntry {
+
+    /** Action to perform. */
+    private final Actions action;
+
+    /** Entries accompanying the action; may be null. */
+    private final List<FileEntry> entries;
+
+    /**
+     * Create an entry from an action and a list of entries.
+     * @param action action
+     * @param entries entries
+     */
+    private QueueEntry(final Actions action, final List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    /**
+     * Create an entry from an action, with no list of entries.
+     * @param action action
+     */
+    private QueueEntry(final Actions action) {
+      this.action = action;
+      this.entries = null;
+    }
+  }
+
+  /**
+   * A writer thread reads from a queue containing
+   * lists of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * These threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main trouble spots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers
+   *   in the process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * Count of file entries saved; only updated in one thread.
+     * Held in an AtomicInteger (not a volatile field) so that
+     * readers on other threads see the latest value.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new 
AtomicReference<>();
+
+    /**
+     * Create.
+     * The capacity is checked first, then the writer is checked for
+     * null, then an ArrayBlockingQueue of that capacity is created.
+     * Private: EntryFileIO.launchEntryWriter() builds instances.
+     * @param writer writer; must not be null
+     * @param capacity capacity; must be greater than zero.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * Reads the flag which start() sets to true.
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the current value of the counter
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output;
+     * null if no failure has been stored.
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     * Marks the writer as active, creates a single thread executor
+     * and submits {@code processor()} to it, saving the returned future.
+     * The checkState() precondition fails if an executor has already
+     * been created.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      // the active flag is set before the processor is submitted.
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return whether the queue worked.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));
+          LOG.debug("Queued {}", entries.size());

Review Comment:
   it's a list...what do we want to add?





> ManifestCommitter OOM on azure job
> ----------------------------------
>
>                 Key: MAPREDUCE-7435
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.3.5
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> I've got some reports of spark jobs OOM if the manifest committer is used 
> through abfs.
> either the manifests are using too much memory, or something is not working 
> with azure stream memory use (or both).
> before proposing a solution, first step should be to write a test to load 
> many, many manifests, each with lots of dirs and files to see what breaks.
> note: we did have OOM issues with the s3a committer, on teragen but those 
> structures have to include every etag of every block, so the manifest size is 
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at 
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at 
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at 
> org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown
>  Source)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown
>  Source)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown
>  Source)
> at 
> org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at 
> org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown
>  Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org

Reply via email to