Repository: hive
Updated Branches:
  refs/heads/branch-3 204a0e211 -> 36c33ca06


HIVE-20011: Move away from append mode in proto logging hook (Harish JP, reviewed by Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/29315fcb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/29315fcb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/29315fcb

Branch: refs/heads/branch-3
Commit: 29315fcbb53bf10af16f75ec3d36965c061eedd6
Parents: 204a0e2
Author: Anishek Agarwal <anis...@gmail.com>
Authored: Fri Jun 29 15:05:17 2018 +0530
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Tue Sep 18 13:19:49 2018 -0700

----------------------------------------------------------------------
 .../hive/ql/hooks/HiveProtoLoggingHook.java     | 24 +++++++++++++++++---
 .../logging/proto/DatePartitionedLogger.java    | 18 +++++++++++----
 .../logging/proto/ProtoMessageReader.java       |  9 +++++---
 .../logging/proto/ProtoMessageWriter.java       | 12 +++++-----
 4 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 1e7070b..49cba4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -86,6 +86,7 @@ import static org.apache.hadoop.hive.ql.plan.HiveOperation.UNLOCKTABLE;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -101,6 +102,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -180,6 +182,9 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     private final DatePartitionedLogger<HiveHookEventProto> logger;
     private final ExecutorService eventHandler;
     private final ExecutorService logWriter;
+    private int logFileCount = 0;
+    private ProtoMessageWriter<HiveHookEventProto> writer;
+    private LocalDate writerDate;
 
     EventLogger(HiveConf conf, Clock clock) {
       this.clock = clock;
@@ -234,6 +239,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
          LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
         }
       }
+      IOUtils.closeQuietly(writer);
     }
 
     void handle(HookContext hookContext) {
@@ -285,12 +291,24 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     private static final int MAX_RETRIES = 2;
     private void writeEvent(HiveHookEventProto event) {
       for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
-        try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
+        try {
+          if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
+            if (writer != null) {
+              // Day rolled over; reset the logFileCount.
+              logFileCount = 0;
+              IOUtils.closeQuietly(writer);
+            }
+            // Increment the log file count when creating a new writer.
+            writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+            writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
+          }
           writer.writeProto(event);
-          // This does not work hence, opening and closing file for every event.
-          // writer.hflush();
+          writer.hflush();
           return;
         } catch (IOException e) {
+          // Something is wrong with the writer; close it and reopen.
+          IOUtils.closeQuietly(writer);
+          writer = null;
           if (retryCount < MAX_RETRIES) {
            LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," +
                " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,

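The net effect of this hunk: instead of opening and closing a file per event
(the old workaround for hflush not working), the hook keeps one long-lived
writer, rolls it over when the date partition changes, and drops it on write
errors so the retry opens a fresh file. A minimal, self-contained sketch of
that pattern; RollingEventWriter is a hypothetical name and a plain
java.io.Writer stands in for the real ProtoMessageWriter:

    import java.io.Closeable;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;
    import java.time.LocalDate;

    class RollingEventWriter implements Closeable {
      private Writer writer;        // stand-in for ProtoMessageWriter
      private LocalDate writerDate; // date partition the current writer belongs to
      private int logFileCount = 0; // suffix keeping per-day file names unique

      void writeEvent(String event) throws IOException {
        LocalDate today = LocalDate.now();
        if (writer == null || !today.equals(writerDate)) {
          if (writer != null) {
            logFileCount = 0;       // day rolled over: restart the counter
            closeQuietly();
          }
          // A fresh suffix per writer means we never append to an existing file.
          writer = new FileWriter("events_" + today + "_" + (++logFileCount));
          writerDate = today;
        }
        try {
          writer.write(event);
          writer.flush();           // the real hook calls hflush() on HDFS
        } catch (IOException e) {
          closeQuietly();           // drop the broken writer; the caller's retry
          writer = null;            // creates a new file with the next suffix
          throw e;
        }
      }

      private void closeQuietly() {
        try { if (writer != null) writer.close(); } catch (IOException ignored) { }
      }

      @Override
      public void close() { closeQuietly(); }
    }
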
http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
index d6a5121..58cec7e 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
@@ -45,11 +45,14 @@ import com.google.protobuf.Parser;
  * @param <T> The proto message type.
  */
 public class DatePartitionedLogger<T extends MessageLite> {
-  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class);
   // Everyone has permission to write, but with sticky set so that delete is restricted.
   // This is required, since the path is same for all users and everyone writes into it.
   private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
 
+  // Since the directories have broad permissions, restrict the file read access.
+  private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066);
+
   private final Parser<T> parser;
   private final Path basePath;
   private final Configuration conf;
@@ -57,11 +60,12 @@ public class DatePartitionedLogger<T extends MessageLite> {
 
  public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
       throws IOException {
-    this.conf = conf;
+    this.conf = new Configuration(conf);
     this.clock = clock;
     this.parser = parser;
     createDirIfNotExists(baseDir);
     this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
+    FsPermission.setUMask(this.conf, FILE_UMASK);
   }
 
   private void createDirIfNotExists(Path path) throws IOException {
@@ -101,6 +105,10 @@ public class DatePartitionedLogger<T extends MessageLite> {
     return new Path(path, fileName);
   }
 
+  public Path getPathForSubdir(String dirName, String fileName) {
+    return new Path(new Path(basePath, dirName), fileName);
+  }
+
   /**
    * Extract the date from the directory name, this should be a directory created by this class.
    */
@@ -144,11 +152,11 @@ public class DatePartitionedLogger<T extends MessageLite> {
   * Returns new or changed files in the given directory. The offsets are used to find
   * changed files.
   */
-  public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+  public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
       throws IOException {
     Path dirPath = new Path(basePath, subDir);
     FileSystem fileSystem = basePath.getFileSystem(conf);
-    List<Path> newFiles = new ArrayList<>();
+    List<FileStatus> newFiles = new ArrayList<>();
     if (!fileSystem.exists(dirPath)) {
       return newFiles;
     }
@@ -157,7 +165,7 @@ public class DatePartitionedLogger<T extends MessageLite> {
       Long offset = currentOffsets.get(fileName);
       // If the offset was never added or offset < fileSize.
       if (offset == null || offset < status.getLen()) {
-        newFiles.add(new Path(dirPath, fileName));
+        newFiles.add(status);
       }
     }
     return newFiles;

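Two things happen in this file: the logger now clones the Configuration before
setting a umask on it, so the caller's conf is untouched, and the 0066 umask
strips group/other read and write bits from newly created files, compensating
for the wide-open 01777 directories. A small sketch of how that umask plays
out, assuming Hadoop's FsPermission API; UmaskDemo is an illustrative name,
not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class UmaskDemo {
      public static void main(String[] args) {
        // Clone before mutating, as the patch does, so the shared conf keeps
        // its original umask.
        Configuration loggerConf = new Configuration(new Configuration());

        // 0066 removes group/other rw; a default 0666 file becomes 0600.
        FsPermission.setUMask(loggerConf, FsPermission.createImmutable((short) 0066));

        FsPermission fileMode = new FsPermission((short) 0666);
        System.out.println(fileMode.applyUMask(FsPermission.getUMask(loggerConf)));
        // prints rw------- : only the writing user can read its event files
      }
    }
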
http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
index 5a3c63a..b56f066 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -24,19 +24,22 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 public class ProtoMessageReader<T extends MessageLite> implements Closeable {
   private final Path filePath;
-  private final SequenceFile.Reader reader;
+  private final Reader reader;
   private final ProtoMessageWritable<T> writable;
 
  ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
     this.filePath = filePath;
-    this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
+    // The writer does not flush the length during hflush. Using the length option
+    // lets us read past the length in the FileStatus, but it will throw EOFException
+    // during a read instead of returning null.
+    this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
     this.writable = new ProtoMessageWritable<>(parser);
   }
 

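Since the writer no longer closes the file per event, a reader may open a file
whose FileStatus length lags behind the hflush()ed data; Reader.length(Long.MAX_VALUE)
removes that cap, at the cost of an EOFException when the reader runs into the
in-progress tail. A sketch of the read loop a consumer would need; TailDemo is
an illustrative name and BytesWritable stands in for ProtoMessageWritable:

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile.Reader;

    public class TailDemo {
      public static void readAll(Configuration conf, Path file) throws IOException {
        try (Reader reader = new Reader(conf, Reader.file(file),
            Reader.length(Long.MAX_VALUE))) {
          NullWritable key = NullWritable.get();
          BytesWritable value = new BytesWritable();
          while (true) {
            try {
              if (!reader.next(key, value)) {
                break;              // clean end of a closed file
              }
            } catch (EOFException e) {
              break;                // hit the un-flushed tail of a live file
            }
            // process value here ...
          }
        }
      }
    }
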
http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
index c746bb6..9c086ef 100644
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -26,24 +26,24 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
   private final Path filePath;
-  private final SequenceFile.Writer writer;
+  private final Writer writer;
   private final ProtoMessageWritable<T> writable;
 
  ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
     this.filePath = filePath;
     this.writer = SequenceFile.createWriter(
         conf,
-        SequenceFile.Writer.file(filePath),
-        SequenceFile.Writer.keyClass(NullWritable.class),
-        SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
-        SequenceFile.Writer.appendIfExists(true),
-        SequenceFile.Writer.compression(CompressionType.RECORD));
+        Writer.file(filePath),
+        Writer.keyClass(NullWritable.class),
+        Writer.valueClass(ProtoMessageWritable.class),
+        Writer.compression(CompressionType.RECORD));
     this.writable = new ProtoMessageWritable<>(parser);
   }
 

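Dropping Writer.appendIfExists(true) is the heart of the change: createWriter
now always lays down a brand-new file, and the hflush() the old code had to
comment out (see the removed comment in the first hunk) is usable again. A
short sketch of the create-then-flush pattern against the stock SequenceFile
API; CreateDemo and the BytesWritable payload are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.SequenceFile.Writer;

    public class CreateDemo {
      public static Writer open(Configuration conf, Path file) throws IOException {
        // No appendIfExists: an existing file at this path would be replaced,
        // which is why the hook pairs this with a unique per-file counter suffix.
        return SequenceFile.createWriter(conf,
            Writer.file(file),
            Writer.keyClass(NullWritable.class),
            Writer.valueClass(BytesWritable.class),
            Writer.compression(CompressionType.RECORD));
      }

      public static void writeOne(Writer writer, byte[] payload) throws IOException {
        writer.append(NullWritable.get(), new BytesWritable(payload));
        writer.hflush(); // durable for readers without closing the file
      }
    }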