http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java new file mode 100644 index 0000000..7675d74 --- /dev/null +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -0,0 +1,256 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.cassandra.tracing; + +import static com.google.common.base.Preconditions.checkState; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.cql3.ColumnNameBuilder; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ExpiringColumn; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.marshal.InetAddressType; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.TimeUUIDType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.WrappedRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A trace session context. Able to track and store trace sessions. A session is usually a user-initiated query, and may + * have multiple local and remote events before it is completed. All events and sessions are stored in tables. 
+ */ +public class Tracing +{ + public static final String TRACE_KS = "system_traces"; + public static final String EVENTS_CF = "events"; + public static final String SESSIONS_CF = "sessions"; + public static final String TRACE_HEADER = "TraceSession"; + + private static final int TTL = 24 * 3600; + + private static Tracing instance = new Tracing(); + + public static final Logger logger = LoggerFactory.getLogger(Tracing.class); + + /** + * Fetches and lazy initializes the trace context. + */ + public static Tracing instance() + { + return instance; + } + + private InetAddress localAddress = FBUtilities.getLocalAddress(); + + private final ThreadLocal<TraceState> state = new ThreadLocal<TraceState>(); + + public static void addColumn(ColumnFamily cf, ByteBuffer name, Object value) + { + cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value.toString()), System.currentTimeMillis(), TTL)); + } + + public static void addColumn(ColumnFamily cf, ByteBuffer name, InetAddress address) + { + cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(address), System.currentTimeMillis(), TTL)); + } + + public static void addColumn(ColumnFamily cf, ByteBuffer name, int value) + { + cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL)); + } + + public static void addColumn(ColumnFamily cf, ByteBuffer name, long value) + { + cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL)); + } + + public static void addColumn(ColumnFamily cf, ByteBuffer name, String value) + { + cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL)); + } + + private void addColumn(ColumnFamily cf, ByteBuffer name, ByteBuffer value) + { + cf.addColumn(new ExpiringColumn(name, value, System.currentTimeMillis(), TTL)); + } + + public void addParameterColumns(ColumnFamily cf, Map<String, String> rawPayload) + { + for (Map.Entry<String, String> entry : 
rawPayload.entrySet()) + { + cf.addColumn(new ExpiringColumn(buildName(cf.metadata(), bytes("parameters"), bytes(entry.getKey())), + bytes(entry.getValue()), System.currentTimeMillis(), TTL)); + } + } + + public static ByteBuffer buildName(CFMetaData meta, ByteBuffer... args) + { + ColumnNameBuilder builder = meta.getCfDef().getColumnNameBuilder(); + for (ByteBuffer arg : args) + builder.add(arg); + return builder.build(); + } + + public UUID getSessionId() + { + assert isTracing(); + return state.get().sessionId; + } + + /** + * Indicates if the current thread's execution is being traced. + */ + public static boolean isTracing() + { + return instance != null && instance.state.get() != null; + } + + public void reset() + { + state.set(null); + } + + public UUID newSession() + { + return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))); + } + + public UUID newSession(UUID sessionId) + { + assert state.get() == null; + + TraceState ts = new TraceState(localAddress, sessionId); + state.set(ts); + + return sessionId; + } + + public void stopSession() + { + TraceState state = this.state.get(); + if (state == null) // inline isTracing to avoid implicit two calls to state.get() + { + logger.debug("request complete"); + } + else + { + final long finished_at = System.currentTimeMillis(); + final ByteBuffer sessionIdBytes = state.sessionIdBytes; + + StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + { + public void runMayThrow() throws TimedOutException, UnavailableException + { + ColumnFamily cf = ColumnFamily.create(CFMetaData.TraceSessionsCf); + addColumn(cf, + buildName(CFMetaData.TraceSessionsCf, bytes("finished_at")), + LongType.instance.decompose(finished_at)); + RowMutation mutation = new RowMutation(TRACE_KS, sessionIdBytes); + mutation.add(cf); + StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); + } + }); + + reset(); + } + } + + public TraceState get() + { + return state.get(); + } + + 
public void set(final TraceState tls) + { + state.set(tls); + } + + public void begin(final String request, final Map<String, String> parameters) + { + assert isTracing(); + + final long started_at = System.currentTimeMillis(); + final ByteBuffer sessionIdBytes = state.get().sessionIdBytes; + + StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + { + public void runMayThrow() throws TimedOutException, UnavailableException + { + ColumnFamily cf = ColumnFamily.create(CFMetaData.TraceSessionsCf); + addColumn(cf, + buildName(CFMetaData.TraceSessionsCf, bytes("coordinator")), + InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress())); + addColumn(cf, + buildName(CFMetaData.TraceSessionsCf, bytes("request")), + UTF8Type.instance.decompose(request)); + addColumn(cf, + buildName(CFMetaData.TraceSessionsCf, bytes("started_at")), + LongType.instance.decompose(started_at)); + addParameterColumns(cf, parameters); + RowMutation mutation = new RowMutation(TRACE_KS, sessionIdBytes); + mutation.add(cf); + StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); + } + }); + } + + /** + * Updates the threads query context from a message + * + * @param message + * The internode message + */ + public void initializeFromMessage(final MessageIn<?> message) + { + final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER); + + // if the message has no session context header don't do tracing + if (sessionBytes == null) + { + state.set(null); + return; + } + + checkState(sessionBytes.length == 16); + state.set(new TraceState(message.from, UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)))); + } + +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tracing/TracingAppender.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TracingAppender.java b/src/java/org/apache/cassandra/tracing/TracingAppender.java new file mode 100644 index 0000000..25b7056 --- /dev/null +++ b/src/java/org/apache/cassandra/tracing/TracingAppender.java @@ -0,0 +1,65 @@ +package org.apache.cassandra.tracing; + +import static org.apache.cassandra.tracing.Tracing.*; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.spi.LoggingEvent; + +public class TracingAppender extends AppenderSkeleton +{ + protected void append(final LoggingEvent event) + { + if (Tracing.instance() == null) // instance might not be built at the time this is called + return; + + final TraceState state = Tracing.instance().get(); + if (state == null) // inline isTracing to avoid implicit two calls to state.get() + return; + + final int elapsed = state.elapsed(); + StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + { + public void runMayThrow() throws TimedOutException, UnavailableException + { + ByteBuffer 
eventId = ByteBufferUtil.bytes(UUIDGen.makeType1UUIDFromHost(FBUtilities + .getBroadcastAddress())); + CFMetaData cfMeta = CFMetaData.TraceEventsCf; + ColumnFamily cf = ColumnFamily.create(cfMeta); + addColumn(cf, buildName(cfMeta, eventId, bytes("source")), FBUtilities.getBroadcastAddress()); + addColumn(cf, buildName(cfMeta, eventId, bytes("thread")), event.getThreadName()); + addColumn(cf, buildName(cfMeta, eventId, bytes("happened_at")), event.getTimeStamp()); + addColumn(cf, buildName(cfMeta, eventId, bytes("source_elapsed")), elapsed); + addColumn(cf, buildName(cfMeta, eventId, bytes("activity")), event.getMessage()); + RowMutation mutation = new RowMutation(Tracing.TRACE_KS, state.sessionIdBytes); + mutation.add(cf); + StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); + } + }); + } + + public void close() + { + } + + public boolean requiresLayout() + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index f0b36bf..2b5da97 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -24,10 +24,12 @@ package org.apache.cassandra.utils; */ import java.io.*; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.UUID; import static com.google.common.base.Charsets.UTF_8; @@ -523,4 +525,14 @@ public class ByteBufferUtil } return 0; } + + public static ByteBuffer bytes(InetAddress address) + { + return ByteBuffer.wrap(address.getAddress()); + } + + public static ByteBuffer bytes(UUID uuid) + { + return ByteBuffer.wrap(UUIDGen.decompose(uuid)); + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java index 196d731..4a93775 100644 --- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java +++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.HashMap; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ColumnFamilyStore; @@ -129,8 +128,8 @@ public class CFMetaDataTest extends SchemaLoader // Test schema conversion RowMutation rm = cfm.toSchema(System.currentTimeMillis()); - ColumnFamily serializedCf = rm.getColumnFamily(Schema.instance.getId(Table.SYSTEM_TABLE, SystemTable.SCHEMA_COLUMNFAMILIES_CF)); - ColumnFamily serializedCD = rm.getColumnFamily(Schema.instance.getId(Table.SYSTEM_TABLE, SystemTable.SCHEMA_COLUMNS_CF)); + ColumnFamily serializedCf = rm.getColumnFamily(Schema.instance.getId(Table.SYSTEM_KS, SystemTable.SCHEMA_COLUMNFAMILIES_CF)); + ColumnFamily serializedCD = rm.getColumnFamily(Schema.instance.getId(Table.SYSTEM_KS, SystemTable.SCHEMA_COLUMNS_CF)); UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", new Row(k, serializedCf)).one(); CFMetaData newCfm = CFMetaData.addColumnDefinitionSchema(CFMetaData.fromSchemaNoColumns(result), new Row(k, serializedCD)); assert cfm.equals(newCfm) : String.format("\n%s\n!=\n%s", cfm, newCfm); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/test/unit/org/apache/cassandra/config/DefsTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java index cca87fb..b01ffc4 100644 --- a/test/unit/org/apache/cassandra/config/DefsTest.java +++ b/test/unit/org/apache/cassandra/config/DefsTest.java @@ -49,8 +49,8 @@ public class DefsTest extends SchemaLoader @Test public void ensureStaticCFMIdsAreLessThan1000() { - assert CFMetaData.OldStatusCf.cfId.equals(CFMetaData.getId(Table.SYSTEM_TABLE, SystemTable.OLD_STATUS_CF)); - assert CFMetaData.OldHintsCf.cfId.equals(CFMetaData.getId(Table.SYSTEM_TABLE, SystemTable.OLD_HINTS_CF)); + assert CFMetaData.OldStatusCf.cfId.equals(CFMetaData.getId(Table.SYSTEM_KS, SystemTable.OLD_STATUS_CF)); + assert CFMetaData.OldHintsCf.cfId.equals(CFMetaData.getId(Table.SYSTEM_KS, SystemTable.OLD_HINTS_CF)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java index 0bcaff4..e43124d 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java @@ -27,6 +27,7 @@ import java.util.List; import org.junit.Test; import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.Token; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; @@ -72,7 +73,7 @@ public class StorageServiceServerTest public void testColumnFamilySnapshot() throws IOException { // no need to insert extra data, even an "empty" database will have a little information in the system keyspace - StorageService.instance.takeColumnFamilySnapshot("system", "Schema", "cf_snapshot"); + 
StorageService.instance.takeColumnFamilySnapshot(Table.SYSTEM_KS, "Schema", "cf_snapshot"); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/tools/stress/src/org/apache/cassandra/stress/Session.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java index dbe1951..5763f37 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Session.java +++ b/tools/stress/src/org/apache/cassandra/stress/Session.java @@ -117,6 +117,7 @@ public class Session implements Serializable private boolean ignoreErrors = false; private boolean enable_cql = false; private boolean use_prepared = false; + private boolean trace = false; private final String outFileName; @@ -139,6 +140,7 @@ public class Session implements Serializable public final InetAddress sendToDaemon; public final String comparator; public final boolean timeUUIDComparator; + public double traceProbability = 0.0; public Session(String[] arguments) throws IllegalArgumentException {