Repository: hive Updated Branches: refs/heads/master e54acc611 -> 078b9c333
HIVE-19100 : investigate TestStreaming failures(Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/078b9c33 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/078b9c33 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/078b9c33 Branch: refs/heads/master Commit: 078b9c333076b864115f36f3698ceff6af5b2344 Parents: e54acc6 Author: Eugene Koifman <ekoif...@apache.org> Authored: Wed Apr 4 12:50:13 2018 -0700 Committer: Eugene Koifman <ekoif...@apache.org> Committed: Wed Apr 4 12:50:37 2018 -0700 ---------------------------------------------------------------------- .../hive/hcatalog/streaming/TestStreaming.java | 10 +++---- .../hive/ql/parse/DDLSemanticAnalyzer.java | 29 ++++++++++++-------- 2 files changed, 22 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/078b9c33/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index a9ab90b..3733e3d 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -2101,7 +2101,7 @@ public class TestStreaming { ///////// -------- UTILS ------- ///////// // returns Path of the partition created (if any) else Path of table - public static Path createDbAndTable(IDriver driver, String databaseName, + private static Path createDbAndTable(IDriver driver, String databaseName, String tableName, List<String> partVals, String[] colNames, String[] colTypes, String[] bucketCols, @@ -2147,7 +2147,7 @@ public class TestStreaming { private static String getTableColumnsStr(String[] colNames, String[] colTypes) { StringBuilder sb = new StringBuilder(); for (int i=0; i < colNames.length; ++i) { - sb.append(colNames[i] + " " + colTypes[i]); + sb.append(colNames[i]).append(" ").append(colTypes[i]); if (i<colNames.length-1) { sb.append(","); } @@ -2162,7 +2162,7 @@ public class TestStreaming { } StringBuilder sb = new StringBuilder(); for (int i=0; i < partNames.length; ++i) { - sb.append(partNames[i] + " string"); + sb.append(partNames[i]).append(" string"); if (i < partNames.length-1) { sb.append(","); } @@ -2174,7 +2174,7 @@ public class TestStreaming { private static String getPartsSpec(String[] partNames, List<String> partVals) { StringBuilder sb = new StringBuilder(); for (int i=0; i < partVals.size(); ++i) { - sb.append(partNames[i] + " = '" + partVals.get(i) + "'"); + sb.append(partNames[i]).append(" = '").append(partVals.get(i)).append("'"); if(i < partVals.size()-1) { sb.append(","); } @@ -2217,7 +2217,7 @@ public class TestStreaming { } - public static ArrayList<String> queryTable(IDriver driver, String query) throws IOException { + private static ArrayList<String> queryTable(IDriver driver, String query) throws IOException { CommandProcessorResponse cpr = driver.run(query); if(cpr.getResponseCode() != 0) { throw new RuntimeException(query + " failed: " + cpr); http://git-wip-us.apache.org/repos/asf/hive/blob/078b9c33/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index cfe8af8..9e66422 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -3526,29 +3526,33 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { if(!AcidUtils.isTransactionalTable(tab)) { return; } - Long writeId; - int stmtId; - try { - writeId = SessionState.get().getTxnMgr().getTableWriteId(tab.getDbName(), - tab.getTableName()); - } catch (LockException ex) { - throw new SemanticException("Failed to allocate the write id", ex); - } - stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement(); + Long writeId = null; + int stmtId = 0; for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) { OnePartitionDesc desc = addPartitionDesc.getPartition(index); if (desc.getLocation() != null) { if(addPartitionDesc.isIfNotExists()) { - //Don't add + //Don't add partition data if it already exists Partition oldPart = getPartition(tab, desc.getPartSpec(), false); if(oldPart != null) { continue; } } + if(writeId == null) { + //so that we only allocate a writeId only if actually adding data + // (vs. adding a partition w/o data) + try { + writeId = SessionState.get().getTxnMgr().getTableWriteId(tab.getDbName(), + tab.getTableName()); + } catch (LockException ex) { + throw new SemanticException("Failed to allocate the write id", ex); + } + stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement(); + } LoadTableDesc loadTableWork = new LoadTableDesc(new Path(desc.getLocation()), Utilities.getTableDesc(tab), desc.getPartSpec(), - LoadTableDesc.LoadFileType.KEEP_EXISTING,//not relevant - creating new partition + LoadTableDesc.LoadFileType.KEEP_EXISTING, //not relevant - creating new partition writeId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(true); @@ -3557,7 +3561,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { Warehouse.makePartPath(desc.getPartSpec())).toString()); } catch (MetaException ex) { - throw new SemanticException("Could not determine partition path due to: " + ex.getMessage(), ex); + throw new SemanticException("Could not determine partition path due to: " + + ex.getMessage(), ex); } Task<MoveWork> moveTask = TaskFactory.get( new MoveWork(getInputs(), getOutputs(), loadTableWork, null,