Repository: hive Updated Branches: refs/heads/branch-3 204a0e211 -> 36c33ca06
HIVE-20011: Move away from append mode in proto logging hook (Harish JP, reviewed by Anishek Agarwal) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/29315fcb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/29315fcb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/29315fcb Branch: refs/heads/branch-3 Commit: 29315fcbb53bf10af16f75ec3d36965c061eedd6 Parents: 204a0e2 Author: Anishek Agarwal <anis...@gmail.com> Authored: Fri Jun 29 15:05:17 2018 +0530 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Tue Sep 18 13:19:49 2018 -0700 ---------------------------------------------------------------------- .../hive/ql/hooks/HiveProtoLoggingHook.java | 24 +++++++++++++++++--- .../logging/proto/DatePartitionedLogger.java | 18 +++++++++++---- .../logging/proto/ProtoMessageReader.java | 9 +++++--- .../logging/proto/ProtoMessageWriter.java | 12 +++++----- 4 files changed, 46 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 1e7070b..49cba4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -86,6 +86,7 @@ import static org.apache.hadoop.hive.ql.plan.HiveOperation.UNLOCKTABLE; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -101,6 +102,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; 
import org.apache.commons.lang3.StringUtils; +import org.apache.commons.compress.utils.IOUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -180,6 +182,9 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { private final DatePartitionedLogger<HiveHookEventProto> logger; private final ExecutorService eventHandler; private final ExecutorService logWriter; + private int logFileCount = 0; + private ProtoMessageWriter<HiveHookEventProto> writer; + private LocalDate writerDate; EventLogger(HiveConf conf, Clock clock) { this.clock = clock; @@ -234,6 +239,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { LOG.warn("Got interrupted exception while waiting for events to be flushed", e); } } + IOUtils.closeQuietly(writer); } void handle(HookContext hookContext) { @@ -285,12 +291,24 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { private static final int MAX_RETRIES = 2; private void writeEvent(HiveHookEventProto event) { for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) { - try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) { + try { + if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) { + if (writer != null) { + // Day change over case, reset the logFileCount. + logFileCount = 0; + IOUtils.closeQuietly(writer); + } + // increment log file count, if creating a new writer. + writer = logger.getWriter(logFileName + "_" + ++logFileCount); + writerDate = logger.getDateFromDir(writer.getPath().getParent().getName()); + } writer.writeProto(event); - // This does not work hence, opening and closing file for every event. - // writer.hflush(); + writer.hflush(); return; } catch (IOException e) { + // Something wrong with writer, lets close and reopen. 
+ IOUtils.closeQuietly(writer); + writer = null; if (retryCount < MAX_RETRIES) { LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," + " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount, http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java index d6a5121..58cec7e 100644 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java @@ -45,11 +45,14 @@ import com.google.protobuf.Parser; * @param <T> The proto message type. */ public class DatePartitionedLogger<T extends MessageLite> { - private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class); // Everyone has permission to write, but with sticky set so that delete is restricted. // This is required, since the path is same for all users and everyone writes into it. private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777); + // Since the directories have broad permissions restrict the file read access. 
+ private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066); + private final Parser<T> parser; private final Path basePath; private final Configuration conf; @@ -57,11 +60,12 @@ public class DatePartitionedLogger<T extends MessageLite> { public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock) throws IOException { - this.conf = conf; + this.conf = new Configuration(conf); this.clock = clock; this.parser = parser; createDirIfNotExists(baseDir); this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir); + FsPermission.setUMask(this.conf, FILE_UMASK); } private void createDirIfNotExists(Path path) throws IOException { @@ -101,6 +105,10 @@ public class DatePartitionedLogger<T extends MessageLite> { return new Path(path, fileName); } + public Path getPathForSubdir(String dirName, String fileName) { + return new Path(new Path(basePath, dirName), fileName); + } + /** * Extract the date from the directory name, this should be a directory created by this class. */ @@ -144,11 +152,11 @@ public class DatePartitionedLogger<T extends MessageLite> { * Returns new or changed files in the given directory. The offsets are used to find * changed files. */ - public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets) + public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets) throws IOException { Path dirPath = new Path(basePath, subDir); FileSystem fileSystem = basePath.getFileSystem(conf); - List<Path> newFiles = new ArrayList<>(); + List<FileStatus> newFiles = new ArrayList<>(); if (!fileSystem.exists(dirPath)) { return newFiles; } @@ -157,7 +165,7 @@ public class DatePartitionedLogger<T extends MessageLite> { Long offset = currentOffsets.get(fileName); // If the offset was never added or offset < fileSize. 
if (offset == null || offset < status.getLen()) { - newFiles.add(new Path(dirPath, fileName)); + newFiles.add(status); } } return newFiles; http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java index 5a3c63a..b56f066 100644 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java @@ -24,19 +24,22 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; public class ProtoMessageReader<T extends MessageLite> implements Closeable { private final Path filePath; - private final SequenceFile.Reader reader; + private final Reader reader; private final ProtoMessageWritable<T> writable; ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException { this.filePath = filePath; - this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath)); + // The writer does not flush the length during hflush. Using length options lets us read + // past length in the FileStatus but it will throw EOFException during a read instead + // of returning null. 
+ this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE)); this.writable = new ProtoMessageWritable<>(parser); } http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java index c746bb6..9c086ef 100644 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java @@ -26,24 +26,24 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile.Writer; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; public class ProtoMessageWriter<T extends MessageLite> implements Closeable { private final Path filePath; - private final SequenceFile.Writer writer; + private final Writer writer; private final ProtoMessageWritable<T> writable; ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException { this.filePath = filePath; this.writer = SequenceFile.createWriter( conf, - SequenceFile.Writer.file(filePath), - SequenceFile.Writer.keyClass(NullWritable.class), - SequenceFile.Writer.valueClass(ProtoMessageWritable.class), - SequenceFile.Writer.appendIfExists(true), - SequenceFile.Writer.compression(CompressionType.RECORD)); + Writer.file(filePath), + Writer.keyClass(NullWritable.class), + Writer.valueClass(ProtoMessageWritable.class), + Writer.compression(CompressionType.RECORD)); this.writable = new ProtoMessageWritable<>(parser); }