KYLIN-2176 org.apache.kylin.rest.service.JobService#submitJob will leave an orphan NEW segment in the cube when an exception occurs
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b4233844 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b4233844 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b4233844 Branch: refs/heads/yang21-cdh5.7 Commit: b4233844816a722c3259109804d20a9fb8f5e954 Parents: 2e3c7ca Author: Hongbin Ma <mahong...@apache.org> Authored: Fri Nov 11 16:32:00 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Fri Nov 11 16:32:05 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/rest/service/JobService.java | 43 ++++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b4233844/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 2e547c7..186e265 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -209,19 +209,38 @@ public class JobService extends BasicService { DefaultChainedExecutable job; - if (buildType == CubeBuildTypeEnum.BUILD) { - CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset); - job = EngineFactory.createBatchCubingJob(newSeg, submitter); - } else if (buildType == CubeBuildTypeEnum.MERGE) { - CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); - job = EngineFactory.createBatchMergeJob(newSeg, submitter); - } else if (buildType == CubeBuildTypeEnum.REFRESH) { - CubeSegment refreshSeg = getCubeManager().refreshSegment(cube, 
startDate, endDate, startOffset, endOffset); - job = EngineFactory.createBatchCubingJob(refreshSeg, submitter); - } else { - throw new JobException("invalid build type:" + buildType); + CubeSegment newSeg = null; + try { + if (buildType == CubeBuildTypeEnum.BUILD) { + newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset); + job = EngineFactory.createBatchCubingJob(newSeg, submitter); + } else if (buildType == CubeBuildTypeEnum.MERGE) { + newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); + job = EngineFactory.createBatchMergeJob(newSeg, submitter); + } else if (buildType == CubeBuildTypeEnum.REFRESH) { + newSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset); + job = EngineFactory.createBatchCubingJob(newSeg, submitter); + } else { + throw new JobException("invalid build type:" + buildType); + } + getExecutableManager().addJob(job); + + } catch (Exception e) { + if (newSeg != null) { + logger.error("Job submission might failed for NEW segment {}, will clean the NEW segment from cube", newSeg.getName()); + try { + // Remove this segments + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(newSeg); + getCubeManager().updateCube(cubeBuilder); + } catch (Exception ee) { + // swallow the exception + logger.error("Clean New segment failed, ignoring it", e); + } + } + throw e; } - getExecutableManager().addJob(job); + JobInstance jobInstance = getSingleJobInstance(job); accessService.init(jobInstance, null);