[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715683#comment-17715683
 ] 

ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------

steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1175038824


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader;
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int 
capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A Writer thread takes reads from a queue containing
+   * list of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * The these threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main troublespots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the 
process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * count of file entries saved; only updated in one thread
+     * so volatile.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new 
AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the count
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return whether the queue worked.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));
+          LOG.debug("Queued {}", entries.size());
+          return true;
+        } catch (InterruptedException e) {
+          Thread.interrupted();
+          return false;
+        }
+      } else {
+        LOG.debug("Queue inactive; discarding {} entries", entries.size());
+        return false;
+      }
+    }
+
+    /**
+     * Queue and process entries until done.
+     * @return count of entries written.
+     * @throws UncheckedIOException on write failure
+     */
+    private int processor() {
+      Thread.currentThread().setName("EntryIOWriter");
+      try {
+        while (!stop.get()) {
+          final QueueEntry queueEntry = queue.take();
+          switch (queueEntry.action) {
+
+          case stop:  // stop the operation
+            LOG.debug("Stop processing");
+            stop.set(true);
+            break;
+
+          case write:  // write data
+          default:  // here to shut compiler up
+            // write
+            final List<FileEntry> entries = queueEntry.entries;
+            LOG.debug("Adding block of {} entries", entries.size());
+            for (FileEntry entry : entries) {
+              append(entry);
+            }
+            break;
+          }
+        }
+      } catch (IOException e) {
+        LOG.debug("Write failure", e);
+        failure.set(e);
+        throw new UncheckedIOException(e);
+      } catch (InterruptedException e) {
+        // being stopped implicitly
+        LOG.debug("interrupted", e);
+      } finally {
+        stop.set(true);
+        active.set(false);
+        // clear the queue, so wake up on any failure mode.
+        queue.clear();
+      }
+      return count.get();
+    }
+
+    /**
+     * write one entry.
+     * @param entry entry to write
+     * @throws IOException on write failure
+     */
+    private void append(FileEntry entry) throws IOException {
+      writer.append(NullWritable.get(), entry);
+
+      final int c = count.incrementAndGet();
+      LOG.trace("Added entry #{}: {}", c, entry);
+    }
+
+    /**
+     * Close: stop accepting new writes, wait for queued writes to complete.
+     * @throws IOException failure closing that writer, or somehow the future
+     * raises an IOE which isn't caught for later.
+     */
+    @Override
+    public void close() throws IOException {
+
+      // declare as inactive.
+      // this stops queueing more data, but leaves
+      // the worker thread still polling and writing.
+      if (!active.getAndSet(false)) {
+        // already stopped
+        return;
+      }
+      LOG.debug("Shutting down writer");
+      // signal queue closure by queuing a stop option.
+      // this is added at the end of the list of queued blocks,
+      // of which are written.
+      try {
+        queue.put(new QueueEntry(Actions.stop));
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+      }
+      try {
+        // wait for the op to finish.
+        int total = FutureIO.awaitFuture(future, 30, TimeUnit.SECONDS);

Review Comment:
   good number? make a static final declaration, if not config option?





> ManifestCommitter OOM on azure job
> ----------------------------------
>
>                 Key: MAPREDUCE-7435
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.3.5
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> I've got some reports of spark jobs OOM if the manifest committer is used 
> through abfs.
> either the manifests are using too much memory, or something is not working 
> with azure stream memory use (or both).
> before proposing a solution, first step should be to write a test to load 
> many, many manifests, each with lots of dirs and files to see what breaks.
> note: we did have OOM issues with the s3a committer, on teragen but those 
> structures have to include every etag of every block, so the manifest size is 
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at 
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at 
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at 
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at 
> org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown
>  Source)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown
>  Source)
> at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at 
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown
>  Source)
> at 
> org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at 
> org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown
>  Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org

Reply via email to