http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java deleted file mode 100644 index 1c579c6..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - package org.apache.kylin.source.kafka.diagnose; - - import java.util.concurrent.atomic.AtomicLong; - - public class TimeHistogram { - private long[] bucketsBoundary; - private AtomicLong[] counters; - private String id; - - private static Object printLock = new Object(); - - /** - * example: [10,20] will generate three buckets: (-∞,10), [10,20),[20,+∞) - * unit: second - */ - public TimeHistogram(long[] bucketsBoundary, String id) { - this.bucketsBoundary = bucketsBoundary; - this.counters = new AtomicLong[this.bucketsBoundary.length + 1]; - for (int i = 0; i < counters.length; i++) { - this.counters[i] = new AtomicLong(); - } - this.id = id; - } - - /** - * @param second in seconds - */ - public void process(long second) { - for (int i = 0; i < bucketsBoundary.length; ++i) { - if (second < bucketsBoundary[i]) { - counters[i].incrementAndGet(); - return; - } - } - - counters[bucketsBoundary.length].incrementAndGet(); - } - - /** - * @param millis in milli seconds - */ - public void processMillis(long millis) { - process(millis / 1000); - } - - public void printStatus() { - long[] countersSnapshot = new long[counters.length]; - for (int i = 0; i < countersSnapshot.length; i++) { - countersSnapshot[i] = counters[i].get(); - } - - long sum = 0; - for (long counter : countersSnapshot) { - sum += counter; - } - - synchronized (printLock) { - System.out.println("============== status of TimeHistogram " + id + " ================="); - - for (int i = 0; i < countersSnapshot.length; ++i) { - System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 1 : sum))); - } - - } - } - -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..7a42598
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link InputStream} that reads directly from a {@link ByteBuffer}.
+ *
+ * <p>The buffer is not copied: every byte read advances its position, and the stream is at
+ * end-of-stream once the buffer has no bytes remaining.
+ */
+public class ByteBufferBackedInputStream extends InputStream {
+
+    private final ByteBuffer buf;
+
+    /**
+     * @param buf buffer to read from, from its current position up to its limit
+     */
+    public ByteBufferBackedInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code bytes}, starting at index {@code off}.
+     *
+     * @return the number of bytes read; 0 if {@code len} is 0; -1 if no bytes remain
+     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit inside {@code bytes}
+     */
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        if (off < 0 || len < 0 || len > bytes.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", length=" + bytes.length);
+        }
+        if (len == 0) {
+            // InputStream contract: a zero-length read returns 0, even when no bytes remain
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        int n = Math.min(len, buf.remaining());
+        buf.get(bytes, off, n);
+        return n;
+    }
+
+    /**
+     * @return the number of bytes left in the buffer; all of them can be read without blocking
+     */
+    @Override
+    public int available() throws IOException {
+        return buf.remaining();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java deleted file mode 100644 index bce9bb9..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.source.kafka.util; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; - -import kafka.cluster.BrokerEndPoint; -import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kylin.source.kafka.TopicMeta; -import org.apache.kylin.source.kafka.config.KafkaClusterConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.cluster.Broker; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.TopicMetadataResponse; -import kafka.javaapi.consumer.SimpleConsumer; - -/** - */ -public final class KafkaRequester { - - private static final Logger logger = LoggerFactory.getLogger(KafkaRequester.class); - - private static ConcurrentMap<String, SimpleConsumer> consumerCache = Maps.newConcurrentMap(); - - static { - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - KafkaRequester.shutdown(); - } - })); - } - - private static SimpleConsumer getSimpleConsumer(Broker broker, int timeout, int bufferSize, String clientId) { - String key = createKey(broker, timeout, bufferSize, clientId); - if (consumerCache.containsKey(key)) { - return consumerCache.get(key); - } else { - BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT); - 
consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId)); - return consumerCache.get(key); - } - } - - private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) { - return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId; - } - - public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) { - SimpleConsumer consumer; - for (Broker broker : kafkaClusterConfig.getBrokers()) { - consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup"); - List<String> topics = Collections.singletonList(kafkaClusterConfig.getTopic()); - TopicMetadataRequest req = new TopicMetadataRequest(topics); - TopicMetadataResponse resp; - try { - resp = consumer.send(req); - } catch (Exception e) { - logger.warn("cannot send TopicMetadataRequest successfully: " + e); - continue; - } - final List<TopicMetadata> topicMetadatas = resp.topicsMetadata(); - if (topicMetadatas.size() != 1) { - break; - } - final TopicMetadata topicMetadata = topicMetadatas.get(0); - if (topicMetadata.errorCode() != 0) { - break; - } - List<Integer> partitionIds = Lists.transform(topicMetadata.partitionsMetadata(), new Function<PartitionMetadata, Integer>() { - @Nullable - @Override - public Integer apply(PartitionMetadata partitionMetadata) { - return partitionMetadata.partitionId(); - } - }); - return new TopicMeta(kafkaClusterConfig.getTopic(), partitionIds); - } - logger.debug("cannot find topic:" + kafkaClusterConfig.getTopic()); - return null; - } - - public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) { - logger.debug("Brokers: " + brokers.toString()); - SimpleConsumer consumer; - for (Broker broker : brokers) { - consumer = getSimpleConsumer(broker, 
kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup"); - List<String> topics = Collections.singletonList(topic); - TopicMetadataRequest req = new TopicMetadataRequest(topics); - TopicMetadataResponse resp; - try { - resp = consumer.send(req); - } catch (Exception e) { - logger.warn("cannot send TopicMetadataRequest successfully: " + e); - continue; - } - final List<TopicMetadata> topicMetadatas = resp.topicsMetadata(); - if (topicMetadatas.size() != 1) { - logger.warn("invalid topicMetadata size:" + topicMetadatas.size()); - break; - } - final TopicMetadata topicMetadata = topicMetadatas.get(0); - if (topicMetadata.errorCode() != 0) { - logger.warn("fetching topicMetadata with errorCode:" + topicMetadata.errorCode()); - break; - } - for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) { - StringBuffer logText = new StringBuffer(); - logText.append("PartitionMetadata debug errorCode: " + partitionMetadata.errorCode()); - logText.append("PartitionMetadata debug partitionId: " + partitionMetadata.partitionId()); - logText.append("PartitionMetadata debug leader: " + partitionMetadata.leader()); - logText.append("PartitionMetadata debug ISR: " + partitionMetadata.isr()); - logText.append("PartitionMetadata debug replica: " + partitionMetadata.replicas()); - logger.info(logText.toString()); - if (partitionMetadata.partitionId() == partitionId) { - return partitionMetadata; - } - } - } - logger.debug("cannot find PartitionMetadata, topic:" + topic + " partitionId:" + partitionId); - return null; - } - - public static FetchResponse fetchResponse(String topic, int partitionId, long offset, Broker broker, KafkaClusterConfig kafkaClusterConfig) { - final String clientName = "client_" + topic + "_" + partitionId; - SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName); - kafka.api.FetchRequest req = new 
FetchRequestBuilder().clientId(clientName).addFetch(topic, partitionId, offset, 1048576) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka, 1048576 is the default value on shell - .build(); - return consumer.fetch(req); - } - - public static long getLastOffset(String topic, int partitionId, long whichTime, Broker broker, KafkaClusterConfig kafkaClusterConfig) { - String clientName = "client_" + topic + "_" + partitionId; - SimpleConsumer consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), clientName); - TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId); - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); - OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); - OffsetResponse response = consumer.getOffsetsBefore(request); - - if (response.hasError()) { - logger.error("Error fetching data Offset Data the Broker. 
Reason: " + response.errorCode(topic, partitionId)); - return 0; - } - long[] offsets = response.offsets(topic, partitionId); - return offsets[0]; - } - - public static void shutdown() { - for (SimpleConsumer simpleConsumer : consumerCache.values()) { - simpleConsumer.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java deleted file mode 100644 index ee5bb20..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.source.kafka.util; - -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.Map; - -import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.source.kafka.StreamingParser; -import org.apache.kylin.source.kafka.config.KafkaClusterConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -import kafka.api.OffsetRequest; -import kafka.cluster.Broker; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.message.MessageAndOffset; - -/** - */ -public final class KafkaUtils { - - private static final Logger logger = LoggerFactory.getLogger(KafkaUtils.class); - - private static final int MAX_RETRY_TIMES = 6; - - private KafkaUtils() { - } - - public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) { - final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig); - if (partitionMetadata != null) { - if (partitionMetadata.errorCode() != 0) { - logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode()); - } - return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT); - } else { - return null; - } - } - - private static void sleep(int retryTimes) { - int seconds = (int) Math.pow(2, retryTimes); - logger.info("retry times:" + retryTimes + " sleep:" + seconds + " seconds"); - try { - Thread.sleep(seconds * 1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset) { - final String topic = kafkaClusterConfig.getTopic(); - int retry = 0; - while 
(retry < MAX_RETRY_TIMES) {//max sleep time 63 seconds - final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId); - if (leadBroker == null) { - logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId); - sleep(retry++); - continue; - } - final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig); - if (response.errorCode(topic, partitionId) != 0) { - logger.warn("errorCode of FetchResponse is:" + response.errorCode(topic, partitionId)); - sleep(retry++); - continue; - } - final Iterator<MessageAndOffset> iterator = response.messageSet(topic, partitionId).iterator(); - if (!iterator.hasNext()) { - logger.warn("messageSet is empty"); - sleep(retry++); - continue; - } - return iterator.next(); - } - throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId, offset)); - } - - public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamingParser streamingParser) { - Pair<Long, Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId); - final String topic = kafkaClusterConfig.getTopic(); - - logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond())); - final long result = binarySearch(kafkaClusterConfig, partitionId, firstAndLast.getFirst(), firstAndLast.getSecond(), timestamp, streamingParser); - logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result)); - return result; - } - - public static Pair<Long, Long> getFirstAndLastOffset(KafkaClusterConfig kafkaClusterConfig, int partitionId) { - final String topic = kafkaClusterConfig.getTopic(); - final 
Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId); - final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig); - final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1; - return Pair.newPair(earliestOffset, latestOffset); - } - - private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamingParser streamingParser) { - Map<Long, Long> cache = Maps.newHashMap(); - - while (startOffset < endOffset) { - long midOffset = startOffset + ((endOffset - startOffset) >> 1); - long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamingParser, cache); - long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamingParser, cache); - long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamingParser, cache); - // hard to ensure these 2 conditions - // Preconditions.checkArgument(startTimestamp <= midTimestamp); - // Preconditions.checkArgument(midTimestamp <= endTimestamp); - if (startTimestamp >= targetTimestamp) { - return startOffset; - } - if (endTimestamp <= targetTimestamp) { - return endOffset; - } - if (targetTimestamp == midTimestamp) { - return midOffset; - } else if (targetTimestamp < midTimestamp) { - endOffset = midOffset - 1; - continue; - } else { - startOffset = midOffset + 1; - continue; - } - } - return startOffset; - } - - private static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser, Map<Long, Long> cache) { - if (cache.containsKey(offset)) { - return cache.get(offset); - } else { - long t = 
getDataTimestamp(kafkaClusterConfig, partitionId, offset, streamingParser); - cache.put(offset, t); - return t; - } - } - - public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamingParser streamingParser) { - final String topic = kafkaClusterConfig.getTopic(); - final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset); - final ByteBuffer payload = messageAndOffset.message().payload(); - byte[] bytes = new byte[payload.limit()]; - payload.get(bytes); - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); - streamingMessage.setOffset(messageAndOffset.offset()); - logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp())); - return streamingMessage.getTimestamp(); - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/test/java/TimedJsonStreamParserTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java index fb33059..5a52b61 100644 --- a/source-kafka/src/test/java/TimedJsonStreamParserTest.java +++ b/source-kafka/src/test/java/TimedJsonStreamParserTest.java @@ -20,7 +20,8 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.*; +import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import com.fasterxml.jackson.databind.type.MapType; @@ -32,7 +33,6 @@ import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.TimedJsonStreamParser; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; 
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml index 1d33071..23e7239 100644 --- a/storage-hbase/pom.xml +++ b/storage-hbase/pom.xml @@ -41,10 +41,6 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-engine-mr</artifactId> </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-engine-streaming</artifactId> - </dependency> <!-- Env & Test --> <dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java deleted file mode 100644 index 9adaf24..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ -package org.apache.kylin.storage.hbase.steps; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter; -import org.apache.kylin.cube.inmemcubing.ICuboidWriter; -import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.CuboidStatsUtil; -import org.apache.kylin.engine.streaming.IStreamingOutput; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.model.IBuildable; -import org.apache.kylin.storage.hbase.HBaseConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - */ -public class HBaseStreamingOutput implements IStreamingOutput { - - private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class); - - @Override - public ICuboidWriter getCuboidWriter(IBuildable buildable) { - try { - CubeSegment cubeSegment = (CubeSegment) buildable; - - final HTableInterface hTable; - hTable = createHTable(cubeSegment); - List<ICuboidWriter> cuboidWriters = Lists.newArrayList(); - cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable)); - cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment)); - return new CompoundCuboidWriter(cuboidWriters); - } catch (IOException e) { - throw new RuntimeException("failed to get ICuboidWriter", e); - } - } - - @Override - public void output(IBuildable buildable, Map<Long, 
HyperLogLogPlusCounter> samplingResult) { - try { - CubeSegment cubeSegment = (CubeSegment) buildable; - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final Configuration conf = HadoopUtil.getCurrentConfiguration(); - final Path outputPath = new Path("file://" + BatchConstants.CFG_STATISTICS_LOCAL_DIR + UUID.randomUUID().toString()); - CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, samplingResult, 100); - FSDataInputStream inputStream = null; - try { - inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME)); - ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, System.currentTimeMillis()); - } finally { - IOUtils.closeQuietly(inputStream); - FileSystem.getLocal(conf).delete(outputPath, true); - } - } catch (IOException e) { - throw new RuntimeException("failed to write sampling result", e); - } - } - - private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException { - final String hTableName = cubeSegment.getStorageLocationIdentifier(); - CubeHTableUtil.createHTable(cubeSegment, null); - final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); - logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!"); - return hTable; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java index f4fb308..9cb135a 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java @@ -35,8 +35,8 @@ import org.apache.kylin.cube.CubeInstance; import 
org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.engine.streaming.StreamingConfig; -import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.metadata.streaming.StreamingConfig; +import org.apache.kylin.metadata.streaming.StreamingManager; import org.apache.kylin.job.dao.ExecutableDao; import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.exception.PersistentException;