HIVE-18054: Make Lineage work with concurrent queries on a Session (Andrew Sherman, reviewed by Sahil Takiar)
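The gist of the change: lineage bookkeeping moves off the shared SessionState and into the per-query QueryState, so concurrent queries on one HiveServer2 session no longer clobber each other's LineageState, and a parent query that compiles or runs an inner query (index rebuild, EXPLAIN, partition-condition check) now hands its LineageState to the child Driver explicitly. A minimal sketch of the resulting pattern (it assumes only the Driver constructor, QueryState.Builder.withLineageState(), and QueryState.getLineageState() added by this patch; the helper class itself is illustrative, not part of the commit):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.session.LineageState;

// Illustrative helper (not part of the patch): shows how lineage travels
// with the query rather than with the session after this change.
public class LineagePassingSketch {

  // Build a QueryState carrying a caller-supplied LineageState; this
  // mirrors the private Driver.getNewQueryState(conf, lineageState)
  // added in the Driver.java hunk below.
  static QueryState newQueryState(HiveConf conf, LineageState lineageState) {
    return new QueryState.Builder()
        .withGenerateNewQueryId(true)
        .withHiveConf(conf)
        .withLineageState(lineageState)
        .build();
  }

  // A parent query spawning an inner Driver passes its own per-query
  // LineageState along, instead of both drivers mutating the single
  // SessionState.get().getLineageState() object.
  static Driver newChildDriver(HiveConf conf, QueryState parentQueryState) {
    return new Driver(conf, parentQueryState.getLineageState());
  }
}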
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/646ccce8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/646ccce8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/646ccce8 Branch: refs/heads/standalone-metastore Commit: 646ccce8ea3e8c944be164f86dbd5d3428bdbc44 Parents: f52e8b4 Author: Andrew Sherman <asher...@cloudera.com> Authored: Sat Dec 16 15:14:54 2017 -0600 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Sat Dec 16 15:24:11 2017 -0600 ---------------------------------------------------------------------- .../java/org/apache/hive/jdbc/ReadableHook.java | 52 +++++++++ .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 114 +++++++++++++++++++ .../java/org/apache/hadoop/hive/ql/Driver.java | 46 ++++++-- .../org/apache/hadoop/hive/ql/QueryState.java | 35 +++++- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 6 +- .../apache/hadoop/hive/ql/exec/MoveTask.java | 9 +- .../org/apache/hadoop/hive/ql/exec/Task.java | 4 + .../bootstrap/load/table/LoadPartitions.java | 3 +- .../repl/bootstrap/load/table/LoadTable.java | 3 +- .../hadoop/hive/ql/hooks/HookContext.java | 9 +- .../hadoop/hive/ql/hooks/LineageLogger.java | 56 +++++++-- .../hive/ql/index/AggregateIndexHandler.java | 7 +- .../hadoop/hive/ql/index/HiveIndexHandler.java | 6 +- .../hive/ql/index/TableBasedIndexHandler.java | 18 ++- .../ql/index/bitmap/BitmapIndexHandler.java | 8 +- .../ql/index/compact/CompactIndexHandler.java | 8 +- .../hive/ql/optimizer/GenMRFileSink1.java | 2 +- .../hive/ql/optimizer/GenMapRedUtils.java | 27 +++-- .../hadoop/hive/ql/optimizer/IndexUtils.java | 6 +- .../hive/ql/optimizer/lineage/Generator.java | 8 +- .../hive/ql/parse/DDLSemanticAnalyzer.java | 15 ++- .../hive/ql/parse/ExplainSemanticAnalyzer.java | 2 +- .../hadoop/hive/ql/parse/GenTezUtils.java | 3 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 5 +- .../hadoop/hive/ql/parse/IndexUpdater.java | 9 +- .../hive/ql/parse/LoadSemanticAnalyzer.java | 2 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 4 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 8 +- .../hadoop/hive/ql/parse/TaskCompiler.java | 11 +- .../hive/ql/parse/spark/GenSparkUtils.java | 2 +- .../apache/hadoop/hive/ql/plan/MoveWork.java | 25 +--- .../hadoop/hive/ql/session/LineageState.java | 2 +- .../hadoop/hive/ql/session/SessionState.java | 15 --- ...TestGenMapRedUtilsCreateConditionalTask.java | 18 ++- 34 files changed, 403 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java new file mode 100644 index 0000000..2dd283f --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.jdbc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext; + +/** + * An ExecuteWithHookContext that stores HookContexts in memory and makes them available for reading. + */ +public class ReadableHook implements ExecuteWithHookContext { + + private static List<HookContext> hookList = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void run(HookContext hookContext) throws Exception { + hookList.add(hookContext); + } + + /** + * @return the stored HookContexts. + */ + public static List<HookContext> getHookList() { + return hookList; + } + + /** + * Clear the stored HookContexts. + */ + public static void clear() { + hookList.clear(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 70bd29c..ffeee69 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -40,6 +40,7 @@ import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -47,6 +48,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; @@ -64,8 +66,12 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.LineageLogger; +import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx; import org.apache.hive.common.util.ReflectionUtil; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.service.cli.HiveSQLException; @@ -205,6 +211,9 @@ public class TestJdbcWithMiniHS2 { conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false); conf.setBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER, false); + // store post-exec hook calls so we can look at them later + conf.setVar(ConfVars.POSTEXECHOOKS, ReadableHook.class.getName() + "," + + LineageLogger.class.getName());
MiniHS2.Builder builder = new MiniHS2.Builder().withConf(conf).cleanupLocalDirOnStartup(false); if (httpMode) { builder = builder.withHTTPTransport(); @@ -1503,4 +1512,109 @@ public class TestJdbcWithMiniHS2 { stmt.close(); fsConn.close(); } + + /** + * A test that checks that Lineage is correct when multiple concurrent + * requests are made on a connection + */ + @Test + public void testConcurrentLineage() throws Exception { + // setup to run concurrent operations + Statement stmt = conTestDb.createStatement(); + setSerializeInTasksInConf(stmt); + stmt.execute("drop table if exists testConcurrentLineage1"); + stmt.execute("drop table if exists testConcurrentLineage2"); + stmt.execute("create table testConcurrentLineage1 (col1 int)"); + stmt.execute("create table testConcurrentLineage2 (col2 int)"); + + // clear vertices list + ReadableHook.clear(); + + // run 5 sql inserts concurrently + int numThreads = 5; // set to 1 for single threading + int concurrentCalls = 5; + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + try { + List<InsertCallable> tasks = new ArrayList<>(); + for (int i = 0; i < concurrentCalls; i++) { + InsertCallable runner = new InsertCallable(conTestDb); + tasks.add(runner); + } + List<Future<Void>> futures = pool.invokeAll(tasks); + for (Future<Void> future : futures) { + future.get(20, TimeUnit.SECONDS); + } + // check to see that the vertices are correct + checkVertices(); + } finally { + // clean up + stmt.execute("drop table testConcurrentLineage1"); + stmt.execute("drop table testConcurrentLineage2"); + stmt.close(); + pool.shutdownNow(); + } + } + + /** + * A Callable that does 2 inserts + */ + private class InsertCallable implements Callable<Void> { + private Connection connection; + + InsertCallable(Connection conn) { + this.connection = conn; + } + + @Override public Void call() throws Exception { + doLineageInserts(connection); + return null; + } + + private void doLineageInserts(Connection connection) throws SQLException { + Statement stmt = connection.createStatement(); + stmt.execute("insert into testConcurrentLineage1 values (1)"); + stmt.execute("insert into testConcurrentLineage2 values (2)"); + } + } + /** + * check to see that the vertices derived from the HookContexts are correct + */ + private void checkVertices() { + List<Set<LineageLogger.Vertex>> verticesLists = getVerticesFromHooks(); + + assertEquals("5 runs of 2 inserts makes 10", 10, verticesLists.size()); + for (Set<LineageLogger.Vertex> vertices : verticesLists) { + assertFalse("Each insert affects a column so should be some vertices", + vertices.isEmpty()); + assertEquals("Each insert affects one column so should be one vertex", + 1, vertices.size()); + Iterator<LineageLogger.Vertex> iterator = vertices.iterator(); + assertTrue(iterator.hasNext()); + LineageLogger.Vertex vertex = iterator.next(); + assertEquals(0, vertex.getId()); + assertEquals(LineageLogger.Vertex.Type.COLUMN, vertex.getType()); + String label = vertex.getLabel(); + System.out.println("vertex.getLabel() = " + label); + assertTrue("did not see one of the 2 expected column names", + label.equals("testjdbcminihs2.testconcurrentlineage1.col1") || + label.equals("testjdbcminihs2.testconcurrentlineage2.col2")); + } + } + + /** + * Use the logic in LineageLogger to get vertices from Hook Contexts + */ + private List<Set<LineageLogger.Vertex>> getVerticesFromHooks() { + List<Set<LineageLogger.Vertex>> verticesLists = new ArrayList<>(); + List<HookContext> hookList = ReadableHook.getHookList(); + for
(HookContext hookContext : hookList) { + QueryPlan plan = hookContext.getQueryPlan(); + LineageCtx.Index index = hookContext.getIndex(); + assertNotNull(index); + List<LineageLogger.Edge> edges = LineageLogger.getEdges(plan, index); + Set<LineageLogger.Vertex> vertices = LineageLogger.getVertices(edges); + verticesLists.add(vertices); + } + return verticesLists; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d3df015..b168906 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.WmContext; @@ -374,12 +375,20 @@ public class Driver implements CommandProcessor { this(getNewQueryState(conf), null); } + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), null); + } + public Driver(HiveConf conf, HiveTxnManager txnMgr) { this(getNewQueryState(conf), null, null, txnMgr); } - public Driver(HiveConf conf, Context ctx) { - this(getNewQueryState(conf), null, null); + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, Context ctx, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), null, null); this.ctx = ctx; } @@ -387,6 +396,12 @@ public class Driver implements CommandProcessor { this(getNewQueryState(conf), userName, null); } + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, String userName, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), userName, null); + } + public Driver(QueryState queryState, String userName) { this(queryState, userName, new HooksLoader(queryState.getConf()), null, null); } @@ -425,6 +440,20 @@ public class Driver implements CommandProcessor { } /** + * Generates a new QueryState object, making sure that a new queryId is generated. + * @param conf The HiveConf which should be used + * @param lineageState a LineageState to be set in the new QueryState object + * @return The new QueryState object + */ + private static QueryState getNewQueryState(HiveConf conf, LineageState lineageState) { + return new QueryState.Builder() + .withGenerateNewQueryId(true) + .withHiveConf(conf) + .withLineageState(lineageState) + .build(); + } + + /** * Compile a new query. Any currently-planned query associated with this Driver is discarded. * Do not reset id for inner queries(index, etc). Task ids are used for task identity check.
* @@ -1336,9 +1365,6 @@ public class Driver implements CommandProcessor { private void releaseResources() { releasePlan(); releaseDriverContext(); - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } } @Override @@ -2404,9 +2430,6 @@ public class Driver implements CommandProcessor { releaseFetchTask(); releaseResStream(); releaseContext(); - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } if(destroyed) { if (!hiveLocks.isEmpty()) { try { @@ -2440,9 +2463,6 @@ public class Driver implements CommandProcessor { lDrvState.stateLock.unlock(); LockedDriverState.removeLockedDriverState(); } - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } return 0; } @@ -2504,4 +2524,8 @@ public class Driver implements CommandProcessor { releaseResources(); this.queryState = getNewQueryState(queryState.getConf()); } + + public QueryState getQueryState() { + return queryState; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index f3a46db..4f0c165 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.session.LineageState; /** * The class to store query level info such as queryId. Multiple queries can run @@ -40,12 +41,17 @@ public class QueryState { private HiveOperation commandType; /** + * Per-query Lineage state to track what happens in the query + */ + private LineageState lineageState = new LineageState(); + + /** * transaction manager used in the query. */ private HiveTxnManager txnManager; /** - * Private constructor, use QueryState.Builder instead + * Private constructor, use QueryState.Builder instead. * @param conf The query specific configuration object */ private QueryState(HiveConf conf) { @@ -79,6 +85,14 @@ public class QueryState { return queryConf; } + public LineageState getLineageState() { + return lineageState; + } + + public void setLineageState(LineageState lineageState) { + this.lineageState = lineageState; + } + public HiveTxnManager getTxnManager() { return txnManager; } @@ -95,9 +109,10 @@ public class QueryState { private boolean runAsync = false; private boolean generateNewQueryId = false; private HiveConf hiveConf = null; + private LineageState lineageState = null; /** - * Default constructor - use this builder to create a QueryState object + * Default constructor - use this builder to create a QueryState object. */ public Builder() { } @@ -149,6 +164,16 @@ public class QueryState { } /** + * add a LineageState that will be set in the built QueryState + * @param lineageState the source lineageState + * @return the builder + */ + public Builder withLineageState(LineageState lineageState) { + this.lineageState = lineageState; + return this; + } + + /** * Creates the QueryState object. 
The default values are: * - runAsync false * - confOverlay null @@ -184,7 +209,11 @@ public class QueryState { queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, QueryPlan.makeQueryId()); } - return new QueryState(queryConf); + QueryState queryState = new QueryState(queryConf); + if (lineageState != null) { + queryState.setLineageState(lineageState); + } + return queryState; } } } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 55ef8de..05041cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4478,7 +4478,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } } // Don't set inputs and outputs - the locks have already been taken so it's pointless. - MoveWork mw = new MoveWork(null, null, null, null, false, SessionState.get().getLineageState()); + MoveWork mw = new MoveWork(null, null, null, null, false); mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null)); ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId); Task<?> mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf); @@ -4909,7 +4909,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName()); if (crtTbl.isCTAS()) { DataContainer dc = new DataContainer(createdTable.getTTable()); - SessionState.get().getLineageState().setLineage( + queryState.getLineageState().setLineage( createdTable.getPath(), dc, createdTable.getCols() ); } @@ -5137,7 +5137,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { //set lineage info DataContainer dc = new DataContainer(tbl.getTTable()); - SessionState.get().getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols()); + queryState.getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols()); } return 0; } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index f5a5e71..8387208 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -398,7 +398,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { dc = handleStaticParts(db, table, tbd, ti); } } - if (work.getLineagState() != null && dc != null) { + if (dc != null) { // If we are doing an update or a delete the number of columns in the table will not // match the number of columns in the file sink. 
For update there will be one too many // (because of the ROW__ID), and in the case of the delete there will be just the @@ -416,7 +416,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { tableCols = table.getCols(); break; } - work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols); + queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols); } releaseLocks(tbd); } @@ -552,10 +552,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable { dc = new DataContainer(table.getTTable(), partn.getTPartition()); // Don't set lineage on delete as we don't have all the columns - if (work.getLineagState() != null && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && + if (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - work.getLineagState().setLineage(tbd.getSourcePath(), dc, + queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } LOG.info("Loading partition " + entry.getKey()); http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 1f0487f..d75fcf7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -649,4 +649,8 @@ public abstract class Task<T extends Serializable> implements Serializable, Node return true; } + public QueryState getQueryState() { + return queryState; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 262225f..1a542e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -245,8 +245,7 @@ public class LoadPartitions { SessionState.get().getTxnMgr().getCurrentTxnId() ); loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, - context.sessionStateLineageState); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); return TaskFactory.get(work, context.hiveConf); } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 545b7a8..f5125a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -233,8 +233,7 @@ public class LoadTable { SessionState.get().getTxnMgr().getCurrentTxnId() ); MoveWork moveWork = - new MoveWork(new HashSet<>(), new 
HashSet<>(), loadTableWork, null, false, - context.sessionStateLineageState); + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java index 7b61730..93f1da7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -88,12 +87,8 @@ public class HookContext { inputs = queryPlan.getInputs(); outputs = queryPlan.getOutputs(); ugi = Utils.getUGI(); - linfo= null; - depMap = null; - if(SessionState.get() != null){ - linfo = SessionState.get().getLineageState().getLineageInfo(); - depMap = SessionState.get().getLineageState().getIndex(); - } + linfo = queryState.getLineageState().getLineageInfo(); + depMap = queryState.getLineageState().getIndex(); this.userName = userName; this.ipAddress = ipAddress; this.hiveInstanceAddress = hiveInstanceAddress; http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java index 2f764f8..06eb9c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.hooks; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -74,7 +75,15 @@ public class LineageLogger implements ExecuteWithHookContext { private static final String FORMAT_VERSION = "1.0"; - final static class Edge { + /** + * An edge in lineage. + */ + @VisibleForTesting + public static final class Edge { + + /** + * The types of Edge. + */ public static enum Type { PROJECTION, PREDICATE } @@ -92,7 +101,15 @@ public class LineageLogger implements ExecuteWithHookContext { } } - final static class Vertex { + /** + * A vertex in lineage. + */ + @VisibleForTesting + public static final class Vertex { + + /** + * A type in lineage. 
+ */ public static enum Type { COLUMN, TABLE } @@ -125,6 +142,21 @@ public class LineageLogger implements ExecuteWithHookContext { Vertex vertex = (Vertex) obj; return label.equals(vertex.label) && type == vertex.type; } + + @VisibleForTesting + public Type getType() { + return type; + } + + @VisibleForTesting + public String getLabel() { + return label; + } + + @VisibleForTesting + public int getId() { + return id; + } } @Override @@ -203,7 +235,7 @@ public class LineageLogger implements ExecuteWithHookContext { /** * Logger an error to console if available. */ - private void log(String error) { + private static void log(String error) { LogHelper console = SessionState.getConsole(); if (console != null) { console.printError(error); @@ -214,7 +246,8 @@ public class LineageLogger implements ExecuteWithHookContext { * Based on the final select operator, find out all the target columns. * For each target column, find out its sources based on the dependency index. */ - private List<Edge> getEdges(QueryPlan plan, Index index) { + @VisibleForTesting + public static List<Edge> getEdges(QueryPlan plan, Index index) { LinkedHashMap<String, ObjectPair<SelectOperator, org.apache.hadoop.hive.ql.metadata.Table>> finalSelOps = index.getFinalSelectOps(); Map<String, Vertex> vertexCache = new LinkedHashMap<String, Vertex>(); @@ -292,7 +325,7 @@ public class LineageLogger implements ExecuteWithHookContext { return edges; } - private void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges, + private static void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges, Set<BaseColumnInfo> srcCols, Vertex target, String expr, Edge.Type type) { Set<Vertex> targets = new LinkedHashSet<Vertex>(); targets.add(target); @@ -304,7 +337,7 @@ public class LineageLogger implements ExecuteWithHookContext { * If found, add the more targets to this edge's target vertex list. * Otherwise, create a new edge and add to edge list. */ - private void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges, + private static void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges, Set<BaseColumnInfo> srcCols, Set<Vertex> targets, String expr, Edge.Type type) { Set<Vertex> sources = createSourceVertices(vertexCache, srcCols); Edge edge = findSimilarEdgeBySources(edges, sources, expr, type); @@ -319,7 +352,7 @@ public class LineageLogger implements ExecuteWithHookContext { * Convert a list of columns to a set of vertices. * Use cached vertices if possible. */ - private Set<Vertex> createSourceVertices( + private static Set<Vertex> createSourceVertices( Map<String, Vertex> vertexCache, Collection<BaseColumnInfo> baseCols) { Set<Vertex> sources = new LinkedHashSet<Vertex>(); if (baseCols != null && !baseCols.isEmpty()) { @@ -346,7 +379,7 @@ public class LineageLogger implements ExecuteWithHookContext { /** * Find a vertex from a cache, or create one if not. */ - private Vertex getOrCreateVertex( + private static Vertex getOrCreateVertex( Map<String, Vertex> vertices, String label, Vertex.Type type) { Vertex vertex = vertices.get(label); if (vertex == null) { @@ -359,7 +392,7 @@ public class LineageLogger implements ExecuteWithHookContext { /** * Find an edge that has the same type, expression, and sources. 
*/ - private Edge findSimilarEdgeBySources( + private static Edge findSimilarEdgeBySources( List<Edge> edges, Set<Vertex> sources, String expr, Edge.Type type) { for (Edge edge: edges) { if (edge.type == type && StringUtils.equals(edge.expr, expr) @@ -373,7 +406,7 @@ public class LineageLogger implements ExecuteWithHookContext { /** * Generate normalized name for a given target column. */ - private String getTargetFieldName(int fieldIndex, + private static String getTargetFieldName(int fieldIndex, String destTableName, List<String> colNames, List<FieldSchema> fieldSchemas) { String fieldName = fieldSchemas.get(fieldIndex).getName(); String[] parts = fieldName.split("\\."); @@ -394,7 +427,8 @@ public class LineageLogger implements ExecuteWithHookContext { * Get all the vertices of all edges. Targets at first, * then sources. Assign id to each vertex. */ - private Set<Vertex> getVertices(List<Edge> edges) { + @VisibleForTesting + public static Set<Vertex> getVertices(List<Edge> edges) { Set<Vertex> vertices = new LinkedHashSet<Vertex>(); for (Edge edge: edges) { vertices.addAll(edge.targets); http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java index 68709b4..bf06723 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.plan.PartitionDesc; - +import org.apache.hadoop.hive.ql.session.LineageState; /** * Index handler for indexes that have aggregate functions on indexed columns. 
@@ -90,7 +90,8 @@ public class AggregateIndexHandler extends CompactIndexHandler { Set<WriteEntity> outputs, Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) { List<FieldSchema> indexField = index.getSd().getCols(); String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); @@ -152,7 +153,7 @@ public class AggregateIndexHandler extends CompactIndexHandler { builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName); + command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName, lineageState); return rootTask; } } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java index 1e577da..b6c0252 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * HiveIndexHandler defines a pluggable interface for adding new index handlers @@ -99,6 +100,9 @@ public interface HiveIndexHandler extends Configurable { * outputs for hooks, supplemental outputs going * along with the return value * + * @param lineageState + * tracks Lineage for the query + * * @return list of tasks to be executed in parallel for building the index * * @throws HiveException if plan generation fails @@ -108,7 +112,7 @@ public interface HiveIndexHandler extends Configurable { org.apache.hadoop.hive.metastore.api.Index index, List<Partition> indexTblPartitions, List<Partition> baseTblPartitions, org.apache.hadoop.hive.ql.metadata.Table indexTbl, - Set<ReadEntity> inputs, Set<WriteEntity> outputs) + Set<ReadEntity> inputs, Set<WriteEntity> outputs, LineageState lineageState) throws HiveException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java index 29886ae..744ac29 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * Index handler for indexes that use tables to store indexes. 
@@ -51,7 +52,8 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler { org.apache.hadoop.hive.metastore.api.Index index, List<Partition> indexTblPartitions, List<Partition> baseTblPartitions, org.apache.hadoop.hive.ql.metadata.Table indexTbl, - Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws HiveException { + Set<ReadEntity> inputs, Set<WriteEntity> outputs, + LineageState lineageState) throws HiveException { try { TableDesc desc = Utilities.getTableDesc(indexTbl); @@ -66,7 +68,7 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler { Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false, new PartitionDesc(desc, null), indexTbl.getTableName(), new PartitionDesc(Utilities.getTableDesc(baseTbl), null), - baseTbl.getTableName(), indexTbl.getDbName()); + baseTbl.getTableName(), indexTbl.getDbName(), lineageState); indexBuilderTasks.add(indexBuilder); } else { @@ -89,7 +91,8 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler { // for each partition, spawn a map reduce task. Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true, new PartitionDesc(indexPart), indexTbl.getTableName(), - new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName()); + new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName(), + lineageState); indexBuilderTasks.add(indexBuilder); } } @@ -102,15 +105,18 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler { protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs, Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), - partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName); + partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName, + lineageState); } protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs, List<FieldSchema> indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java index 7b067a0..9117159 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; @@ -115,7 +116,7 @@ public class BitmapIndexHandler extends TableBasedIndexHandler { LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString()); HiveConf queryConf = new HiveConf(pctx.getConf(), BitmapIndexHandler.class); HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false); - Driver driver = new Driver(queryConf); + Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState()); driver.compile(qlCommand.toString(), false); queryContext.setIndexIntermediateFile(tmpFile); @@ -222,7 +223,8 @@ public class BitmapIndexHandler extends TableBasedIndexHandler { protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs, List<FieldSchema> indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { HiveConf builderConf = new HiveConf(getConf(), BitmapIndexHandler.class); HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEROWOFFSET, true); @@ -290,7 +292,7 @@ public class BitmapIndexHandler extends TableBasedIndexHandler { } Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, partSpec, indexTableName, dbName); + command, partSpec, indexTableName, dbName, lineageState); return rootTask; } http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 504b062..73278cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; @@ -94,7 +95,8 @@ public class CompactIndexHandler extends TableBasedIndexHandler { protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs, List<FieldSchema> indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); @@ -150,7 +152,7 @@ public class CompactIndexHandler extends TableBasedIndexHandler { builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, partSpec, 
indexTableName, dbName); + command, partSpec, indexTableName, dbName, lineageState); return rootTask; } @@ -189,7 +191,7 @@ public class CompactIndexHandler extends TableBasedIndexHandler { LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString()); HiveConf queryConf = new HiveConf(pctx.getConf(), CompactIndexHandler.class); HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false); - Driver driver = new Driver(queryConf); + Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState()); driver.compile(qlCommand.toString(), false); if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) { http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java index d7a83f7..bb42dde 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java @@ -112,7 +112,7 @@ public class GenMRFileSink1 implements NodeProcessor { LOG.info("using CombineHiveInputformat for the merge job"); GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName, ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(), - hconf, currTask); + hconf, currTask, parseCtx.getQueryState().getLineageState()); } FileSinkDesc fileSinkDesc = fsOp.getConf(); http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index bdaf105..a0b2678 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1229,6 +1229,7 @@ public final class GenMapRedUtils { * @param mvTasks * @param conf * @param currTask + * @param lineageState * @throws SemanticException * create a Map-only merge job using CombineHiveInputFormat for all partitions with @@ -1257,10 +1258,11 @@ public final class GenMapRedUtils { * directories. * */ - public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, - Path finalName, DependencyCollectionTask dependencyTask, - List<Task<MoveWork>> mvTasks, HiveConf conf, - Task<? extends Serializable> currTask) throws SemanticException { + public static void createMRWorkForMergingFiles(FileSinkOperator fsInput, + Path finalName, DependencyCollectionTask dependencyTask, + List<Task<MoveWork>> mvTasks, HiveConf conf, + Task<? extends Serializable> currTask, LineageState lineageState) + throws SemanticException { // // 1. create the operator tree @@ -1370,8 +1372,7 @@ public final class GenMapRedUtils { if (srcMmWriteId == null) { // Only create the movework for non-MM table. No action needed for a MM table. 
dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(inputDirName, finalName, true, null, null, false), false, - SessionState.get().getLineageState()); + new LoadFileDesc(inputDirName, finalName, true, null, null, false), false); } // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the // MM directory, the original MoveTask still commits based on the parent. Note that this path @@ -1382,7 +1383,7 @@ public final class GenMapRedUtils { Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput( mvTasks, fsopPath, fsInputDesc.isMmTable()); ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work, - fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask); + fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState); // keep the dynamic partition context in conditional task resolver context ConditionalResolverMergeFilesCtx mrCtx = @@ -1730,15 +1731,16 @@ public final class GenMapRedUtils { * * @param condInputPath A path that the ConditionalTask uses as input for its sub-tasks. * @param linkedMoveWork A MoveWork that the ConditionalTask uses to link to its sub-tasks. + * @param lineageState A LineageState used to track what changes. * @return A new MoveWork that has the Conditional input path as source and the linkedMoveWork as target. */ @VisibleForTesting - protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork) { + protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork, + LineageState lineageState) { MoveWork newWork = new MoveWork(linkedMoveWork); LoadFileDesc fileDesc = null; LoadTableDesc tableDesc = null; - LineageState lineageState = SessionState.get().getLineageState(); if (linkedMoveWork.getLoadFileWork() != null) { fileDesc = new LoadFileDesc(linkedMoveWork.getLoadFileWork()); fileDesc.setSourcePath(condInputPath); @@ -1776,13 +1778,15 @@ public final class GenMapRedUtils { * a MoveTask that may be linked to the conditional sub-tasks * @param dependencyTask * a dependency task that may be linked to the conditional sub-tasks + * @param lineageState + * to track activity * @return The conditional task */ @SuppressWarnings("unchecked") private static ConditionalTask createCondTask(HiveConf conf, Task<? 
extends Serializable> currTask, MoveWork mvWork, Serializable mergeWork, Path condInputPath, Path condOutputPath, Task<MoveWork> moveTaskToLink, - DependencyCollectionTask dependencyTask) { + DependencyCollectionTask dependencyTask, LineageState lineageState) { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("Creating conditional merge task for " + condInputPath); } @@ -1795,7 +1799,8 @@ public final class GenMapRedUtils { Serializable workForMoveOnlyTask = moveWork; if (shouldMergeMovePaths) { - workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork()); + workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork(), + lineageState); } // There are 3 options for this ConditionalTask: http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java index 338c185..f69c9a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.LineageState; /** * Utility class for index support. @@ -221,10 +222,11 @@ public final class IndexUtils { StringBuilder command, LinkedHashMap<String, String> partSpec, String indexTableName, - String dbName){ + String dbName, + LineageState lineageState){ // Don't try to index optimize the query to build the index HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false); - Driver driver = new Driver(builderConf, SessionState.get().getUserName()); + Driver driver = new Driver(builderConf, SessionState.get().getUserName(), lineageState); driver.compile(command.toString(), false); Task<?> rootTask = driver.getPlan().getRootTasks().get(0); http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java index e6c0771..0d72a1e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.optimizer.Transform; import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,9 +84,10 @@ public class Generator extends Transform { return pctx; } } - - Index index = SessionState.get() != null ? 
- SessionState.get().getLineageState().getIndex() : new Index(); + Index index = pctx.getQueryState().getLineageState().getIndex(); + if (index == null) { + index = new Index(); + } long sTime = System.currentTimeMillis(); // Create the lineage context http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index a09b796..971a061 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -158,6 +158,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.CreateOrAlterWMPoolDesc; import org.apache.hadoop.hive.ql.plan.CreateOrDropTriggerToPoolMappingDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde.serdeConstants; @@ -1485,8 +1486,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { partSpec == null ? new HashMap<>() : partSpec, null); ltd.setLbCtx(lbCtx); @SuppressWarnings("unchecked") - Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork( - null, null, ltd, null, false, SessionState.get().getLineageState()), conf); + Task<MoveWork> moveTsk = + TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); truncateTask.addDependentTask(moveTsk); // Recalculate the HDFS stats if auto gather stats is set @@ -1703,8 +1704,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { indexTbl, db, indexTblPartitions); } + LineageState lineageState = queryState.getLineageState(); List<Task<?>> ret = handler.generateIndexBuildTaskList(baseTbl, - index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs()); + index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs(), + lineageState); return ret; } catch (Exception e) { throw new SemanticException(e); @@ -2146,8 +2149,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap<>() : partSpec, null); ltd.setLbCtx(lbCtx); - Task<MoveWork> moveTsk = TaskFactory.get( - new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); + Task<MoveWork> moveTsk = + TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); mergeTask.addDependentTask(moveTsk); if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { @@ -3539,7 +3542,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } SessionState ss = SessionState.get(); String uName = (ss == null? 
null: ss.getUserName()); - Driver driver = new Driver(conf, uName); + Driver driver = new Driver(conf, uName, queryState.getLineageState()); int rc = driver.compile(cmd.toString(), false); if (rc != 0) { throw new SemanticException(ErrorMsg.NO_VALID_PARTN.getMsg()); http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java index 065c7e5..0eacfc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java @@ -134,7 +134,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer { runCtx = new Context(conf); // runCtx and ctx share the configuration, but not isExplainPlan() runCtx.setExplainConfig(config); - Driver driver = new Driver(conf, runCtx); + Driver driver = new Driver(conf, runCtx, queryState.getLineageState()); CommandProcessorResponse ret = driver.run(query); if(ret.getResponseCode() == 0) { // Note that we need to call getResults for simple fetch optimization. http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index b6f1139..e6d4cbe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -385,7 +385,8 @@ public class GenTezUtils { + fileSink.getConf().getDirName() + " to " + finalName); GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName, context.dependencyTask, context.moveTask, - hconf, context.currentTask); + hconf, context.currentTask, + parseContext.getQueryState().getLineageState()); } FetchTask fetchTask = parseContext.getFetchTask(); http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 83d53bc..c79df56 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -391,7 +391,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Utilities.getTableDesc(table), new TreeMap<>(), replace ? 
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 83d53bc..c79df56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -391,7 +391,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         Utilities.getTableDesc(table), new TreeMap<>(),
         replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId);
     loadTableWork.setStmtId(stmtId);
-    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
+    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork,
+        null, false);
     Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
     copyTask.addDependentTask(loadTableTask);
     x.getTasks().add(copyTask);
@@ -495,7 +496,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       loadTableWork.setStmtId(stmtId);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
-          x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), x.getConf());
+          x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
       copyTask.addDependentTask(loadPartTask);
       addPartTask.addDependentTask(loadPartTask);
       x.getTasks().add(copyTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
index f31775e..ccf1e66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;

 import java.io.Serializable;
 import java.util.LinkedList;
@@ -47,12 +48,14 @@ public class IndexUpdater {
   private Hive hive;
   private List<Task<? extends Serializable>> tasks;
   private Set<ReadEntity> inputs;
+  private LineageState lineageState;

-
-  public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf) {
+  public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf,
+      LineageState lineageState) {
     this.loadTableWork = loadTableWork;
     this.inputs = inputs;
     this.conf = new HiveConf(conf, IndexUpdater.class);
+    this.lineageState = lineageState;
     this.tasks = new LinkedList<Task<? extends Serializable>>();
   }

@@ -133,7 +136,7 @@ public class IndexUpdater {
   }

   private void compileRebuild(String query) {
-    Driver driver = new Driver(this.conf);
+    Driver driver = new Driver(this.conf, lineageState);
     driver.compile(query, false);
     tasks.addAll(driver.getPlan().getRootTasks());
     inputs.addAll(driver.getPlan().getInputs());
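[With the constructor change above, callers that auto-update indexes must
supply the lineage sink themselves; the TaskCompiler hunk later in this patch
does exactly that. A sketch, assuming loadTableWork, inputs and conf are in
scope as they are at that call site:

    IndexUpdater indexUpdater =
        new IndexUpdater(loadTableWork, inputs, conf, queryState.getLineageState());
    // Each index rebuild is compiled through Driver(conf, lineageState), so
    // the rebuild's lineage lands in the same query-scoped state.
    List<Task<? extends Serializable>> indexUpdateTasks =
        indexUpdater.generateUpdateTasks();
]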
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index cc956da..e600f7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -302,7 +302,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
     Task<? extends Serializable> childTask = TaskFactory.get(
         new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true,
-            isLocal, SessionState.get().getLineageState()), conf
+            isLocal), conf
     );
     if (rTask != null) {
       rTask.addDependentTask(childTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 498b674..80556ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -313,7 +313,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(),
             dbNameOrPattern, tblNameOrPattern,
-            SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+            queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
         rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
         return;
       }
@@ -344,7 +344,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       }

       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-          SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+          queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
       rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
 //
 //      for (FileStatus dir : dirsInLoadPath) {
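[REPL LOAD keeps its existing ReplLoadWork signature; only the source of the
LineageState changes, from the session to the query. The transaction manager
is still reached through SessionState, which underlines that only lineage is
being made query-scoped here:

    ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(),
        dbNameOrPattern, tblNameOrPattern,
        queryState.getLineageState(),                      // was session-scoped
        SessionState.get().getTxnMgr().getCurrentTxnId()); // still session-scoped
    rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
]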
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 28e3621..dcda8b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7336,8 +7336,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {

   private void handleLineage(LoadTableDesc ltd, Operator output)
       throws SemanticException {
-    if (ltd != null && SessionState.get() != null) {
-      SessionState.get().getLineageState()
+    if (ltd != null) {
+      queryState.getLineageState()
           .mapDirToOp(ltd.getSourcePath(), output);
     } else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {
@@ -7350,7 +7350,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         throw new SemanticException(e);
       }

-      SessionState.get().getLineageState()
+      queryState.getLineageState()
           .mapDirToOp(tlocation, output);
     }
   }
@@ -11685,7 +11685,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         pCtx = t.transform(pCtx);
       }
       // we just use view name as location.
-      SessionState.get().getLineageState()
+      queryState.getLineageState()
           .mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);
     }
     return;
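[Note that handleLineage above also drops the SessionState.get() null check:
the patch relies on every QueryState carrying a LineageState (see the
QueryState changes in the diffstat), so recording a lineage mapping reduces
to a single call. A sketch, with ltd and output as in the hunk above:

    // Map the write's source directory to the operator that produced it.
    queryState.getLineageState().mapDirToOp(ltd.getSourcePath(), output);
]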
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 7b29370..24559b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -105,7 +106,8 @@ public abstract class TaskCompiler {
   }

   @SuppressWarnings({"nls", "unchecked"})
-  public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
+  public void compile(final ParseContext pCtx,
+      final List<Task<? extends Serializable>> rootTasks,
       final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {

     Context ctx = pCtx.getContext();
@@ -218,12 +220,13 @@ public abstract class TaskCompiler {
     } else if (!isCStats) {
       for (LoadTableDesc ltd : loadTableWork) {
         Task<MoveWork> tsk = TaskFactory
-            .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()),
+            .get(new MoveWork(null, null, ltd, null, false),
                 conf);
         mvTask.add(tsk);
         // Check to see if we are stale'ing any indexes and auto-update them if we want
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
-          IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
+          IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf,
+              queryState.getLineageState());
           try {
             List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
                 .generateUpdateTasks();
@@ -248,7 +251,7 @@ public abstract class TaskCompiler {
           oneLoadFileForCtas = false;
         }
         mvTask.add(TaskFactory
            .get(new MoveWork(null, null, null, lfd, false),
-            .get(new MoveWork(null, null, null, lfd, false, SessionState.get().getLineageState()),
+            .get(new MoveWork(null, null, null, lfd, false),
                 conf));
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 604c8ae..c6c7bf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -385,7 +385,7 @@ public class GenSparkUtils {
       LOG.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
+          hconf, context.currentTask, parseContext.getQueryState().getLineageState());
     }

     FetchTask fetchTask = parseContext.getFetchTask();
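[The MoveWork diff that follows is the payoff of the changes above: with
lineage carried by QueryState and handed to tasks and hooks directly,
MoveWork no longer needs to smuggle a copy of the session's LineageState into
the work object, and every constructor call shrinks by one argument. Variable
names in this sketch are illustrative:

    // was: new MoveWork(inputs, outputs, ltd, lfd, false, SessionState.get().getLineageState())
    MoveWork mv = new MoveWork(inputs, outputs, loadTableWork, null, false);
]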
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 28a3374..49fe540 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import org.apache.hadoop.hive.ql.session.LineageState;

 /**
  * MoveWork.
@@ -39,13 +38,6 @@ public class MoveWork implements Serializable {
   private LoadTableDesc loadTableWork;
   private LoadFileDesc loadFileWork;
   private LoadMultiFilesDesc loadMultiFilesWork;
-  /*
-   these are sessionState objects that are copied over to work to allow for parallel execution.
-   based on the current use case the methods are selectively synchronized, which might need to be
-   taken care when using other methods.
-  */
-  private final LineageState sessionStateLineageState;
-
   private boolean checkFileFormat;
   private boolean srcLocal;

@@ -65,21 +57,18 @@ public class MoveWork implements Serializable {
   private boolean isNoop;

   public MoveWork() {
-    sessionStateLineageState = null;
   }

-  private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
-      LineageState lineageState) {
+  private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
     this.inputs = inputs;
     this.outputs = outputs;
-    sessionStateLineageState = lineageState;
   }

   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, boolean srcLocal, LineageState lineageState) {
-    this(inputs, outputs, lineageState);
+      boolean checkFileFormat, boolean srcLocal) {
+    this(inputs, outputs);
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("Creating MoveWork " + System.identityHashCode(this)
           + " with " + loadTableWork + "; " + loadFileWork);
@@ -92,8 +81,8 @@ public class MoveWork implements Serializable {

   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, LineageState lineageState) {
-    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState);
+      boolean checkFileFormat) {
+    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false);
   }

   public MoveWork(final MoveWork o) {
@@ -104,7 +93,6 @@ public class MoveWork implements Serializable {
     srcLocal = o.isSrcLocal();
     inputs = o.getInputs();
     outputs = o.getOutputs();
-    sessionStateLineageState = o.sessionStateLineageState;
   }

   @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -166,7 +154,4 @@ public class MoveWork implements Serializable {
     this.srcLocal = srcLocal;
   }

-  public LineageState getLineagState() {
-    return sessionStateLineageState;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index 056d614..82eeb35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -60,7 +60,7 @@ public class LineageState implements Serializable {
   /**
    * Constructor.
    */
-  LineageState() {
+  public LineageState() {
     dirToFop = new HashMap<>();
     linfo = new LineageInfo();
     index = new Index();
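[Making the LineageState constructor public lets callers outside the session
package, such as the tests at the end of this patch, create a fresh, isolated
instance per query:

    import org.apache.hadoop.hive.ql.session.LineageState;

    // Starts with empty dir-to-operator map, lineage info, and index.
    LineageState lineageState = new LineageState();
]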
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index bb6ddc6..d03f5e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -232,11 +232,6 @@ public class SessionState {
    */
   private Map<URI, HadoopShims.HdfsEncryptionShim> hdfsEncryptionShims = Maps.newHashMap();

-  /**
-   * Lineage state.
-   */
-  LineageState ls;
-
   private final String userName;

   /**
@@ -294,15 +289,6 @@ public class SessionState {

   private List<Closeable> cleanupItems = new LinkedList<Closeable>();

-  /**
-   * Get the lineage state stored in this session.
-   *
-   * @return LineageState
-   */
-  public LineageState getLineageState() {
-    return ls;
-  }
-
   public HiveConf getConf() {
     return sessionConf;
   }
@@ -387,7 +373,6 @@ public class SessionState {
       LOG.debug("SessionState user: " + userName);
     }
     isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
-    ls = new LineageState();
     resourceMaps = new ResourceMaps();
     // Must be deterministic order map for consistent q-test output across Java versions
     overriddenConfigurations = new LinkedHashMap<String, String>();
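[With the field and accessor removed from SessionState, the old
session-scoped lookup no longer compiles; the before/after for any remaining
call site is simply:

    // Before: one LineageState shared by every query on the session.
    LineageState ls = SessionState.get().getLineageState();
    // After: a LineageState scoped to the current query.
    LineageState ls = queryState.getLineageState();
]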
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
index 3406892..3c007a7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -135,9 +136,10 @@ public class TestGenMapRedUtilsCreateConditionalTask {
   public void testMergePathWithInvalidMoveWorkThrowsException() {
     final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
     final MoveWork mockWork = mock(MoveWork.class);
+    final LineageState lineageState = new LineageState();

     when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
-    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
   }

   @Test
@@ -146,12 +148,13 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
     final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
     final MoveWork mockWork = mock(MoveWork.class);
+    final LineageState lineageState = new LineageState();
     MoveWork newWork;

     // test using loadFileWork
     when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(
         condOutputPath, targetMoveWorkPath, false, "", "", false));
-    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
     assertNotNull(newWork);
     assertNotEquals(newWork, mockWork);
     assertEquals(condInputPath, newWork.getLoadFileWork().getSourcePath());
@@ -162,7 +165,7 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     reset(mockWork);
     when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc(
         condOutputPath, tableDesc, null, null));
-    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
     assertNotNull(newWork);
     assertNotEquals(newWork, mockWork);
     assertEquals(condInputPath, newWork.getLoadTableWork().getSourcePath());
@@ -181,7 +184,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);

-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -221,7 +225,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);

-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -255,7 +260,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);

-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);