Revert "KYLIN-1726 add test case BuildCubeWithStream2"

This reverts commit 3e081b3fbec4fc8a6cc4ddf8795d2fd581ae04f4.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f9692fa5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f9692fa5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f9692fa5

Branch: refs/heads/tempmaster
Commit: f9692fa5599de04030599b699f5460bdc7cf0e42
Parents: c68bba0
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Sep 19 23:49:51 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Mon Sep 19 23:49:51 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/Kafka10DataLoader.java  |   4 +
 .../apache/kylin/common/KylinConfigBase.java    |   4 -
 .../java/org/apache/kylin/cube/CubeManager.java |  28 +-
 .../org/apache/kylin/job/dao/ExecutableDao.java |   1 -
 .../kylin/job/manager/ExecutableManager.java    |   2 +-
 .../streaming/cube/StreamingCubeBuilder.java    |   2 +-
 .../test_streaming_table_cube_desc.json         |   3 +-
 .../kylin/provision/BuildCubeWithStream.java    |  32 +--
 .../kylin/provision/BuildCubeWithStream2.java   | 274 -------------------
 .../kylin/rest/controller/CubeController.java   |   8 +-
 .../apache/kylin/rest/service/JobService.java   |   4 +-
 .../kylin/source/kafka/SeekOffsetStep.java      |   7 +-
 12 files changed, 49 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index 2b299cc..a5132af 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -65,9 +65,13 @@ public class Kafka10DataLoader extends StreamDataLoader {
         props.put("retry.backoff.ms", "1000");
         KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, 
props);
 
+        int boundary = messages.size() / 10;
         for (int i = 0; i < messages.size(); ++i) {
             ProducerRecord<String, String> keyedMessage = new 
ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), 
messages.get(i));
             producer.send(keyedMessage);
+            if (i % boundary == 0) {
+                logger.info("sending " + i + " messages to " + 
this.toString());
+            }
         }
         logger.info("sent " + messages.size() + " messages to " + 
this.toString());
         producer.close();

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3b06ed8..fafb1fc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -809,8 +809,4 @@ abstract public class KylinConfigBase implements Serializable {
     public String getCreateFlatHiveTableMethod() {
         return getOptional("kylin.hive.create.flat.table.method", "1");
     }
-
-    public int getMaxBuildingSegments() {
-        return Integer.parseInt(getOptional("kylin.cube.building.segment.max", 
"1"));
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 57b9510..d494fcc 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -400,8 +400,13 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public CubeSegment appendSegment(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset) throws IOException {
+        return appendSegment(cube, startDate, endDate, startOffset, endOffset, 
true);
+    }
+
+    public CubeSegment appendSegment(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset, boolean strictChecking) throws 
IOException {
 
-        checkBuildingSegment(cube);
+        if (strictChecking)
+            checkNoBuildingSegment(cube);
 
         if 
(cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
             // try figure out a reasonable start if missing
@@ -431,9 +436,12 @@ public class CubeManager implements IRealizationProvider {
         updateCube(cubeBuilder);
         return newSegment;
     }
-
     public CubeSegment refreshSegment(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset) throws IOException {
-        checkBuildingSegment(cube);
+        return refreshSegment(cube, startDate, endDate, startOffset, 
endOffset, true);
+    }
+
+    public CubeSegment refreshSegment(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset, boolean strictChecking) throws 
IOException {
+        checkNoBuildingSegment(cube);
 
         CubeSegment newSegment = newSegment(cube, startDate, endDate, 
startOffset, endOffset);
 
@@ -454,7 +462,7 @@ public class CubeManager implements IRealizationProvider {
         if (startDate >= endDate && startOffset >= endOffset)
             throw new IllegalArgumentException("Invalid merge range");
 
-        checkBuildingSegment(cube);
+        checkNoBuildingSegment(cube);
         checkCubeIsPartitioned(cube);
 
         boolean isOffsetsOn = cube.getSegments().get(0).isSourceOffsetsOn();
@@ -580,10 +588,9 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-    private void checkBuildingSegment(CubeInstance cube) {
-        int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
-        if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
-            throw new IllegalStateException("There is already " + 
cube.getBuildingSegments().size() + " building segment; ");
+    private void checkNoBuildingSegment(CubeInstance cube) {
+        if (cube.getBuildingSegments().size() > 0) {
+            throw new IllegalStateException("There is already a building 
segment!");
         }
     }
 
@@ -722,9 +729,8 @@ public class CubeManager implements IRealizationProvider {
         }
 
         for (CubeSegment seg : tobe) {
-            if (isReady(seg) == false) {
-                logger.warn("For cube " + cube + ", segment " + seg + " isn't 
READY yet.");
-            }
+            if (isReady(seg) == false)
+                throw new IllegalStateException("For cube " + cube + ", 
segment " + seg + " should be READY but is not");
         }
 
         List<CubeSegment> toRemoveSegs = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 5cae5ac..8808a56 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -207,7 +207,6 @@ public class ExecutableDao {
     }
 
     public void updateJobOutput(ExecutableOutputPO output) throws 
PersistentException {
-        logger.debug("updating job output, id: " + output.getUuid());
         try {
             final long ts = 
writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
             output.setLastModified(ts);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index d42b924..3a19486 100644
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -278,7 +278,7 @@ public class ExecutableManager {
             ExecutableState oldStatus = 
ExecutableState.valueOf(jobOutput.getStatus());
             if (newStatus != null && oldStatus != newStatus) {
                 if (!ExecutableState.isValidStateTransfer(oldStatus, 
newStatus)) {
-                    throw new IllegalStateTranferException("there is no valid 
state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId);
+                    throw new IllegalStateTranferException("there is no valid 
state transfer from:" + oldStatus + " to:" + newStatus);
                 }
                 jobOutput.setStatus(newStatus.toString());
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index a42ec05..180f0b8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -119,7 +119,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
         CubeManager cubeManager = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         final CubeInstance cubeInstance = 
cubeManager.reloadCubeLocal(cubeName);
         try {
-            CubeSegment segment = cubeManager.appendSegment(cubeInstance, 
streamingBatch.getTimeRange().getFirst(), 
streamingBatch.getTimeRange().getSecond(), 0, 0);
+            CubeSegment segment = cubeManager.appendSegment(cubeInstance, 
streamingBatch.getTimeRange().getFirst(), 
streamingBatch.getTimeRange().getSecond(), 0, 0, false);
             segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
             segment.setInputRecords(streamingBatch.getMessages().size());
             segment.setLastBuildTime(System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index 8279417..ef10c1e 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -106,8 +106,7 @@
     }
   } ],
   "override_kylin_properties": {
-    "kylin.cube.algorithm": "inmem",
-    "kylin.cube.building.segment.max": "3"
+    "kylin.cube.algorithm": "inmem"
   },
   "notify_list" : [ ],
   "status_need_notify" : [ ],

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index b7c609e..9e779ab 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -137,21 +137,15 @@ public class BuildCubeWithStream {
 
         int numberOfRecrods1 = 10000;
         generateStreamData(date1, date2, numberOfRecrods1);
-        ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE);
-        Assert.assertTrue(result == ExecutableState.SUCCEED);
-        long date3 = f.parse("2013-04-01").getTime();
-        int numberOfRecords2 = 5000;
-        generateStreamData(date2, date3, numberOfRecords2);
-        result = buildSegment(cubeName, 0, Long.MAX_VALUE);
-        Assert.assertTrue(result == ExecutableState.SUCCEED);
+        buildSegment(cubeName, 0, Long.MAX_VALUE);
 
-        //empty build
-        result = buildSegment(cubeName, 0, Long.MAX_VALUE);
-        Assert.assertTrue(result == ExecutableState.DISCARDED);
+        long date3 = f.parse("2013-04-01").getTime();
+        int numberOfRecrods2 = 5000;
+        generateStreamData(date2, date3, numberOfRecrods2);
+        buildSegment(cubeName, 0, Long.MAX_VALUE);
 
         //merge
-        result = mergeSegment(cubeName, 0, 15000);
-        Assert.assertTrue(result == ExecutableState.SUCCEED);
+        mergeSegment(cubeName, 0, 15000);
 
         List<CubeSegment> segments = 
cubeManager.getCube(cubeName).getSegments();
         Assert.assertTrue(segments.size() == 1);
@@ -165,16 +159,16 @@ public class BuildCubeWithStream {
 
     }
 
-    private ExecutableState mergeSegment(String cubeName, long startOffset, 
long endOffset) throws Exception {
+    private String mergeSegment(String cubeName, long startOffset, long 
endOffset) throws Exception {
         CubeSegment segment = 
cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, 
endOffset, false);
         DefaultChainedExecutable job = 
EngineFactory.createBatchMergeJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
-        return job.getStatus();
+        return job.getId();
     }
 
     private String refreshSegment(String cubeName, long startOffset, long 
endOffset, HashMap<String, String> partitionOffsetMap) throws Exception {
-        CubeSegment segment = 
cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, 
endOffset);
+        CubeSegment segment = 
cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, 
endOffset, false);
         segment.setAdditionalInfo(partitionOffsetMap);
         CubeInstance cubeInstance = cubeManager.getCube(cubeName);
         CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
@@ -187,12 +181,12 @@ public class BuildCubeWithStream {
         return job.getId();
     }
 
-    private ExecutableState buildSegment(String cubeName, long startOffset, 
long endOffset) throws Exception {
-        CubeSegment segment = 
cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, 
endOffset);
+    private String buildSegment(String cubeName, long startOffset, long 
endOffset) throws Exception {
+        CubeSegment segment = 
cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, 
endOffset, false);
         DefaultChainedExecutable job = 
EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
-        return job.getStatus();
+        return job.getId();
     }
 
     protected void deployEnv() throws IOException {
@@ -222,7 +216,7 @@ public class BuildCubeWithStream {
     protected void waitForJob(String jobId) {
         while (true) {
             AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() 
== ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() 
== ExecutableState.ERROR) {
                 break;
             } else {
                 try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
deleted file mode 100644
index d48a473..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.provision;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Random;
-import java.util.TimeZone;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Lists;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.job.streaming.Kafka10DataLoader;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.Thread.sleep;
-
-/**
- *  for streaming cubing case "test_streaming_table", using multiple threads 
to build it concurrently.
- */
-public class BuildCubeWithStream2 {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(BuildCubeWithStream2.class);
-
-    private CubeManager cubeManager;
-    private DefaultScheduler scheduler;
-    protected ExecutableManager jobService;
-    private static final String cubeName = "test_streaming_table_cube";
-
-    private KafkaConfig kafkaConfig;
-    private MockKafka kafkaServer;
-    private static boolean generateData = true;
-
-    public void before() throws Exception {
-        deployEnv();
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.createInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        cubeManager = CubeManager.getInstance(kylinConfig);
-
-        final CubeInstance cubeInstance = 
CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final String factTable = cubeInstance.getFactTable();
-
-        final StreamingManager streamingManager = 
StreamingManager.getInstance(kylinConfig);
-        final StreamingConfig streamingConfig = 
streamingManager.getStreamingConfig(factTable);
-        kafkaConfig = 
KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName());
-
-        String topicName = UUID.randomUUID().toString();
-        String localIp = NetworkUtils.getLocalIp();
-        BrokerConfig brokerConfig = 
kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
-        brokerConfig.setHost(localIp);
-        kafkaConfig.setTopic(topicName);
-        
KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
-
-        startEmbeddedKafka(topicName, brokerConfig);
-    }
-
-    private void startEmbeddedKafka(String topicName, BrokerConfig 
brokerConfig) {
-        //Start mock Kakfa
-        String zkConnectionStr = "sandbox:2181";
-        ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
-        // Assert.assertEquals(ZooKeeper.States.CONNECTED, 
zkConnection.getZookeeperState());
-        kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), 
brokerConfig.getId());
-        kafkaServer.start();
-
-        kafkaServer.createTopic(topicName, 3, 1);
-        kafkaServer.waitTopicUntilReady(topicName);
-
-        MetadataResponse.TopicMetadata topicMetadata = 
kafkaServer.fetchTopicMeta(topicName);
-        Assert.assertEquals(topicName, topicMetadata.topic());
-    }
-
-    private void generateStreamData(long startTime, long endTime, int 
numberOfRecords) throws IOException {
-        if (numberOfRecords <= 0)
-            return;
-        Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig);
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, 
numberOfRecords, cubeName, dataLoader);
-        logger.info("Test data inserted into Kafka");
-    }
-
-    private void clearSegment(String cubeName) throws Exception {
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        // remove all existing segments
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new 
CubeSegment[cube.getSegments().size()]));
-        cubeManager.updateCube(cubeBuilder);
-    }
-
-    public void build() throws Exception {
-        clearSegment(cubeName);
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        final long date1 = 0;
-        final long date2 = f.parse("2013-01-01").getTime();
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-
-                Random rand = new Random();
-                while (generateData == true) {
-                    try {
-                        generateStreamData(date1, date2, rand.nextInt(100));
-                        sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait 
random time, from 0 to 100 seconds
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        }).start();
-        ExecutorService executorService = Executors.newFixedThreadPool(4);
-
-        List<FutureTask<ExecutableState>> futures = Lists.newArrayList();
-        for (int i = 0; i < 5; i++) {
-            FutureTask futureTask = new FutureTask(new 
Callable<ExecutableState>() {
-                @Override
-                public ExecutableState call() {
-                    ExecutableState result = null;
-                    try {
-                        result = buildSegment(cubeName, 0, Long.MAX_VALUE);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-
-                    return result;
-                }
-            });
-
-            executorService.submit(futureTask);
-            futures.add(futureTask);
-            Thread.sleep(2 * 60 * 1000); // sleep 2 mintues
-        }
-
-        generateData = false; // stop generating message to kafka
-        executorService.shutdown();
-        int succeedBuild = 0;
-        for (int i = 0; i < futures.size(); i++) {
-            ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES);
-            logger.info("Checking building task " + i + " whose state is " + 
result);
-            Assert.assertTrue(result == null || result == 
ExecutableState.SUCCEED || result == ExecutableState.DISCARDED );
-            if (result == ExecutableState.SUCCEED)
-                succeedBuild++;
-        }
-
-        logger.info(succeedBuild + " build jobs have been successfully 
completed.");
-        List<CubeSegment> segments = 
cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY);
-        Assert.assertTrue(segments.size() == succeedBuild);
-
-    }
-
-
-    private ExecutableState buildSegment(String cubeName, long startOffset, 
long endOffset) throws Exception {
-        CubeSegment segment = 
cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, 
endOffset);
-        DefaultChainedExecutable job = 
EngineFactory.createBatchCubingJob(segment, "TEST");
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getStatus();
-    }
-
-    protected void deployEnv() throws IOException {
-        DeployUtil.overrideJobJarLocations();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-    }
-
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, 
HBaseMetadataTestCase.SANDBOX_TEST_DATA);
-        if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
-        }
-        
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
-    }
-
-    public static void afterClass() throws Exception {
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    public void after() {
-        kafkaServer.stop();
-    }
-
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() 
== ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) {
-                break;
-            } else {
-                try {
-                    sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        try {
-            beforeClass();
-
-            BuildCubeWithStream2 buildCubeWithStream = new 
BuildCubeWithStream2();
-            buildCubeWithStream.before();
-            buildCubeWithStream.build();
-            logger.info("Build is done");
-            buildCubeWithStream.after();
-            afterClass();
-            logger.info("Going to exit");
-            System.exit(0);
-        } catch (Exception e) {
-            logger.error("error", e);
-            System.exit(1);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 42b117c..669f53e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -272,7 +272,7 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild", method = { 
RequestMethod.PUT })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody 
JobBuildRequest req) {
-        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 
0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
+        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 
0, 0, req.getBuildType(), true, req.isForce() || 
req.isForceMergeEmptySegment());
     }
 
     /** Build/Rebuild a cube segment by source offset */
@@ -286,16 +286,16 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild2", method = { 
RequestMethod.PUT })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody 
JobBuildRequest2 req) {
-        return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), 
req.getEndSourceOffset(), req.getBuildType(), req.isForce());
+        return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), 
req.getEndSourceOffset(), req.getBuildType(), false, req.isForce());
     }
 
     private JobInstance buildInternal(String cubeName, long startTime, long 
endTime, //
-            long startOffset, long endOffset, String buildType, boolean force) 
{
+            long startOffset, long endOffset, String buildType, boolean 
strictCheck, boolean force) {
         try {
             String submitter = 
SecurityContextHolder.getContext().getAuthentication().getName();
             CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
             return jobService.submitJob(cube, startTime, endTime, startOffset, 
endOffset, //
-                    CubeBuildTypeEnum.valueOf(buildType), force, submitter);
+                    CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, 
submitter);
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 5c704ba..8929bf1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -199,7 +199,7 @@ public class JobService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 
'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 
'MANAGEMENT')")
     public JobInstance submitJob(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset, //
-            CubeBuildTypeEnum buildType, boolean force, String submitter) 
throws IOException, JobException {
+            CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, 
String submitter) throws IOException, JobException {
 
         if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
             throw new BadRequestException("Broken cube " + cube.getName() + " 
can't be built");
@@ -211,7 +211,7 @@ public class JobService extends BasicService {
         DefaultChainedExecutable job;
 
         if (buildType == CubeBuildTypeEnum.BUILD) {
-            CubeSegment newSeg = getCubeManager().appendSegment(cube, 
startDate, endDate, startOffset, endOffset);
+            CubeSegment newSeg = getCubeManager().appendSegment(cube, 
startDate, endDate, startOffset, endOffset, strictCheck);
             job = EngineFactory.createBatchCubingJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, 
startDate, endDate, startOffset, endOffset, force);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 9369e6f..479f1b8 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -17,6 +17,10 @@
 */
 package org.apache.kylin.source.kafka;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+import org.apache.commons.math3.util.MathUtils;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -34,6 +38,7 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -120,7 +125,7 @@ public class SeekOffsetStep extends AbstractExecutable {
             } catch (IOException e) {
                 return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
             }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, 
offset start: " + totalStartOffset + ", offset end: " + totalEndOffset);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } else {
             CubeUpdate cubeBuilder = new CubeUpdate(cube);
             cubeBuilder.setToRemoveSegs(segment);

Reply via email to