[03/50] [abbrv] kylin git commit: KYLIN-2072 Cleanup old streaming code
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java -- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java deleted file mode 100644 index 271bf41..000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.engine.streaming; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.cachesync.Broadcaster; -import org.apache.kylin.metadata.cachesync.Broadcaster.Event; -import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class StreamingManager { - -private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class); - -// static cached instances -private static final ConcurrentHashMap CACHE = new ConcurrentHashMap(); - -public static final Serializer STREAMING_SERIALIZER = new JsonSerializer(StreamingConfig.class); - -private KylinConfig config; - -// name ==> StreamingConfig -private CaseInsensitiveStringCache streamingMap; - -public static void clearCache() { -CACHE.clear(); -} - -private StreamingManager(KylinConfig config) throws IOException { -this.config = config; -this.streamingMap = new CaseInsensitiveStringCache(config, "streaming"); - -// touch lower level metadata before registering my listener -reloadAllStreaming(); -Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming"); -} - -private class StreamingSyncListener extends Broadcaster.Listener { -@Override -public void onClearAll(Broadcaster broadcaster) throws IOException { -clearCache(); -} - -@Override -public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { -if (event == Event.DROP) -removeStreamingLocal(cacheKey); -else 
-reloadStreamingConfigLocal(cacheKey); -} -} - -private ResourceStore getStore() { -return ResourceStore.getStore(this.config); -} - -public static StreamingManager getInstance(KylinConfig config) { -StreamingManager r = CACHE.get(config); -if (r != null) { -return r; -} - -synchronized (StreamingManager.class) { -r = CACHE.get(config); -if (r != null) { -return r; -} -try { -r = new StreamingManager(config); -CACHE.put(config, r); -if (CACHE.size() > 1) { -logger.warn("More than one singleton exist"); -} -return r; -} catch (IOException e) { -throw new IllegalStateException("Failed to init StreamingManager from " + config, e); -} -} -} - -private static String formatStreamingConfigPath(String name) { -return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; -} - -private static String formatStreamingOutputPath(String streaming, int partition) { -return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json"; -} - -private static String formatStreamingOutputPath(String streaming, List partitions) { -retu
[03/50] [abbrv] kylin git commit: KYLIN-2072 Cleanup old streaming code
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java deleted file mode 100644 index 1c579c6..000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.source.kafka.diagnose; - -import java.util.concurrent.atomic.AtomicLong; - -public class TimeHistogram { -private long[] bucketsBoundary; -private AtomicLong[] counters; -private String id; - -private static Object printLock = new Object(); - -/** - * example: [10,20] will generate three buckets: (-∞,10), [10,20),[20,+∞) - * unit: second - */ -public TimeHistogram(long[] bucketsBoundary, String id) { -this.bucketsBoundary = bucketsBoundary; -this.counters = new AtomicLong[this.bucketsBoundary.length + 1]; -for (int i = 0; i < counters.length; i++) { -this.counters[i] = new AtomicLong(); -} -this.id = id; -} - -/** - * @param second in seconds - */ -public void process(long second) { -for (int i = 0; i < bucketsBoundary.length; ++i) { -if (second < bucketsBoundary[i]) { -counters[i].incrementAndGet(); -return; -} -} - -counters[bucketsBoundary.length].incrementAndGet(); -} - -/** - * @param millis in milli seconds - */ -public void processMillis(long millis) { -process(millis / 1000); -} - -public void printStatus() { -long[] countersSnapshot = new long[counters.length]; -for (int i = 0; i < countersSnapshot.length; i++) { -countersSnapshot[i] = counters[i].get(); -} - -long sum = 0; -for (long counter : countersSnapshot) { -sum += counter; -} - -synchronized (printLock) { -System.out.println("== status of TimeHistogram " + id + " ="); - -for (int i = 0; i < countersSnapshot.length; ++i) { -System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 
1 : sum))); -} - -} -} - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java new file mode 100644 index 000..7a42598 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.kafka.util; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + */ +publi