[03/50] [abbrv] kylin git commit: KYLIN-2072 Cleanup old streaming code

2016-10-18 Thread lidong
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
--
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
deleted file mode 100644
index 271bf41..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
-import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
-import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class StreamingManager {
-
-private static final Logger logger = 
LoggerFactory.getLogger(StreamingManager.class);
-
-// static cached instances
-private static final ConcurrentHashMap 
CACHE = new ConcurrentHashMap();
-
-public static final Serializer STREAMING_SERIALIZER = new 
JsonSerializer(StreamingConfig.class);
-
-private KylinConfig config;
-
-// name ==> StreamingConfig
-private CaseInsensitiveStringCache streamingMap;
-
-public static void clearCache() {
-CACHE.clear();
-}
-
-private StreamingManager(KylinConfig config) throws IOException {
-this.config = config;
-this.streamingMap = new 
CaseInsensitiveStringCache(config, "streaming");
-
-// touch lower level metadata before registering my listener
-reloadAllStreaming();
-Broadcaster.getInstance(config).registerListener(new 
StreamingSyncListener(), "streaming");
-}
-
-private class StreamingSyncListener extends Broadcaster.Listener {
-@Override
-public void onClearAll(Broadcaster broadcaster) throws IOException {
-clearCache();
-}
-
-@Override
-public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
-if (event == Event.DROP)
-removeStreamingLocal(cacheKey);
-else
-reloadStreamingConfigLocal(cacheKey);
-}
-}
-
-private ResourceStore getStore() {
-return ResourceStore.getStore(this.config);
-}
-
-public static StreamingManager getInstance(KylinConfig config) {
-StreamingManager r = CACHE.get(config);
-if (r != null) {
-return r;
-}
-
-synchronized (StreamingManager.class) {
-r = CACHE.get(config);
-if (r != null) {
-return r;
-}
-try {
-r = new StreamingManager(config);
-CACHE.put(config, r);
-if (CACHE.size() > 1) {
-logger.warn("More than one singleton exist");
-}
-return r;
-} catch (IOException e) {
-throw new IllegalStateException("Failed to init 
StreamingManager from " + config, e);
-}
-}
-}
-
-private static String formatStreamingConfigPath(String name) {
-return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-}
-
-private static String formatStreamingOutputPath(String streaming, int 
partition) {
-return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming 
+ "_" + partition + ".json";
-}
-
-private static String formatStreamingOutputPath(String streaming, 
List partitions) {
-retu

[03/50] [abbrv] kylin git commit: KYLIN-2072 Cleanup old streaming code

2016-10-18 Thread lidong
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
--
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
deleted file mode 100644
index 1c579c6..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.diagnose;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class TimeHistogram {
-private long[] bucketsBoundary;
-private AtomicLong[] counters;
-private String id;
-
-private static Object printLock = new Object();
-
-/**
- * example: [10,20] will generate three  buckets: (-∞,10), 
[10,20),[20,+∞)
- * unit: second
- */
-public TimeHistogram(long[] bucketsBoundary, String id) {
-this.bucketsBoundary = bucketsBoundary;
-this.counters = new AtomicLong[this.bucketsBoundary.length + 1];
-for (int i = 0; i < counters.length; i++) {
-this.counters[i] = new AtomicLong();
-}
-this.id = id;
-}
-
-/**
- * @param second in seconds
- */
-public void process(long second) {
-for (int i = 0; i < bucketsBoundary.length; ++i) {
-if (second < bucketsBoundary[i]) {
-counters[i].incrementAndGet();
-return;
-}
-}
-
-counters[bucketsBoundary.length].incrementAndGet();
-}
-
-/**
- * @param millis in milli seconds
- */
-public void processMillis(long millis) {
-process(millis / 1000);
-}
-
-public void printStatus() {
-long[] countersSnapshot = new long[counters.length];
-for (int i = 0; i < countersSnapshot.length; i++) {
-countersSnapshot[i] = counters[i].get();
-}
-
-long sum = 0;
-for (long counter : countersSnapshot) {
-sum += counter;
-}
-
-synchronized (printLock) {
-System.out.println("== status of TimeHistogram " + id 
+ " =");
-
-for (int i = 0; i < countersSnapshot.length; ++i) {
-System.out.println(String.format("bucket: %d , count: %d 
,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 
0 ? 1 : sum)));
-}
-
-}
-}
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
--
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..7a42598
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ */
+publi