shaofengshi closed pull request #219: KYLIN-3515 Add uuid for materialized table of hive view. URL: https://github.com/apache/kylin/pull/219
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 23dcaf37eb..a774c0d333 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -1023,8 +1023,8 @@ public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadab return dictAssist.getDictionary(cubeSeg, col); } - public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { - return dictAssist.buildSnapshotTable(cubeSeg, lookupTable); + public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException { + return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid); } private TableMetadataManager getMetadataManager() { @@ -1098,7 +1098,7 @@ private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryIn return (Dictionary<String>) info.getDictionaryObject(); } - public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { + public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException { // work on copy instead of cached objects CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid()); @@ -1107,7 +1107,7 @@ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) SnapshotManager snapshotMgr = getSnapshotManager(); TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject())); - IReadableTable hiveTable = 
SourceManager.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig()); CubeDesc cubeDesc = cubeSeg.getCubeDesc(); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 7fcf3208d1..6de42ac3bd 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -42,26 +42,28 @@ private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class); - public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { + public static void processSegment(KylinConfig config, String cubeName, String segmentID, String uuid, + DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); CubeSegment segment = cube.getSegmentById(segmentID); - processSegment(config, segment, factTableValueProvider, dictProvider); + processSegment(config, segment, uuid, factTableValueProvider, dictProvider); } - private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { + private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String uuid, + DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { CubeManager cubeMgr = CubeManager.getInstance(config); // dictionary for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) { logger.info("Building 
dictionary for " + col); IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col); - + Dictionary<String> preBuiltDict = null; if (dictProvider != null) { preBuiltDict = dictProvider.getDictionary(col); } - + if (preBuiltDict != null) { logger.debug("Dict for '" + col.getName() + "' has already been built, save it"); cubeMgr.saveDictionary(cubeSeg, col, inpTable, preBuiltDict); @@ -87,9 +89,9 @@ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, Dist for (String tableIdentity : toSnapshot) { logger.info("Building snapshot of " + tableIdentity); - cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity); + cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity, uuid); } - + CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName()); cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid()); for (TableRef lookup : toCheckLookup) { diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java index f965d18908..e30d15643d 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java @@ -42,7 +42,10 @@ private static void rebuild(String table, String overwriteUUID, String project) if (tableDesc == null) throw new IllegalArgumentException("Not table found by " + table); - SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc), tableDesc, overwriteUUID); + if (tableDesc.isView()) + throw new IllegalArgumentException("Build snapshot of hive view \'" + table + "\' not supported."); + + SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc, null), tableDesc, overwriteUUID); System.out.println("resource path updated: " + snapshot.getResourcePath()); } } diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index dbbfc3962c..ad22abc322 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -62,6 +62,7 @@ private KylinConfig config; private String name; private String id; + private AbstractExecutable parentExecutable = null; private Map<String, String> params = Maps.newHashMap(); public AbstractExecutable() { @@ -396,6 +397,13 @@ public static long getDuration(long startTime, long endTime, long interruptTime) } } + public AbstractExecutable getParentExecutable() { + return parentExecutable; + } + public void setParentExecutable(AbstractExecutable parentExecutable) { + this.parentExecutable = parentExecutable; + } + public static long getExtraInfoAsLong(Output output, String key, long defaultValue) { final String str = output.getExtra().get(key); if (str != null) { diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 63be24ec2b..ec660fd28b 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -170,6 +170,7 @@ public final AbstractExecutable getTaskByName(String name) { @Override public void addTask(AbstractExecutable executable) { + executable.setParentExecutable(this); executable.setId(getId() + "-" + String.format(Locale.ROOT, "%02d", subTasks.size())); this.subTasks.add(executable); } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 58f3945f1f..5cc8a0f7d7 100644 --- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -531,6 +531,10 @@ private AbstractExecutable parseTo(ExecutablePO executablePO) { if (tasks != null && !tasks.isEmpty()) { Preconditions.checkArgument(result instanceof ChainedExecutable); for (ExecutablePO subTask : tasks) { + AbstractExecutable subTaskExecutable = parseTo(subTask); + if (subTaskExecutable != null) { + subTaskExecutable.setParentExecutable(result); + } ((ChainedExecutable) result).addTask(parseTo(subTask)); } } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index 23eb53fda0..d8e3b028aa 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -361,6 +361,14 @@ public String getMaterializedName() { return MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + database.getName() + "_" + name; } + public String getMaterializedName(String uuid) { + if (uuid == null) { + return getMaterializedName(); + } else + return MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + database.getName() + "_" + name + "_" + + uuid.replaceAll("-", "_"); + } + @Override public String toString() { return "TableDesc{" + "name='" + name + '\'' + ", columns=" + Arrays.toString(columns) + ", sourceType=" diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index f79d0f0a7d..43df3f1f52 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -43,7 +43,7 @@ /** * Return a ReadableTable that can iterate through the rows of given table. 
*/ - IReadableTable createReadableTable(TableDesc tableDesc); + IReadableTable createReadableTable(TableDesc tableDesc, String uuid); /** * Give the source a chance to enrich a SourcePartition before build start. diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java index 62c43682f1..03559bc370 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java @@ -140,8 +140,8 @@ public KylinConfig getConfig() { }); } - public static IReadableTable createReadableTable(TableDesc table) { - return getSource(table).createReadableTable(table); + public static IReadableTable createReadableTable(TableDesc table, String uuid) { + return getSource(table).createReadableTable(table, uuid); } public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java index f650321874..c259c4eadd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java @@ -36,7 +36,7 @@ public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc); /** Return an InputFormat that reads from specified table. */ - public IMRTableInputFormat getTableInputFormat(TableDesc table); + public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid); /** Return a helper to participate in batch cubing merge job flow. 
*/ public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 5b1f38c51c..5f27bf8455 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -170,6 +170,7 @@ public HadoopShellExecutable createBuildDictionaryStep(String jobId) { appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId)); appendExecCmdParameters(cmd, BatchConstants.ARG_DICT_PATH, getDictRootPath(jobId)); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); buildDictionaryStep.setJobParams(cmd.toString()); buildDictionaryStep.setJobClass(CreateDictionaryJob.class); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index 85a425cb71..3a0fb84b0f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -42,13 +42,13 @@ public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) { return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc); } - public static IMRTableInputFormat getTableInputFormat(String tableName, String prj) { + public static IMRTableInputFormat getTableInputFormat(String tableName, String prj, String uuid) { TableDesc t = getTableDesc(tableName, prj); - return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t); + return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t, uuid); } - public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) { - return 
SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc); + public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc, String uuid) { + return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc, uuid); } private static TableDesc getTableDesc(String tableName, String prj) { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index e01da9e76c..aeb7b120fc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -59,16 +59,18 @@ public int run(String[] args) throws Exception { options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_DICT_PATH); + options.addOption(OPTION_CUBING_JOB_ID); parseOptions(options, args); final String cubeName = getOptionValue(OPTION_CUBE_NAME); final String segmentID = getOptionValue(OPTION_SEGMENT_ID); + final String jobId = getOptionValue(OPTION_CUBING_JOB_ID); final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH); final String dictPath = getOptionValue(OPTION_DICT_PATH); final KylinConfig config = KylinConfig.getInstanceFromEnv(); - DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() { + DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, jobId, new DistinctColumnValuesProvider() { @Override public IReadableTable getDistinctValuesFor(TblColRef col) { return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getIdentity(), col.getType()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java index 
c64694c284..753b67c2b6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java @@ -61,7 +61,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio CubeDesc cubeDesc = cube.getDescriptor(); try { TableDesc tableDesc = metaMgr.getTableDesc(lookupTableName, cube.getProject()); - IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null); logger.info("take snapshot for table:" + lookupTableName); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cube.getConfig()); diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java index 872f570d76..efdf54b4c9 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java @@ -56,7 +56,7 @@ public void after() throws Exception { public void basicTest() throws Exception { String tableName = "EDW.TEST_SITES"; TableDesc tableDesc = TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName, "default"); - IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null); String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc, getTestConfig()).getResourcePath(); snapshotMgr.wipeoutCache(); diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java b/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java index 2b1bf8ec10..dcfa19de70 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java +++ 
b/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java @@ -165,6 +165,10 @@ public String getCUBE_RENAME() { return "Cube 不能被重命名"; } + public String getREBUILD_SNAPSHOT_OF_VIEW() { + return "不支持重新构建 Hive view '%s' 的 snapshot, 请刷新 Cube 的 segment"; + } + // Model public String getINVALID_MODEL_DEFINITION() { return "非法模型定义"; diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java index 5f7e296c81..7c0dbe334f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java +++ b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java @@ -165,6 +165,10 @@ public String getCUBE_RENAME() { return "Cube renaming is not allowed."; } + public String getREBUILD_SNAPSHOT_OF_VIEW() { + return "Rebuild snapshot of hive view '%s' is not supported, please refresh segment of the cube"; + } + // Model public String getINVALID_MODEL_DEFINITION() { return "The data model definition is invalid."; diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index b8b6a138f1..d4ff9704ad 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -58,6 +58,7 @@ import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.project.RealizationEntry; @@ -497,8 +498,14 @@ public void updateCubeNotifyList(CubeInstance cube, List<String> notifyList) thr public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable) throws IOException { 
aclEvaluate.checkProjectOperationPermission(cube); + Message msg = MsgPicker.getMsg(); + TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject()); + if (tableDesc.isView()) { + throw new BadRequestException( + String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName())); + } CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY); - getCubeManager().buildSnapshotTable(seg, lookupTable); + getCubeManager().buildSnapshotTable(seg, lookupTable, null); return cube; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 1cbcfcd35a..d8aa7111e6 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -67,6 +67,7 @@ import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; +import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.msg.Message; @@ -395,6 +396,12 @@ public JobInstance submitRecoverSegmentOptimizeJob(CubeSegment segment, String s public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs, String submitter) throws IOException { + Message msg = MsgPicker.getMsg(); + TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject()); + if (tableDesc.isView()) { + throw new BadRequestException( + String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName())); + } LookupSnapshotBuildJob job = new LookupSnapshotJobBuilder(cube, lookupTable, segmentIDs, submitter).build(); getExecutableManager().addJob(job); diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index b074775ec2..4009fc9851 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -378,7 +378,7 @@ public String getSnapshotLocalCacheState(String tableName, String snapshotID) { public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException { TableDesc tableDesc = getTableManager().getTableDesc(tableName, project); - IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null); TableSignature signature = hiveTable.getSignature(); return internalGetLookupTableSnapshots(tableName, signature); } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 19fee3157e..2c998dfb08 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -51,8 +51,8 @@ @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(HiveInputBase.class); - protected static String getTableNameForHCat(TableDesc table) { - String tableName = (table.isView()) ? table.getMaterializedName() : table.getName(); + protected static String getTableNameForHCat(TableDesc table, String uuid) { + String tableName = (table.isView()) ? table.getMaterializedName(uuid) : table.getName(); String database = (table.isView()) ? 
KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() : table.getDatabase(); return String.format(Locale.ROOT, "%s.%s", database, tableName).toUpperCase(Locale.ROOT); @@ -94,7 +94,7 @@ protected static AbstractExecutable createRedistributeFlatHiveTableStep(String h } protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, - String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables) { + String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); @@ -119,8 +119,8 @@ protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveCmdBuilder.addStatement(hiveInitStatements); for (TableDesc lookUpTableDesc : lookupViewsTables) { String identity = lookUpTableDesc.getIdentity(); - String intermediate = lookUpTableDesc.getMaterializedName(); if (lookUpTableDesc.isView()) { + String intermediate = lookUpTableDesc.getMaterializedName(uuid); String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir); hiveCmdBuilder.addStatement(materializeViewHql); intermediateTables.add(intermediate); @@ -135,7 +135,7 @@ protected static ShellExecutable createLookupHiveViewMaterializationStep(String protected static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) { StringBuilder createIntermediateTableHql = new StringBuilder(); createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + ";\n"); - createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName + createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n"); createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET 
TBLPROPERTIES('auto.purge'='true');\n"); createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + viewName + " SELECT * FROM " + tableName + ";\n"); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 33b10595c3..d6b85eddcd 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -58,8 +58,8 @@ public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flat } @Override - public IMRTableInputFormat getTableInputFormat(TableDesc table) { - return new HiveTableInputFormat(getTableNameForHCat(table)); + public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) { + return new HiveTableInputFormat(getTableNameForHCat(table, uuid)); } @Override @@ -139,7 +139,8 @@ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, + cubeInstance.getDescriptor())); } // special for hive @@ -158,7 +159,8 @@ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable j final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); - AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables); + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, + flatDesc, hiveViewIntermediateTables, jobFlow.getId()); if (task != null) { jobFlow.addTask(task); } @@ -194,7 +196,8 @@ 
private String getIntermediateTableIdentity() { * @deprecated For backwards compatibility. */ @Deprecated - public static class RedistributeFlatHiveTableStep extends org.apache.kylin.source.hive.RedistributeFlatHiveTableStep { + public static class RedistributeFlatHiveTableStep + extends org.apache.kylin.source.hive.RedistributeFlatHiveTableStep { } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index 938114c2d6..b536bf046f 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -54,12 +54,12 @@ public ISourceMetadataExplorer getSourceMetadataExplorer() { } @Override - public IReadableTable createReadableTable(TableDesc tableDesc) { + public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) { // hive view must have been materialized already // ref HiveMRInput.createLookupHiveViewMaterializationStep() if (tableDesc.isView()) { KylinConfig config = KylinConfig.getInstanceFromEnv(); - String tableName = tableDesc.getMaterializedName(); + String tableName = tableDesc.getMaterializedName(uuid); tableDesc = new TableDesc(); tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java index 881be1ab3b..d710db7fdc 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java @@ -85,7 +85,8 @@ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, 
cubeInstance.getDescriptor())); + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, + cubeInstance.getDescriptor())); } // special for hive @@ -96,7 +97,8 @@ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable j final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); - AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables); + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, + flatDesc, hiveViewIntermediateTables, jobFlow.getId()); if (task != null) { jobFlow.addTask(task); } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index da44ea53fc..18d14b8778 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -64,7 +64,7 @@ protected void doSetup(Context context) throws IOException { String project = conf.get(BatchConstants.CFG_PROJECT_NAME); String tableName = conf.get(BatchConstants.CFG_TABLE_NAME); tableDesc = TableMetadataManager.getInstance(config).getTableDesc(tableName, project); - tableInputFormat = MRUtil.getTableInputFormat(tableDesc); + tableInputFormat = MRUtil.getTableInputFormat(tableDesc, conf.get(BatchConstants.ARG_CUBING_JOB_ID)); } @Override diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java index dd32a58a19..f51fce0f35 100644 --- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java @@ -50,7 +50,8 @@ public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job"; @SuppressWarnings("static-access") - protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table"); + protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true) + .withDescription("The hive table name").create("table"); public HiveColumnCardinalityJob() { } @@ -90,7 +91,8 @@ public int run(String[] args) throws Exception { job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false"); // Mapper - IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project); + IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project, + getOptionValue(OPTION_CUBING_JOB_ID)); tableInputFormat.configureJob(job); job.setMapperClass(ColumnCardinalityMapper.class); diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java index 37d119eefe..20e882ac3b 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java @@ -51,7 +51,7 @@ public ISourceMetadataExplorer getSourceMetadataExplorer() { } @Override - public IReadableTable createReadableTable(TableDesc tableDesc) { + public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) { return new JdbcTable(tableDesc); } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 6232033c76..1c94f9c7c7 100644 --- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -62,7 +62,7 @@ public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flat } @Override - public IMRTableInputFormat getTableInputFormat(TableDesc table) { + public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) { return new KafkaTableInputFormat(cubeSegment, null); } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 264f2ce8a6..0d9c845c4a 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -71,7 +71,7 @@ public KafkaSource(KylinConfig config) { } @Override - public IReadableTable createReadableTable(TableDesc tableDesc) { + public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) { throw new UnsupportedOperationException(); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java index fb5bab56b6..1d9181b613 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java @@ -82,7 +82,7 @@ public void addMaterializeLookupTableSteps(LookupMaterializeContext context, Str KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig); TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, cube.getProject()); - IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc); + IReadableTable 
sourceTable = SourceManager.createReadableTable(tableDesc, context.getJobFlow().getId()); try { ExtTableSnapshotInfo latestSnapshot = extTableSnapshotInfoManager.getLatestSnapshot( sourceTable.getSignature(), tableName); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java index 9146fc4f66..0135a2204f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java @@ -91,6 +91,7 @@ public int run(String[] args) throws Exception { String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(Locale.ROOT); String tableName = getOptionValue(OPTION_TABLE_NAME); String lookupSnapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID); + String jobId = getOptionValue(OPTION_CUBING_JOB_ID); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); CubeManager cubeMgr = CubeManager.getInstance(kylinConfig); @@ -102,7 +103,7 @@ public int run(String[] args) throws Exception { ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig); removeSnapshotIfExist(extSnapshotInfoManager, kylinConfig, tableName, lookupSnapshotID); - IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc); + IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc, jobId); logger.info("create HTable for source table snapshot:{}", tableName); Pair<String, Integer> hTableNameAndShard = createHTable(tableName, sourceTable, kylinConfig); @@ -119,7 +120,7 @@ public int run(String[] args) throws Exception { FileOutputFormat.setOutputPath(job, output); - IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc); + IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc, jobId); 
tableInputFormat.configureJob(job); job.setMapperClass(LookupTableToHFileMapper.class); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java index 4be9533b08..0ad63e9dce 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.KylinConfig; @@ -79,7 +80,8 @@ protected void doSetup(Context context) throws IOException { keyColumns[i] = keyColRefs[i].getName(); } encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, shardNum); - lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc); + Configuration conf = context.getConfiguration(); + lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc, conf.get(BatchConstants.ARG_CUBING_JOB_ID)); } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services