This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new ea7a8b4  KYLIN-4766 Delete job tmp and segment file after job be discarded
ea7a8b4 is described below

commit ea7a8b454fd884fd504b17ec8dabcade89e76b0f
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Fri Sep 25 14:55:41 2020 +0800

    KYLIN-4766 Delete job tmp and segment file after job be discarded
---
 .../apache/kylin/engine/spark/job/NSparkCubingJob.java  | 17 +++++++++++++++++
 .../java/org/apache/kylin/rest/service/JobService.java  |  9 ++++++++-
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index c2e615d..da00f6d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark.job;
 
+import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Locale;
@@ -32,8 +33,10 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.spark_project.guava.base.Preconditions;
@@ -139,4 +142,18 @@ public class NSparkCubingJob extends CubingJob {
     public void setCube(CubeInstance cube) {
         this.cube = cube;
     }
+
+    public void cleanupAfterJobDiscard() {
+        try {
+            PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME),
+                    getParam(MetadataConstants.P_JOB_ID));
+
+            CubeManager cubeManager = CubeManager.getInstance(getConfig());
+            CubeInstance cube = cubeManager.getCube(getParam(MetadataConstants.P_CUBE_NAME));
+            CubeSegment segment = cube.getSegment(getParam(MetadataConstants.SEGMENT_NAME), SegmentStatusEnum.NEW);
+            PathManager.deleteSegmentParquetStoragePath(cube, segment);
+        } catch (IOException e) {
+            logger.warn("Delete resource file failed after job be discarded, due to", e);
+        }
+    }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 1ce6521..f7be7d1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -48,6 +48,7 @@ import org.apache.kylin.engine.mr.LookupSnapshotBuildJob;
 import org.apache.kylin.engine.mr.common.CubeJobLockUtil;
 import org.apache.kylin.engine.mr.common.JobInfoConverter;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.job.NSparkCubingJob;
 import org.apache.kylin.engine.spark.metadata.cube.source.SourceFactory;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JobSearchResult;
@@ -644,9 +645,15 @@ public class JobService extends BasicService implements InitializingBean {
                     "The job " + job.getId() + " has already been finished and cannot be discarded.");
         }
 
+        AbstractExecutable executable = getExecutableManager().getJob(job.getId());
+
         if (job.getStatus() != JobStatusEnum.DISCARDED) {
-            AbstractExecutable executable = getExecutableManager().getJob(job.getId());
             if (executable instanceof CubingJob) {
+                //Clean up job tmp and segment storage from hdfs after job be discarded
+                if (executable instanceof NSparkCubingJob) {
+                    ((NSparkCubingJob) executable).cleanupAfterJobDiscard();
+                }
+
                 cancelCubingJobInner((CubingJob) executable);
                 //release global mr hive dict lock if exists
                 if (executable.getStatus().isFinalState()) {

Reply via email to