adding review changes according to Colin
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/0cd394f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/0cd394f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/0cd394f5 Branch: refs/heads/master Commit: 0cd394f5788b5738dde7169f62e94ab2c8e2b31e Parents: 8ccbb89 Author: Nisala Nirmana <nisal...@gmail.com> Authored: Sat Aug 20 00:04:01 2016 +0530 Committer: Nisala Nirmana <nisal...@gmail.com> Committed: Sat Aug 20 00:04:01 2016 +0530 ---------------------------------------------------------------------- .../htrace/impl/KuduClientConfiguration.java | 6 +- .../htrace/impl/KuduReceiverConstants.java | 22 +- .../apache/htrace/impl/KuduSpanReceiver.java | 267 +++++-------------- .../htrace/impl/TestKuduSpanReceiver.java | 28 +- .../apache/htrace/impl/TestKuduSpanViewer.java | 237 ++++++++++++++++ 5 files changed, 333 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java ---------------------------------------------------------------------- diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java index 25d6fc4..c13e7b4 100644 --- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java +++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java @@ -23,7 +23,7 @@ import org.kududb.client.KuduClient.KuduClientBuilder; public class KuduClientConfiguration { private final String host; - private final String port; + private final Integer port; private final Integer workerCount; private final Integer bossCount; private final Boolean isStatisticsEnabled; @@ -32,7 +32,7 @@ public class KuduClientConfiguration { private final Long socketReadTimeout; public KuduClientConfiguration(String host, - String port, + Integer port, Integer workerCount, Integer bossCount, Boolean isStatisticsEnabled, @@ -52,7 +52,7 @@ public class KuduClientConfiguration { public KuduClient buildClient() { KuduClientBuilder builder = new KuduClient - .KuduClientBuilder(host.concat(":").concat(port)); + .KuduClientBuilder(host.concat(":").concat(port.toString())); if (workerCount != null) { builder.workerCount(workerCount); } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java ---------------------------------------------------------------------- diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java index be98311..805ec80 100644 --- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java +++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java @@ -23,39 +23,21 @@ public class KuduReceiverConstants { static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1"; static final String KUDU_MASTER_PORT_KEY = "kudu.master.port"; static final String DEFAULT_KUDU_MASTER_PORT = "7051"; - static final String SPAN_BLOCKING_QUEUE_SIZE_KEY = "kudu.span.queue.size"; - static final int DEFAULT_SPAN_BLOCKING_QUEUE_SIZE = 1000; static final String KUDU_SPAN_TABLE_KEY = "kudu.span.table"; static final String DEFAULT_KUDU_SPAN_TABLE = "span"; static final String KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY = "kudu.span.timeline.annotation.table"; static final String DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE = "span.timeline"; - static final String MAX_SPAN_BATCH_SIZE_KEY = "kudu.batch.size"; - static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100; - static final String NUM_PARALLEL_THREADS_KEY = "kudu.num.threads"; - static final int DEFAULT_NUM_PARALLEL_THREADS = 1; - static final String KUDU_COLUMN_SPAN_TRACE_ID_KEY = "kudu.column.span.traceid"; static final String DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID = "trace_id"; - static final String KUDU_COLUMN_SPAN_START_TIME_KEY = "kudu.column.span.starttime"; static final String DEFAULT_KUDU_COLUMN_SPAN_START_TIME = "start_time"; - static final String KUDU_COLUMN_SPAN_STOP_TIME_KEY = "kudu.column.span.stoptime"; static final String DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME = "stop_time"; - static final String KUDU_COLUMN_SPAN_SPAN_ID_KEY = "kudu.column.span.spanid"; static final String DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID = "span_id"; - static final String KUDU_COLUMN_SPAN_PROCESS_ID_KEY = "kudu.column.span.processid"; - static final String DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID = "process_id"; - static final String KUDU_COLUMN_SPAN_PARENT_ID_KEY = "kudu.column.span.parentid"; - static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID = "parent_id"; - static final String KUDU_COLUMN_SPAN_DESCRIPTION_KEY = "kudu.column.span.description"; + static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW = "parent_id_low"; + static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH = "parent_id_high"; static final String DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION = "description"; - static final String KUDU_COLUMN_SPAN_PARENT_KEY = "kudu.column.span.parent"; static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT = "parent"; - static final String KUDU_COLUMN_TIMELINE_TIME_KEY = "kudu.column.timeline.time"; static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIME = "time"; - static final String KUDU_COLUMN_TIMELINE_MESSAGE_KEY = "kudu.column.timeline.message"; static final String DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE = "message"; - static final String KUDU_COLUMN_TIMELINE_SPANID_KEY = "kudu.column.timeline.spanid"; static final String DEFAULT_KUDU_COLUMN_TIMELINE_SPANID = "spanid"; - static final String KUDU_COLUMN_TIMELINE_TIMELINEID_KEY = "kudu.column.timeline.timelineid"; static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID = "timelineid"; static final String KUDU_CLIENT_WORKER_COUNT_KEY = "kudu.client.worker.count"; static final String KUDU_CLIENT_BOSS_COUNT_KEY = "kudu.client.boss.count"; http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java index 46c324a..745f24d 100644 --- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java +++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java @@ -30,49 +30,22 @@ import org.kududb.client.Insert; import org.kududb.client.PartialRow; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.Executors; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; public class KuduSpanReceiver extends SpanReceiver { private static final Log LOG = LogFactory.getLog(KuduSpanReceiver.class); - private static final int SHUTDOWN_TIMEOUT = 30; - private static final int MAX_ERRORS = 10; - private final BlockingQueue<Span> queue; - private final AtomicBoolean running = new AtomicBoolean(true); private final KuduClientConfiguration clientConf; - private final int maxSpanBatchSize; - private final ThreadFactory threadFactory = new ThreadFactory() { - private final AtomicLong receiverIndex = new AtomicLong(0); - - @Override - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable); - thread.setDaemon(true); - thread.setName(String.format("kuduSpanReceiver-%d", - receiverIndex.getAndIncrement())); - return thread; - } - }; - - private ExecutorService service; + private KuduSession session; + private KuduClient client; private String table_span; private String column_span_trace_id; private String column_span_start_time; private String column_span_stop_time; private String column_span_span_id; - private String column_span_process_id; - private String column_span_parent_id; + private String column_span_parent_id_low; + private String column_span_parent_id_high; private String column_span_description; private String column_span_parent; @@ -85,7 +58,7 @@ public class KuduSpanReceiver extends SpanReceiver { public KuduSpanReceiver(HTraceConfiguration conf) { String masterHost; - String masterPort; + Integer masterPort; Integer workerCount; Integer bossCount; Boolean isStatisticsEnabled; @@ -95,8 +68,8 @@ public class KuduSpanReceiver extends SpanReceiver { masterHost = conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY, KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST); - masterPort = conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY, - KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT); + masterPort = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY, + KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT)); if (conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY) != null) { bossCount = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY)); @@ -137,186 +110,86 @@ public class KuduSpanReceiver extends SpanReceiver { adminOperationTimeout, operationTimeout, socketReadTimeout); - - this.queue = new ArrayBlockingQueue<Span>(conf.getInt(KuduReceiverConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY, - KuduReceiverConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE)); - + //table names made configurable this.table_span = conf.get(KuduReceiverConstants.KUDU_SPAN_TABLE_KEY, KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE); - this.table_timeline= conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY, + this.table_timeline = conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY, KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE); - - this.column_span_trace_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_TRACE_ID_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID); - this.column_span_start_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_START_TIME_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME); - this.column_span_stop_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_STOP_TIME_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME); - this.column_span_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_SPAN_ID_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID); - this.column_span_process_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PROCESS_ID_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID); - this.column_span_parent_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_ID_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID); - this.column_span_description = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_DESCRIPTION_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION); - this.column_span_parent = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT); - this.column_timeline_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIME_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME); - this.column_timeline_message = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_MESSAGE_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE); - this.column_timeline_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_SPANID_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID); - this.column_timeline_timeline_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIMELINEID_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID); - - this.maxSpanBatchSize = conf.getInt(KuduReceiverConstants.MAX_SPAN_BATCH_SIZE_KEY, - KuduReceiverConstants.DEFAULT_MAX_SPAN_BATCH_SIZE); - if (this.service != null) { - this.service.shutdownNow(); - this.service = null; - } - int numThreads = conf.getInt(KuduReceiverConstants.NUM_PARALLEL_THREADS_KEY, - KuduReceiverConstants.DEFAULT_NUM_PARALLEL_THREADS); - this.service = Executors.newFixedThreadPool(numThreads, threadFactory); - for (int i = 0; i < numThreads; i++) { - this.service.submit(new KuduSpanReceiver.WriteSpanRunnable()); + //default column names have used + this.column_span_trace_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID; + this.column_span_start_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME; + this.column_span_stop_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME; + this.column_span_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID; + this.column_span_parent_id_low = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW; + this.column_span_parent_id_high = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH; + this.column_span_description = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION; + this.column_span_parent = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT; + this.column_timeline_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME; + this.column_timeline_message = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE; + this.column_timeline_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID; + this.column_timeline_timeline_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID; + //kudu backend session initialization + if (this.session == null) { + if (this.client == null) { + client = clientConf.buildClient(); + } + session = client.newSession(); } } @Override public void close() throws IOException { - running.set(false); - service.shutdown(); try { - if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { - LOG.error("Timeout " + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS + - " reached while shutting worker threads which process enqued spans." + - " Enqueued spans which are left in blocking queue is dropped."); + if (this.session != null) { + if (this.session.isClosed()) { + this.session.close(); + } + this.client.close(); } - } catch (InterruptedException e) { - LOG.warn("Interrupted exception occured while terminating thread service executor.", e); + } catch (java.lang.Exception e) { + LOG.warn("Failed to close Kudu session. " + e.getMessage()); } } @Override public void receiveSpan(Span span) { - if (running.get()) { - try { - this.queue.add(span); - } catch (IllegalStateException e) { - LOG.error("Error trying to enqueue span (" - + span.getDescription() - + ") to the queue. Blocking Queue is currently reached its capacity."); + try { + KuduTable tableSpan = client.openTable(table_span); + Insert spanInsert = tableSpan.newInsert(); + PartialRow spanRow = spanInsert.getRow(); + spanRow.addLong(column_span_trace_id, span.getSpanId().getLow()); + spanRow.addLong(column_span_start_time, span.getStartTimeMillis()); + spanRow.addLong(column_span_stop_time, span.getStopTimeMillis()); + spanRow.addLong(column_span_span_id, span.getSpanId().getHigh()); + if (span.getParents().length == 0) { + spanRow.addLong(column_span_parent_id_low, 0); + spanRow.addLong(column_span_parent_id_high, 0); + spanRow.addBoolean(column_span_parent, true); + } else if (span.getParents().length > 0) { + spanRow.addLong(column_span_parent_id_low, span.getParents()[0].getLow()); + spanRow.addLong(column_span_parent_id_high, span.getParents()[0].getHigh()); + spanRow.addBoolean(column_span_parent, false); } - } - } - - private class WriteSpanRunnable implements Runnable { - - private KuduSession session; - private KuduClient client; - - @Override - public void run() { - List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize); - long errorCount = 0; - while (running.get() || queue.size() > 0) { - Span firstSpan = null; - try { - firstSpan = queue.poll(1, TimeUnit.SECONDS); - if (firstSpan != null) { - dequeuedSpans.add(firstSpan); - queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1); - } - } catch (InterruptedException ie) { - LOG.error("Interrupted Exception occurred while polling to " + - "retrieve first span from blocking queue"); - } - startSession(); - if (dequeuedSpans.isEmpty()) { - try { - this.session.flush(); - } catch (java.lang.Exception e) { - LOG.error("Failed to flush writes to Kudu."); - closeSession(); - } - continue; - } - try { - for (Span span : dequeuedSpans) { - KuduTable tableSpan = client.openTable(table_span); - Insert spanInsert = tableSpan.newInsert(); - PartialRow spanRow = spanInsert.getRow(); - spanRow.addLong(column_span_trace_id,span.getSpanId().getHigh()); - spanRow.addLong(column_span_start_time,span.getStartTimeMillis()); - spanRow.addLong(column_span_stop_time,span.getStopTimeMillis()); - spanRow.addLong(column_span_span_id,span.getSpanId().getLow()); - spanRow.addString(column_span_process_id,span.getTracerId()); - if (span.getParents().length == 0) { - spanRow.addLong(column_span_parent_id,0); - spanRow.addBoolean(column_span_parent,false); - } else if (span.getParents().length > 0) { - spanRow.addLong(column_span_parent_id,span.getParents()[0].getLow()); - spanRow.addBoolean(column_span_parent,true); - } - spanRow.addString(column_span_description,span.getDescription()); - this.session.apply(spanInsert); - long annotationCounter = 0; - for (TimelineAnnotation ta : span.getTimelineAnnotations()) { - annotationCounter++; - KuduTable tableTimeline = client.openTable(table_timeline); - Insert timelineInsert = tableTimeline.newInsert(); - PartialRow timelineRow = timelineInsert.getRow(); - timelineRow.addLong(column_timeline_timeline_id,span.getSpanId().getHigh()+annotationCounter); - timelineRow.addLong(column_timeline_time,ta.getTime()); - timelineRow.addString(column_timeline_message,ta.getMessage()); - timelineRow.addLong(column_timeline_span_id,span.getSpanId().getHigh()); - this.session.apply(timelineInsert); - } - } - dequeuedSpans.clear(); - errorCount = 0; - } catch (Exception e) { - errorCount += 1; - if (errorCount < MAX_ERRORS) { - try { - queue.addAll(dequeuedSpans); - } catch (IllegalStateException ex) { - LOG.error("Exception occured while writing spans kudu datastore. " + - "Trying to re-enqueue de-queued spans to blocking queue for writing but failed. " + - "Dropped " + dequeuedSpans.size() + " dequeued span(s) which were due written" + - "into kudu datastore"); - } - } - closeSession(); - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - LOG.error("Interrupted Exception occurred while allowing kudu to re-stabilized"); - } - } + spanRow.addString(column_span_description, span.getDescription()); + session.apply(spanInsert); + long annotationCounter = 0; + for (TimelineAnnotation ta : span.getTimelineAnnotations()) { + annotationCounter++; + KuduTable tableTimeline = client.openTable(table_timeline); + Insert timelineInsert = tableTimeline.newInsert(); + PartialRow timelineRow = timelineInsert.getRow(); + timelineRow.addLong(column_timeline_timeline_id, span.getSpanId().getLow() + annotationCounter); + timelineRow.addLong(column_timeline_time, ta.getTime()); + timelineRow.addString(column_timeline_message, ta.getMessage()); + timelineRow.addLong(column_timeline_span_id, span.getSpanId().getLow()); + session.apply(timelineInsert); } - closeSession(); - } - - private void closeSession() { + } catch (java.lang.Exception ex) { + LOG.error("Failed to write span to Kudu backend", ex); + } finally { try { - if (this.session != null) { - this.session.close(); - this.session = null; - } - } catch (java.lang.Exception e) { - LOG.warn("Failed to close Kudu session. " + e.getMessage()); - } - } - - private void startSession() { - if (this.session == null) { - if (this.client == null) { - client = clientConf.buildClient(); - } - session = client.newSession(); + session.flush(); + } catch (java.lang.Exception ex) { + //Ignore } } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java index 8b446e2..c13970d 100644 --- a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java +++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java @@ -48,7 +48,7 @@ import java.util.List; public class TestKuduSpanReceiver extends BaseKuduTest { private static final String BIN_DIR_PROP = "binDir"; - private static final String BIN_DIR_PROP_DEFAULT = "./build/release/bin"; + private static final String BIN_DIR_PROP_DEFAULT = "../build/release/bin"; //set kudu binary location and enable test execution from here private static final boolean TEST_ENABLE = false; @@ -83,10 +83,10 @@ public class TestKuduSpanReceiver extends BaseKuduTest { span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID, Type.INT64) .build()); - span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID, - Type.STRING) + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH, + Type.INT64) .build()); - span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID, + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW, Type.INT64) .build()); span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT, @@ -137,6 +137,7 @@ public class TestKuduSpanReceiver extends BaseKuduTest { KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1])) .build(); TraceScope scope = tracer.newScope("testKuduScope"); + scope.addTimelineAnnotation("test"); Span testSpan = scope.getSpan(); scope.close(); tracer.close(); @@ -147,6 +148,8 @@ public class TestKuduSpanReceiver extends BaseKuduTest { spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION); spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME); spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME); + spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH); + spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW); KuduScanner scanner = client.newScannerBuilder(client.openTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE)) .setProjectedColumnNames(spanColumns) .build(); @@ -157,11 +160,18 @@ public class TestKuduSpanReceiver extends BaseKuduTest { RowResult result = results.next(); long traceId = result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID); MilliSpan.Builder builder = new MilliSpan.Builder() - .spanId(new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID), - result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID))) + .spanId(new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID), + result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID))) .description(result.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION)) .begin(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME)) .end(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME)); + if (!(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH) == 0 && + result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW) == 0)) { + SpanId[] parents = new SpanId[1]; + parents[0] = new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH), + result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW)); + builder.parents(parents); + } List<String> timelineColumns = new ArrayList<>(); timelineColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME); timelineColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE); @@ -177,7 +187,7 @@ public class TestKuduSpanReceiver extends BaseKuduTest { while (timelineScanner.hasMoreRows()) { RowResultIterator timelineResults = timelineScanner.nextRows(); while (timelineResults.hasNext()) { - RowResult timelineRow = results.next(); + RowResult timelineRow = timelineResults.next(); timelineList.add(new TimelineAnnotation (timelineRow.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME), timelineRow.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE))); @@ -193,6 +203,10 @@ public class TestKuduSpanReceiver extends BaseKuduTest { Assert.assertEquals(testSpan.getStartTimeMillis(), dbSpan.getStartTimeMillis()); Assert.assertEquals(testSpan.getStopTimeMillis(), dbSpan.getStopTimeMillis()); Assert.assertEquals(testSpan.getDescription(), dbSpan.getDescription()); + Assert.assertEquals(testSpan.getTimelineAnnotations().get(0).getMessage(), + dbSpan.getTimelineAnnotations().get(0).getMessage()); + Assert.assertEquals(testSpan.getTimelineAnnotations().get(0).getTime(), + dbSpan.getTimelineAnnotations().get(0).getTime()); syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE); syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE); } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java ---------------------------------------------------------------------- diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java new file mode 100644 index 0000000..7dd2807 --- /dev/null +++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import org.apache.htrace.core.HTraceConfiguration; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.Tracer; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.MilliSpan; +import org.apache.htrace.core.TracerPool; +import org.apache.htrace.core.TimelineAnnotation; +import org.apache.htrace.viewer.KuduSpanViewer; +import org.junit.*; +import org.kududb.ColumnSchema; +import org.kududb.Schema; +import org.kududb.Type; +import org.kududb.client.BaseKuduTest; +import org.kududb.client.KuduClient; +import org.kududb.client.CreateTableOptions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +public class TestKuduSpanViewer extends BaseKuduTest { + + private static final String BIN_DIR_PROP = "binDir"; + private static final String BIN_DIR_PROP_DEFAULT = "../build/release/bin"; + //set kudu binary location and enable test execution from here + private static final boolean TEST_ENABLE = false; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + if (TEST_ENABLE) { + System.setProperty(BIN_DIR_PROP, BIN_DIR_PROP_DEFAULT); + BaseKuduTest.setUpBeforeClass(); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if(TEST_ENABLE) { + BaseKuduTest.tearDownAfterClass(); + } + } + + private void createTable() throws Exception { + KuduClient client = BaseKuduTest.syncClient; + List<ColumnSchema> span_columns = new ArrayList(); + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID, + Type.INT64) + .key(true) + .build()); + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME, + Type.INT64) + .build()); + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME, + Type.INT64) + .build()); + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID, + Type.INT64) + .build()); + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW, + Type.INT64) + .build()); + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH, + Type.INT64) + .build()); + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT, + Type.BOOL) + .build()); + span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION, + Type.STRING) + .build()); + + List<String> rangeKeys = new ArrayList<>(); + rangeKeys.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID); + Schema schema = new Schema(span_columns); + client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE, schema, + new CreateTableOptions().setRangePartitionColumns(rangeKeys)); + + List<ColumnSchema> timeline_columns = new ArrayList(); + timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder + (KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID, Type.INT64) + .key(true) + .build()); + timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME, + Type.INT64) + .build()); + timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder + (KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE, Type.STRING) + .build()); + timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID, + Type.INT64) + .build()); + List<String> rangeKeysTimeline = new ArrayList<>(); + rangeKeysTimeline.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID); + Schema timelineSchema = new Schema(timeline_columns); + client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE, timelineSchema, + new CreateTableOptions().setRangePartitionColumns(rangeKeysTimeline)); + } + + + @Test + public void testSpanToJson() { + SpanId[] parent = new SpanId[1]; + parent[0] = new SpanId(1,1); + MilliSpan.Builder builder = new MilliSpan.Builder() + .parents(parent) + .begin(1) + .end(2) + .spanId(new SpanId(10,20)) + .description("description"); + List<TimelineAnnotation> timelineList = new LinkedList<TimelineAnnotation>(); + for (int i = 0; i < 3; i++) { + timelineList.add(new TimelineAnnotation(i,"message" + i)); + } + builder.timeline(timelineList); + Span span = builder.build(); + try { + String json = KuduSpanViewer.toJsonString(span); + String expected = + "{\"trace_id\":\"20\",\"span_id\":\"10\",\"description\":\"description\",\"parent_id\":\"1\"," + + "\"start\":\"1\",\"stop\":\"2\",\"timeline\":[{\"time\":\"0\",\"message\":\"message0\",}{\"time\":\"1\"," + + "\"message\":\"message1\",}{\"time\":\"2\",\"message\":\"message2\",}]}"; + Assert.assertEquals(json, expected); + } catch (IOException e) { + Assert.fail("failed to get json from span. " + e.getMessage()); + } + } + + @Test + public void testSpanWithoutTimelineToJson() { + SpanId[] parent = new SpanId[1]; + parent[0] = new SpanId(200,111); + MilliSpan.Builder builder = new MilliSpan.Builder() + .parents(parent) + .begin(1) + .end(2) + .spanId(new SpanId(10,20)) + .tracerId("pid") + .description("description"); + Span span = builder.build(); + try { + String json = KuduSpanViewer.toJsonString(span); + String expected = + "{\"trace_id\":\"20\",\"span_id\":\"10\",\"description\":\"description\"," + + "\"parent_id\":\"111\",\"start\":\"1\",\"stop\":\"2\",}"; + Assert.assertEquals(json, expected); + } catch (IOException e) { + Assert.fail("failed to get json from span. " + e.getMessage()); + } + } + + @Ignore + @Test + public void TestKuduSpanViewer() throws Exception { + createTable(); + Tracer tracer = new Tracer.Builder(). + name("testKuduSpanReceiver"). + tracerPool(new TracerPool("testKuduSpanReceiver")). + conf(HTraceConfiguration.fromKeyValuePairs( + "sampler.classes", "AlwaysSampler", + "span.receiver.classes", "org.apache.htrace.impl.KuduSpanReceiver", + KuduReceiverConstants.KUDU_MASTER_HOST_KEY, BaseKuduTest.getMasterAddresses().split(":")[0], + KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1])) + .build(); + TraceScope scope = tracer.newScope("testKuduScope"); + scope.addTimelineAnnotation("test"); + Span testSpan = scope.getSpan(); + TraceScope childScope = tracer.newScope("testKuduChildScope", new SpanId(100,200)); + Span childScopeSpan = childScope.getSpan(); + childScope.addTimelineAnnotation("testChild"); + childScope.close(); + scope.close(); + tracer.close(); + HTraceConfiguration conf = HTraceConfiguration + .fromKeyValuePairs(KuduReceiverConstants.KUDU_MASTER_HOST_KEY, + BaseKuduTest.getMasterAddresses().split(":")[0], + KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]); + KuduSpanViewer viewer = new KuduSpanViewer(conf); + List<Span> list = viewer.getRootSpans(); + Assert.assertEquals(list.size(), 1); + Span span = viewer.getRootSpans().get(0); + try { + String json = KuduSpanViewer.toJsonString(span); + String expected = KuduSpanViewer.toJsonString(testSpan); + Assert.assertEquals(json, expected); + } catch (IOException e) { + Assert.fail("failed to get json from span. " + e.getMessage()); + } + List<Span> list2 = viewer.getSpans(span.getSpanId().getHigh()); + Assert.assertEquals(list2.size(), 2); + Span span2 = list2.get(0); + try { + String json = KuduSpanViewer.toJsonString(span2); + String expected = null; + if(span2.getParents().length != 0) { + expected = KuduSpanViewer.toJsonString(childScopeSpan); + } else { + expected = KuduSpanViewer.toJsonString(testSpan); + } + Assert.assertEquals(json, expected); + } catch (IOException e) { + Assert.fail("failed to get json from span. " + e.getMessage()); + } + Span span3 = list2.get(1); + try { + String json = KuduSpanViewer.toJsonString(span3); + String expected = null; + if(span3.getParents().length != 0) { + expected = KuduSpanViewer.toJsonString(childScopeSpan); + } else { + expected = KuduSpanViewer.toJsonString(testSpan); + } + Assert.assertEquals(json, expected); + } catch (IOException e) { + Assert.fail("failed to get json from span. " + e.getMessage()); + } + } +}