hive git commit: HIVE-18453: ACID: Add "CREATE TRANSACTIONAL TABLE" syntax to unify ACID ORC & Parquet support (Igor Kryvenko via Eugene Koifman)
Repository: hive Updated Branches: refs/heads/master 8ebde0441 -> 9c907769a HIVE-18453: ACID: Add "CREATE TRANSACTIONAL TABLE" syntax to unify ACID ORC & Parquet support (Igor Kryvenko via Eugene Koifman Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9c907769 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9c907769 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9c907769 Branch: refs/heads/master Commit: 9c907769a63a6b23c91fdf0b3f3d0aa6387035dc Parents: 8ebde04 Author: Igor Kryvenko Authored: Tue Sep 18 19:16:38 2018 -0700 Committer: Eugene Koifman Committed: Tue Sep 18 19:16:38 2018 -0700 -- .../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 1 + .../apache/hadoop/hive/ql/parse/HiveParser.g| 5 +- .../hadoop/hive/ql/parse/IdentifiersParser.g| 2 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 46 - .../create_external_transactional.q | 5 + .../create_transactional_full_acid.q| 28 +++ .../create_transactional_insert_only.q | 13 ++ .../create_external_transactional.q.out | 1 + .../create_transactional_full_acid.q.out| 197 +++ .../create_transactional_insert_only.q.out | 75 +++ 10 files changed, 362 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/9c907769/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g -- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index 3caa51f..8bf9cc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -304,6 +304,7 @@ KW_PRINCIPALS: 'PRINCIPALS'; KW_COMPACT: 'COMPACT'; KW_COMPACTIONS: 'COMPACTIONS'; KW_TRANSACTIONS: 'TRANSACTIONS'; +KW_TRANSACTIONAL: 'TRANSACTIONAL'; KW_REWRITE : 'REWRITE'; KW_AUTHORIZATION: 'AUTHORIZATION'; KW_REOPTIMIZATION: 'REOPTIMIZATION'; http://git-wip-us.apache.org/repos/asf/hive/blob/9c907769/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g -- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 48f7303..78bc87c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -618,6 +618,7 @@ import org.apache.hadoop.hive.conf.HiveConf; xlateMap.put("KW_SCHEDULING_POLICY", "SCHEDULING_POLICY"); xlateMap.put("KW_PATH", "PATH"); xlateMap.put("KW_AST", "AST"); +xlateMap.put("KW_TRANSACTIONAL", "TRANSACTIONAL"); // Operators xlateMap.put("DOT", "."); @@ -1091,7 +1092,7 @@ databaseComment createTableStatement @init { pushMsg("create table statement", state); } @after { popMsg(state); } -: KW_CREATE (temp=KW_TEMPORARY)? (ext=KW_EXTERNAL)? KW_TABLE ifNotExists? name=tableName +: KW_CREATE (temp=KW_TEMPORARY)? (trans=KW_TRANSACTIONAL)? (ext=KW_EXTERNAL)? KW_TABLE ifNotExists? name=tableName ( like=KW_LIKE likeName=tableName tableRowFormat? tableFileFormat? @@ -1108,7 +1109,7 @@ createTableStatement tablePropertiesPrefixed? (KW_AS selectStatementWithCTE)? ) --> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists? +-> ^(TOK_CREATETABLE $name $temp? $trans? $ext? ifNotExists? ^(TOK_LIKETABLE $likeName?) columnNameTypeOrConstraintList? tableComment? 
http://git-wip-us.apache.org/repos/asf/hive/blob/9c907769/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g -- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index f9c97e0..fa033d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -809,7 +809,7 @@ nonReserved | KW_ROLE | KW_ROLES | KW_SCHEMA | KW_SCHEMAS | KW_SECOND | KW_SEMI | KW_SERDE | KW_SERDEPROPERTIES | KW_SERVER | KW_SETS | KW_SHARED | KW_SHOW | KW_SHOW_DATABASE | KW_SKEWED | KW_SORT | KW_SORTED | KW_SSL | KW_STATISTICS | KW_STORED | KW_AST | KW_STREAMTABLE | KW_STRING | KW_STRUCT | KW_TABLES | KW_TBLPROPERTIES | KW_TEMPORARY | KW_TERMINATED -| KW_TINYINT | KW_TOUCH | KW_TRANSACTIONS | KW_UNARCHIVE | KW_UNDO | KW_UNIONTYPE | KW_UNLOCK | KW_UNSET +| KW_TINYINT | KW_TOUCH | KW_TRANSACTIONAL | KW_TRANSACTIONS | KW_UNARCHIVE | KW_UNDO | KW_UNIONTYPE | KW_UNLOCK | KW_UNSET | KW_UNSIGNED | KW_URI
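As a quick illustration of the new grammar (not part of the patch itself), a minimal JDBC sketch that issues the new DDL; the connection URL and table name are hypothetical, and the exact table properties implied by the statement come from the SemanticAnalyzer changes above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTransactionalTableExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical HiveServer2 URL; adjust host, port and database for your cluster.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      // With this patch, TRANSACTIONAL may appear between CREATE and TABLE, so an ACID
      // table can be declared directly instead of via explicit transactional TBLPROPERTIES.
      stmt.execute("CREATE TRANSACTIONAL TABLE acid_events (id INT, msg STRING) STORED AS ORC");
      // The create_external_transactional.q test suggests that the combination
      // CREATE EXTERNAL TRANSACTIONAL TABLE is rejected, since external tables cannot be ACID.
    }
  }
}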
[1/2] hive git commit: HIVE-20011: Move away from append mode in proto logging hook (Harish JP, reviewed by Anishek Agarwal)
Repository: hive Updated Branches: refs/heads/branch-3 204a0e211 -> 36c33ca06 HIVE-20011: Move away from append mode in proto logging hook (Harish JP, reviewd by Anishek Agarwal) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/29315fcb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/29315fcb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/29315fcb Branch: refs/heads/branch-3 Commit: 29315fcbb53bf10af16f75ec3d36965c061eedd6 Parents: 204a0e2 Author: Anishek Agarwal Authored: Fri Jun 29 15:05:17 2018 +0530 Committer: Prasanth Jayachandran Committed: Tue Sep 18 13:19:49 2018 -0700 -- .../hive/ql/hooks/HiveProtoLoggingHook.java | 24 +--- .../logging/proto/DatePartitionedLogger.java| 18 +++ .../logging/proto/ProtoMessageReader.java | 9 +--- .../logging/proto/ProtoMessageWriter.java | 12 +- 4 files changed, 46 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java -- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 1e7070b..49cba4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -86,6 +86,7 @@ import static org.apache.hadoop.hive.ql.plan.HiveOperation.UNLOCKTABLE; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -101,6 +102,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.compress.utils.IOUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -180,6 +182,9 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { private final DatePartitionedLogger logger; private final ExecutorService eventHandler; private final ExecutorService logWriter; +private int logFileCount = 0; +private ProtoMessageWriter writer; +private LocalDate writerDate; EventLogger(HiveConf conf, Clock clock) { this.clock = clock; @@ -234,6 +239,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { LOG.warn("Got interrupted exception while waiting for events to be flushed", e); } } + IOUtils.closeQuietly(writer); } void handle(HookContext hookContext) { @@ -285,12 +291,24 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { private static final int MAX_RETRIES = 2; private void writeEvent(HiveHookEventProto event) { for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) { -try (ProtoMessageWriter writer = logger.getWriter(logFileName)) { +try { + if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) { +if (writer != null) { + // Day change over case, reset the logFileCount. + logFileCount = 0; + IOUtils.closeQuietly(writer); +} +// increment log file count, if creating a new writer. +writer = logger.getWriter(logFileName + "_" + ++logFileCount); +writerDate = logger.getDateFromDir(writer.getPath().getParent().getName()); + } writer.writeProto(event); - // This does not work hence, opening and closing file for every event. 
- // writer.hflush(); + writer.hflush(); return; } catch (IOException e) { + // Something wrong with writer, lets close and reopen. + IOUtils.closeQuietly(writer); + writer = null; if (retryCount < MAX_RETRIES) { LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," + " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount, http://git-wip-us.apache.org/repos/asf/hive/blob/29315fcb/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java -- diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java index d6a5121..58cec7e 100644 ---
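The heart of the change above is that the hook now keeps one proto writer open per day, calls hflush() after each event, resets its file counter on day rollover, and discards the writer on I/O errors before retrying. A condensed, self-contained sketch of that pattern follows; EventWriter and DailyWriterFactory are hypothetical stand-ins for Hive's ProtoMessageWriter and DatePartitionedLogger, so details such as deriving the writer date from the output directory are simplified:

import java.io.IOException;
import java.time.LocalDate;

// Hypothetical stand-ins for ProtoMessageWriter / DatePartitionedLogger, for illustration only.
interface EventWriter extends AutoCloseable {
  void write(byte[] serializedEvent) throws IOException;
  void hflush() throws IOException;
  @Override void close() throws IOException;
}

interface DailyWriterFactory {
  EventWriter open(String fileName) throws IOException;
  LocalDate today();
}

class RollingEventLogger {
  private static final int MAX_RETRIES = 2;
  private final DailyWriterFactory factory;
  private final String baseFileName;
  private EventWriter writer;
  private LocalDate writerDate;
  private int logFileCount;

  RollingEventLogger(DailyWriterFactory factory, String baseFileName) {
    this.factory = factory;
    this.baseFileName = baseFileName;
  }

  void writeEvent(byte[] event) {
    for (int retry = 0; retry <= MAX_RETRIES; ++retry) {
      try {
        if (writer == null || !factory.today().equals(writerDate)) {
          if (writer != null) {
            // Day rolled over: close yesterday's file and restart the per-day counter.
            logFileCount = 0;
            closeQuietly();
          }
          writer = factory.open(baseFileName + "_" + ++logFileCount);
          writerDate = factory.today();
        }
        writer.write(event);
        writer.hflush();   // flush in place instead of reopening the file for every event
        return;
      } catch (IOException e) {
        closeQuietly();    // the writer is suspect; drop it and retry with a fresh one
      }
    }
  }

  private void closeQuietly() {
    if (writer != null) {
      try {
        writer.close();
      } catch (IOException ignored) {
        // best effort, mirroring IOUtils.closeQuietly
      }
      writer = null;
    }
  }
}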
[2/2] hive git commit: HIVE-20582: Make hflush in hive proto logging configurable (Prasanth Jayachandran reviewed by Thejas M Nair)
HIVE-20582: Make hflush in hive proto logging configurable (Prasanth Jayachandran reviewed by Thejas M Nair) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/36c33ca0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/36c33ca0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/36c33ca0 Branch: refs/heads/branch-3 Commit: 36c33ca066c99dfdb21223a711c0c3f33c85b943 Parents: 29315fc Author: Prasanth Jayachandran Authored: Tue Sep 18 13:10:07 2018 -0700 Committer: Prasanth Jayachandran Committed: Tue Sep 18 13:20:02 2018 -0700 -- .../java/org/apache/hadoop/hive/conf/HiveConf.java| 3 +++ .../hadoop/hive/ql/hooks/HiveProtoLoggingHook.java| 14 +- 2 files changed, 16 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/36c33ca0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java -- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9523640..4ec6368 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -632,6 +632,9 @@ public class HiveConf extends Configuration { HIVE_PROTO_EVENTS_TTL("hive.hook.proto.events.ttl", "7d", new TimeValidator(TimeUnit.DAYS), "Time-To-Live (TTL) of proto event files before cleanup."), +HIVE_PROTO_FILE_PER_EVENT("hive.hook.proto.file.per.event", false, + "Whether each proto event has to be written to separate file. " + +"(Use this for FS that does not hflush immediately like S3A)"), // Hadoop Configuration Properties // Properties with null values are ignored and exist only for the purpose of giving us http://git-wip-us.apache.org/repos/asf/hive/blob/36c33ca0/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java -- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 49cba4c..aa3a926 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -185,6 +185,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { private int logFileCount = 0; private ProtoMessageWriter writer; private LocalDate writerDate; +private boolean eventPerFile; EventLogger(HiveConf conf, Clock clock) { this.clock = clock; @@ -196,6 +197,8 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { LOG.error(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname + " is not set, logging disabled."); } + eventPerFile = conf.getBoolVar(ConfVars.HIVE_PROTO_FILE_PER_EVENT); + LOG.info("Event per file enabled: {}", eventPerFile); DatePartitionedLogger tmpLogger = null; try { if (baseDir != null) { @@ -303,7 +306,16 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { writerDate = logger.getDateFromDir(writer.getPath().getParent().getName()); } writer.writeProto(event); - writer.hflush(); + if (eventPerFile) { +if (writer != null) { + LOG.debug("Event per file enabled. Closing proto event file: {}", writer.getPath()); + IOUtils.closeQuietly(writer); +} +// rollover to next file +writer = logger.getWriter(logFileName + "_" + ++logFileCount); + } else { +writer.hflush(); + } return; } catch (IOException e) { // Something wrong with writer, lets close and reopen.
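The new flag added above defaults to false. A small sketch of turning it on programmatically; the same setting can go into hive-site.xml or be passed with --hiveconf as hive.hook.proto.file.per.event=true:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class ProtoHookConfigExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Default (false): the hook keeps one file per day and hflush()es after every event.
    // On filesystems whose hflush() is not immediately visible to readers (e.g. S3A),
    // enable one-file-per-event so each event becomes readable once its file is closed.
    conf.setBoolVar(ConfVars.HIVE_PROTO_FILE_PER_EVENT, true);
    System.out.println("Event per file enabled: "
        + conf.getBoolVar(ConfVars.HIVE_PROTO_FILE_PER_EVENT));
  }
}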
hive git commit: HIVE-20582: Make hflush in hive proto logging configurable (Prasanth Jayachandran reviewed by Thejas M Nair)
Repository: hive Updated Branches: refs/heads/master 7450ce762 -> 8ebde0441 HIVE-20582: Make hflush in hive proto logging configurable (Prasanth Jayachandran reviewed by Thejas M Nair) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8ebde044 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8ebde044 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8ebde044 Branch: refs/heads/master Commit: 8ebde04411a4ca34994019db75b57ebee9c28f71 Parents: 7450ce7 Author: Prasanth Jayachandran Authored: Tue Sep 18 13:10:07 2018 -0700 Committer: Prasanth Jayachandran Committed: Tue Sep 18 13:10:14 2018 -0700 -- .../java/org/apache/hadoop/hive/conf/HiveConf.java| 3 +++ .../hadoop/hive/ql/hooks/HiveProtoLoggingHook.java| 14 +- 2 files changed, 16 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/8ebde044/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java -- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4fb8a30..8a561e5 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -642,6 +642,9 @@ public class HiveConf extends Configuration { HIVE_PROTO_EVENTS_TTL("hive.hook.proto.events.ttl", "7d", new TimeValidator(TimeUnit.DAYS), "Time-To-Live (TTL) of proto event files before cleanup."), +HIVE_PROTO_FILE_PER_EVENT("hive.hook.proto.file.per.event", false, + "Whether each proto event has to be written to separate file. " + +"(Use this for FS that does not hflush immediately like S3A)"), // Hadoop Configuration Properties // Properties with null values are ignored and exist only for the purpose of giving us http://git-wip-us.apache.org/repos/asf/hive/blob/8ebde044/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java -- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 673c858..0af30d4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -186,6 +186,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { private int logFileCount = 0; private ProtoMessageWriter writer; private LocalDate writerDate; +private boolean eventPerFile; EventLogger(HiveConf conf, Clock clock) { this.clock = clock; @@ -197,6 +198,8 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { LOG.error(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname + " is not set, logging disabled."); } + eventPerFile = conf.getBoolVar(ConfVars.HIVE_PROTO_FILE_PER_EVENT); + LOG.info("Event per file enabled: {}", eventPerFile); DatePartitionedLogger tmpLogger = null; try { if (baseDir != null) { @@ -289,7 +292,16 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { writerDate = logger.getDateFromDir(writer.getPath().getParent().getName()); } writer.writeProto(event); - writer.hflush(); + if (eventPerFile) { +if (writer != null) { + LOG.debug("Event per file enabled. Closing proto event file: {}", writer.getPath()); + IOUtils.closeQuietly(writer); +} +// rollover to next file +writer = logger.getWriter(logFileName + "_" + ++logFileCount); + } else { +writer.hflush(); + } return; } catch (IOException e) { // Something wrong with writer, lets close and reopen.
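For the write path itself, the branch added above either hflush()es the shared daily file or closes the current file and rolls to the next one after every event. A rough, self-contained sketch of that choice using plain Hadoop streams; this is not the hook's actual ProtoMessageWriter plumbing, and the path layout and lazy reopening are simplifications:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative only: the flush-vs-rollover decision made after each proto event is written. */
class PerEventFileSketch {
  private final FileSystem fs;
  private final Path dir;
  private final boolean eventPerFile;   // mirrors hive.hook.proto.file.per.event
  private FSDataOutputStream out;
  private int fileCount;

  PerEventFileSketch(FileSystem fs, Path dir, boolean eventPerFile) {
    this.fs = fs;
    this.dir = dir;
    this.eventPerFile = eventPerFile;
  }

  void writeEvent(byte[] event) throws IOException {
    if (out == null) {
      out = fs.create(new Path(dir, "events_" + ++fileCount));
    }
    out.write(event);
    if (eventPerFile) {
      // One event per file: close immediately so the event is visible even where
      // hflush() does not publish data to readers (such as S3A).
      out.close();
      out = null;
    } else {
      // Default: keep the daily file open and make the event durable with hflush().
      out.hflush();
    }
  }
}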
hive git commit: HIVE-20583: Use canonical hostname only for kerberos auth in HiveConnection (Prasanth Jayachandran reviewed by Gopal V)
Repository: hive Updated Branches: refs/heads/master beaa8a8c3 -> 7450ce762 HIVE-20583: Use canonical hostname only for kerberos auth in HiveConnection (Prasanth Jayachandran reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7450ce76 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7450ce76 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7450ce76 Branch: refs/heads/master Commit: 7450ce7624da4fc92d98691db5346aed15277a6b Parents: beaa8a8 Author: Prasanth Jayachandran Authored: Tue Sep 18 09:58:06 2018 -0700 Committer: Prasanth Jayachandran Committed: Tue Sep 18 11:48:09 2018 -0700 -- .../src/java/org/apache/hive/jdbc/HiveConnection.java | 14 +++--- 1 file changed, 11 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/7450ce76/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java -- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 70cc34d..9f457a2 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -171,9 +171,13 @@ public class HiveConnection implements java.sql.Connection { // sess_var_list -> sessConfMap // hive_conf_list -> hiveConfMap // hive_var_list -> hiveVarMap -host = Utils.getCanonicalHostName(connParams.getHost()); -port = connParams.getPort(); sessConfMap = connParams.getSessionVars(); +if (isKerberosAuthMode()) { + host = Utils.getCanonicalHostName(connParams.getHost()); +} else { + host = connParams.getHost(); +} +port = connParams.getPort(); isEmbeddedMode = connParams.isEmbeddedMode(); if (sessConfMap.containsKey(JdbcConnectionParams.FETCH_SIZE)) { @@ -241,7 +245,11 @@ public class HiveConnection implements java.sql.Connection { } // Update with new values jdbcUriString = connParams.getJdbcUriString(); -host = Utils.getCanonicalHostName(connParams.getHost()); +if (isKerberosAuthMode()) { + host = Utils.getCanonicalHostName(connParams.getHost()); +} else { + host = connParams.getHost(); +} port = connParams.getPort(); } else { errMsg = warnMsg;
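The reasoning behind the change, presumably: Kerberos authentication builds the HiveServer2 service principal from the host name, so that path still wants the canonical FQDN, while every other auth mode now uses the host exactly as written in the JDBC URL. A self-contained sketch of that rule follows; the real Utils.getCanonicalHostName may differ in detail, so the InetAddress lookup here is an assumption:

import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostResolutionSketch {

  // kerberosAuth would be derived from the JDBC URL (e.g. a principal= session variable).
  static String resolveHost(String hostFromUrl, boolean kerberosAuth) {
    if (!kerberosAuth) {
      // Non-Kerberos connections keep the host exactly as the user wrote it.
      return hostFromUrl;
    }
    try {
      // Approximation of a canonical-hostname helper; Hive's Utils.getCanonicalHostName
      // may behave differently (this lookup is an assumption, not the real implementation).
      return InetAddress.getByName(hostFromUrl).getCanonicalHostName();
    } catch (UnknownHostException e) {
      return hostFromUrl;   // fall back to the literal host if resolution fails
    }
  }

  public static void main(String[] args) {
    System.out.println(resolveHost("hs2.example.com", true));    // canonical FQDN
    System.out.println(resolveHost("hs2.example.com", false));   // unchanged
  }
}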
hive git commit: HIVE-20583: Use canonical hostname only for kerberos auth in HiveConnection (Prasanth Jayachandran reviewed by Gopal V)
Repository: hive Updated Branches: refs/heads/branch-3 f8f0ca50a -> 204a0e211 HIVE-20583: Use canonical hostname only for kerberos auth in HiveConnection (Prasanth Jayachandran reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/204a0e21 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/204a0e21 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/204a0e21 Branch: refs/heads/branch-3 Commit: 204a0e211d6a60e7151b4b937f774bb86e327df9 Parents: f8f0ca5 Author: Prasanth Jayachandran Authored: Tue Sep 18 09:58:06 2018 -0700 Committer: Prasanth Jayachandran Committed: Tue Sep 18 11:48:30 2018 -0700 -- .../src/java/org/apache/hive/jdbc/HiveConnection.java | 14 +++--- 1 file changed, 11 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/204a0e21/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java -- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 458158e..14939cb 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -171,9 +171,13 @@ public class HiveConnection implements java.sql.Connection { // sess_var_list -> sessConfMap // hive_conf_list -> hiveConfMap // hive_var_list -> hiveVarMap -host = Utils.getCanonicalHostName(connParams.getHost()); -port = connParams.getPort(); sessConfMap = connParams.getSessionVars(); +if (isKerberosAuthMode()) { + host = Utils.getCanonicalHostName(connParams.getHost()); +} else { + host = connParams.getHost(); +} +port = connParams.getPort(); isEmbeddedMode = connParams.isEmbeddedMode(); if (sessConfMap.containsKey(JdbcConnectionParams.FETCH_SIZE)) { @@ -241,7 +245,11 @@ public class HiveConnection implements java.sql.Connection { } // Update with new values jdbcUriString = connParams.getJdbcUriString(); -host = Utils.getCanonicalHostName(connParams.getHost()); +if (isKerberosAuthMode()) { + host = Utils.getCanonicalHostName(connParams.getHost()); +} else { + host = connParams.getHost(); +} port = connParams.getPort(); } else { errMsg = warnMsg;
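From the client side, the distinction is driven by how the connection is opened. A hypothetical usage sketch (host, realm and principal are placeholders, and a valid Kerberos ticket from kinit is assumed): only the URL carrying a principal triggers host canonicalization after this patch.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class KerberosJdbcExample {
  public static void main(String[] args) throws Exception {
    // Kerberos: the driver resolves the canonical host name to build the service principal.
    String kerberosUrl =
        "jdbc:hive2://hs2.example.com:10000/default;principal=hive/_HOST@EXAMPLE.COM";
    // Non-Kerberos (e.g. LDAP or no auth): the host is now used exactly as written.
    // String plainUrl = "jdbc:hive2://hs2.example.com:10000/default";

    try (Connection conn = DriverManager.getConnection(kerberosUrl);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT 1")) {
      while (rs.next()) {
        System.out.println(rs.getInt(1));
      }
    }
  }
}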
hive git commit: HIVE-20561: Use the position of the Kafka Consumer to track progress instead of Consumer Records offsets (Slim Bouguerra, reviewed by Vineet Garg)
Repository: hive Updated Branches: refs/heads/master 366eaceff -> beaa8a8c3 HIVE-20561: Use the position of the Kafka Consumer to track progress instead of Consumer Records offsets (Slim Bouguerra, reviewed by Vineet Garg) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/beaa8a8c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/beaa8a8c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/beaa8a8c Branch: refs/heads/master Commit: beaa8a8c3e041f026d1cf5b85d120a5709db73b0 Parents: 366eace Author: Slim Bouguerra Authored: Tue Sep 18 10:54:52 2018 -0700 Committer: Vineet Garg Committed: Tue Sep 18 10:56:09 2018 -0700 -- .../hive/kafka/KafkaPullerRecordReader.java | 15 +- .../hadoop/hive/kafka/KafkaRecordIterator.java | 195 +-- .../hive/kafka/KafkaRecordIteratorTest.java | 20 +- 3 files changed, 121 insertions(+), 109 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/beaa8a8c/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java -- diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java index 4f0ee94..06a10b4 100644 --- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java +++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java @@ -89,7 +89,7 @@ import java.util.Properties; LOG.debug("Consumer poll timeout [{}] ms", pollTimeout); this.recordsCursor = startOffset == endOffset ? - new KafkaRecordIterator.EmptyIterator() : + new EmptyIterator() : new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout); started = true; } @@ -157,4 +157,17 @@ import java.util.Properties; consumer.close(); } } + + /** + * Empty iterator for empty splits when startOffset == endOffset, this is added to avoid clumsy if condition. + */ + private static final class EmptyIterator implements Iterator> { +@Override public boolean hasNext() { + return false; +} + +@Override public ConsumerRecord next() { + throw new IllegalStateException("this is an empty iterator"); +} + } } http://git-wip-us.apache.org/repos/asf/hive/blob/beaa8a8c/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java -- diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java index 7daa3e2..c252455 100644 --- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java +++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java @@ -18,14 +18,13 @@ package org.apache.hadoop.hive.kafka; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,108 +35,120 @@ import java.util.List; import java.util.concurrent.TimeUnit; /** - * Iterator over Kafka Records to read records from a single topic partition inclusive start exclusive end. 
- * - * If {@code startOffset} is not null will seek up to that offset - * Else If {@code startOffset} is null will seek to beginning see - * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)} - * - * When provided with an end offset it will return records up to the record with offset == endOffset - 1, - * Else If end offsets is null it will read up to the current end see - * {@link org.apache.kafka.clients.consumer.Consumer#endOffsets(java.util.Collection)} - * - * Current implementation of this Iterator will throw and exception if can not poll up to the endOffset - 1 + * Iterator over Kafka Records to read records from a single topic partition inclusive start, exclusive end. */ public class KafkaRecordIterator implements Iterator> { private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class); + private static final String + POLL_TIMEOUT_HINT = + String.format("Try increasing poll timeout using Hive Table property [%s]", +
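The idea of the patch, roughly: rather than remembering the offset of the last ConsumerRecord returned, the iterator asks the consumer itself where it stands via position() and keeps polling until that position reaches the split's end offset. A stand-alone sketch against the plain kafka-clients 2.x API; broker address, topic and offsets are hypothetical, and the real iterator also carries the poll-timeout hint shown above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PositionBasedScan {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    TopicPartition tp = new TopicPartition("events", 0); // hypothetical topic partition
    long startOffset = 0L;
    long endOffset = 1_000L;                             // exclusive end of the split

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, startOffset);
      // Progress is measured by consumer.position(tp) -- the offset of the next record the
      // consumer will fetch -- rather than by the offset of the last record handed back.
      while (consumer.position(tp) < endOffset) {
        ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(5_000));
        if (batch.isEmpty()) {
          // The real iterator gives up with an error (and the poll-timeout hint) at this point.
          throw new IllegalStateException("Could not poll up to offset " + endOffset);
        }
        for (ConsumerRecord<byte[], byte[]> record : batch.records(tp)) {
          if (record.offset() >= endOffset) {
            break;   // stay strictly inside the split
          }
          // process(record) ...
        }
      }
    }
  }
}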