lukasz-antoniak commented on code in PR #2021: URL: https://github.com/apache/cassandra-java-driver/pull/2021#discussion_r1969849249
########## open-telemetry/src/main/java/com/datastax/oss/driver/internal/core/tracker/OtelRequestTracker.java: ########## @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.tracker; + +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.cql.QueryTrace; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.api.core.session.Session; +import com.datastax.oss.driver.api.core.tracker.RequestTracker; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.internal.core.channel.DriverChannel; +import 
com.datastax.oss.driver.internal.core.context.DefaultDriverContext; +import com.datastax.oss.driver.internal.core.cql.CqlRequestHandler; +import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; +import com.datastax.oss.driver.internal.core.metadata.SniEndPoint; +import com.datastax.oss.driver.internal.core.util.concurrent.LazyReference; +import com.datastax.oss.driver.shaded.guava.common.util.concurrent.ThreadFactoryBuilder; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OtelRequestTracker implements RequestTracker { + + private final Map<String, TracingInfo> logPrefixToTracingInfoMap = new ConcurrentHashMap<>(); + + private final Tracer tracer; + + private final Logger LOG = LoggerFactory.getLogger(OtelRequestTracker.class); + + private final LazyReference<ExecutorService> threadPool; + + private RequestLogFormatter formatter; + private DefaultDriverContext context; + private final Field proxyAddressField = getProxyAddressField(); + + /** + * Attributes that are "conditionally required" or "recommended" but we cannot provide: 1. + * db.collection.name 2. 
db.response.status_code + */ + private static final AttributeKey<String> DB_SYSTEM_NAME = + AttributeKey.stringKey("db.system.name"); + + private static final AttributeKey<String> DB_NAMESPACE = AttributeKey.stringKey("db.namespace"); + private static final AttributeKey<String> DB_OPERATION_NAME = + AttributeKey.stringKey("db.operation.name"); + private static final AttributeKey<String> ERROR_TYPE = AttributeKey.stringKey("error.type"); + private static final AttributeKey<Long> SERVER_PORT = AttributeKey.longKey("server.port"); + private static final AttributeKey<String> CASSANDRA_CONSISTENCY_LEVEL = + AttributeKey.stringKey("cassandra.consistency.level"); + private static final AttributeKey<String> CASSANDRA_COORDINATOR_DC = + AttributeKey.stringKey("cassandra.coordinator.dc"); + private static final AttributeKey<String> CASSANDRA_COORDINATOR_ID = + AttributeKey.stringKey("cassandra.coordinator.id"); + private static final AttributeKey<Long> CASSANDRA_PAGE_SIZE = + AttributeKey.longKey("cassandra.page.size"); + private static final AttributeKey<Boolean> CASSANDRA_QUERY_IDEMPOTENT = + AttributeKey.booleanKey("cassandra.query.idempotent"); + private static final AttributeKey<String> CASSANDRA_QUERY_ID = + AttributeKey.stringKey("cassandra.query.id"); + private static final AttributeKey<Long> CASSANDRA_SPECULATIVE_EXECUTION_COUNT = + AttributeKey.longKey("cassandra.speculative_execution.count"); + private static final AttributeKey<Long> DB_OPERATION_BATCH_SIZE = + AttributeKey.longKey("db.operation.batch.size"); + private static final AttributeKey<String> DB_QUERY_TEXT = AttributeKey.stringKey("db.query.text"); + private static final AttributeKey<String> SERVER_ADDRESS = + AttributeKey.stringKey("server.address"); + + public OtelRequestTracker(OpenTelemetry openTelemetry) { + this.tracer = + openTelemetry.getTracer("com.datastax.oss.driver.internal.core.tracker.OtelRequestTracker"); + this.threadPool = + new LazyReference<>( + () -> + new ThreadPoolExecutor( + 1, + 
Math.max(Runtime.getRuntime().availableProcessors(), 1), + 10, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1000), + new ThreadFactoryBuilder().setNameFormat("otel-thread-%d").build(), + new ThreadPoolExecutor.AbortPolicy())); + } + + @Override + public void close() throws Exception { + threadPool.get().shutdown(); + threadPool.get().awaitTermination(10, TimeUnit.SECONDS); + logPrefixToTracingInfoMap.clear(); + } + + @Override + public void onRequestCreated( + @NonNull Request request, + @NonNull DriverExecutionProfile executionProfile, + @NonNull String requestLogPrefix) { + Span parentSpan = tracer.spanBuilder("Cassandra Java Driver Session Request").startSpan(); + TracingInfo tracingInfo = new TracingInfo(parentSpan); + logPrefixToTracingInfoMap.put(requestLogPrefix, tracingInfo); + addRequestAttributesToSpan(request, parentSpan, false); + LOG.debug("Request created: {}", requestLogPrefix); + } + + @Override + public void onRequestCreatedForNode( + @NonNull Request request, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String requestLogPrefix) { + logPrefixToTracingInfoMap.computeIfPresent( + nodePrefixToRequestPrefix(requestLogPrefix), + (k, v) -> { + Span parentSpan = v.parentSpan; + Span span = + tracer + .spanBuilder("Cassandra Java Driver Node Request") + .setParent(Context.current().with(parentSpan)) + .startSpan(); + addRequestAttributesToSpan(request, span, true); + v.addNodeSpan(requestLogPrefix, span); + return v; + }); + LOG.debug("Request created for node: {}", requestLogPrefix); + } + + @Override + public void onSuccess( + long latencyNanos, @NonNull ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { + logPrefixToTracingInfoMap.computeIfPresent( + requestLogPrefix, + (k, v) -> { + Span span = v.parentSpan; + span.setAttribute(CASSANDRA_QUERY_ID, requestLogPrefix); + span.setStatus(StatusCode.OK); + addRequestAttributesToSpan(executionInfo.getRequest(), span, false); + 
addExecutionInfoToSpan(executionInfo, span); + span.end(); + return null; + }); + } + + @Override + public void onError( + long latencyNanos, @NonNull ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { + logPrefixToTracingInfoMap.computeIfPresent( + requestLogPrefix, + (k, v) -> { + Span span = v.parentSpan; + span.setAttribute(CASSANDRA_QUERY_ID, requestLogPrefix); + if (!executionInfo.getErrors().isEmpty()) { + span.recordException(executionInfo.getErrors().get(0).getValue()); + } + span.setStatus(StatusCode.ERROR); + addRequestAttributesToSpan(executionInfo.getRequest(), span, false); + addExecutionInfoToSpan(executionInfo, span); + span.end(); + return null; + }); + } + + @Override + public void onNodeSuccess( + long latencyNanos, @NonNull ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { + logPrefixToTracingInfoMap.computeIfPresent( + nodePrefixToRequestPrefix(requestLogPrefix), + (k, v) -> { + Span span = v.getNodeSpan(requestLogPrefix); + span.setAttribute(CASSANDRA_QUERY_ID, requestLogPrefix); + span.setStatus(StatusCode.OK); + addRequestAttributesToSpan(executionInfo.getRequest(), span, true); + addExecutionInfoToSpan(executionInfo, span); + span.end(); + if (executionInfo.getTracingId() != null) { + threadPool + .get() + .submit( + () -> { + QueryTrace queryTrace = executionInfo.getQueryTrace(); + addCassandraQueryTraceToSpan(span, queryTrace); + }); + } + return v; + }); + } + + @Override + public void onNodeError( + long latencyNanos, @NonNull ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { + logPrefixToTracingInfoMap.computeIfPresent( + nodePrefixToRequestPrefix(requestLogPrefix), + (k, v) -> { + Span span = v.getNodeSpan(requestLogPrefix); + span.setAttribute(CASSANDRA_QUERY_ID, requestLogPrefix); + if (!executionInfo.getErrors().isEmpty()) { + /* + Find the first error for this node. 
Because a node can appear twice in the errors list due + to retry policy, the error for this NodeRequest may not actually be the first error in + this list. In that scenario, the wrong error may be attached to this span, but this is the best we can do. + */ + executionInfo + .getErrors() + .forEach( + entry -> { + if (entry + .getKey() + .getHostId() + .equals(executionInfo.getCoordinator().getHostId())) { + span.recordException(entry.getValue()); + } + }); + } + span.setStatus(StatusCode.ERROR); + addRequestAttributesToSpan(executionInfo.getRequest(), span, true); + addExecutionInfoToSpan(executionInfo, span); + span.end(); + if (executionInfo.getTracingId() != null) { + threadPool + .get() + .submit( + () -> { + QueryTrace queryTrace = executionInfo.getQueryTrace(); + addCassandraQueryTraceToSpan(span, queryTrace); + }); + } + return v; + }); + } + + @Override + public void onSessionReady(@NonNull Session session) { + this.context = (DefaultDriverContext) session.getContext(); + this.formatter = this.context.getRequestLogFormatter(); + } + + private static class TracingInfo { + private final Span parentSpan; + private final Map<String, Span> nodeSpans = new ConcurrentHashMap<>(); // logPrefix -> span + + private TracingInfo(Span parentSpan) { + this.parentSpan = parentSpan; + } + + private void addNodeSpan(String logPrefix, Span span) { + nodeSpans.put(logPrefix, span); + } + + private Span getNodeSpan(String logPrefix) { + return nodeSpans.get(logPrefix); + } + } + + private void addRequestAttributesToSpan(Request request, Span span, boolean isNodeRequest) { + span.setAttribute(DB_SYSTEM_NAME, "cassandra"); + String operationName = + String.format( + "%s(%s)", + isNodeRequest ? 
"Node_Request" : "Session_Request", request.getClass().getSimpleName()); + span.setAttribute(DB_OPERATION_NAME, operationName); + if (request.getKeyspace() != null) + span.setAttribute(DB_NAMESPACE, request.getKeyspace().asCql(true)); + + if (request instanceof Statement<?>) { + String consistencyLevel; + if (((Statement<?>) request).getConsistencyLevel() != null) { + consistencyLevel = ((Statement<?>) request).getConsistencyLevel().name(); + } else { + consistencyLevel = + context + .getConfig() + .getDefaultProfile() + .getString(DefaultDriverOption.REQUEST_CONSISTENCY); + } + span.setAttribute(CASSANDRA_CONSISTENCY_LEVEL, consistencyLevel); + + int pageSize; + if (((Statement<?>) request).getPageSize() > 0) { + pageSize = ((Statement<?>) request).getPageSize(); + } else { + pageSize = + context.getConfig().getDefaultProfile().getInt(DefaultDriverOption.REQUEST_PAGE_SIZE); + } + span.setAttribute(CASSANDRA_PAGE_SIZE, pageSize); + } + + span.setAttribute(DB_QUERY_TEXT, requestToString(request)); + if (request.isIdempotent() != null) + span.setAttribute(CASSANDRA_QUERY_IDEMPOTENT, request.isIdempotent()); + + if (request instanceof BatchStatement) { + span.setAttribute(DB_OPERATION_BATCH_SIZE, ((BatchStatement) request).size()); + } + + if (request instanceof BoundStatement) { + addParametersOfBoundStatementToSpan(span, (BoundStatement) request); + } + } + + private void addExecutionInfoToSpan(ExecutionInfo executionInfo, Span span) { + Node node = executionInfo.getCoordinator(); + if (node != null) { + addServerAddressAndPortToSpan(span, node); + span.setAttribute(CASSANDRA_COORDINATOR_ID, node.getHostId().toString()); + span.setAttribute(CASSANDRA_COORDINATOR_DC, node.getDatacenter()); + } + + /* + Find the first error for this node. Because a node can appear twice in the errors list due + to retry policy, the error for this NodeRequest may not actually be the first error in + this list. 
In that scenario, the wrong error may be attached to this span, but this is the best we can do. + */ + executionInfo + .getErrors() + .forEach( + entry -> { + if (entry.getKey().getHostId().equals(node.getHostId())) { + span.setAttribute(ERROR_TYPE, entry.getValue().getClass().getSimpleName()); + } + }); + + span.setAttribute( + CASSANDRA_SPECULATIVE_EXECUTION_COUNT, executionInfo.getSpeculativeExecutionCount()); + } + + private String requestToString(Request request) { + StringBuilder builder = new StringBuilder(); + assert this.formatter != null; + this.formatter.appendQueryString( + request, RequestLogger.DEFAULT_REQUEST_LOGGER_MAX_QUERY_LENGTH, builder); + this.formatter.appendValues( + request, + RequestLogger.DEFAULT_REQUEST_LOGGER_MAX_VALUES, + RequestLogger.DEFAULT_REQUEST_LOGGER_MAX_VALUE_LENGTH, + true, + builder); + return builder.toString(); + } + + private void addParametersOfBoundStatementToSpan(Span span, BoundStatement statement) { + ColumnDefinitions definitions = statement.getPreparedStatement().getVariableDefinitions(); + List<ByteBuffer> values = statement.getValues(); + assert definitions.size() == values.size(); + for (int i = 0; i < definitions.size(); i++) { + String key = "db.operation.parameter." 
+ definitions.get(i).getName().asCql(true); + StringBuilder valueBuilder = new StringBuilder(); + if (!statement.isSet(i)) { + valueBuilder.append("<UNSET>"); + } else { + ByteBuffer value = values.get(i); + DataType type = definitions.get(i).getType(); + this.formatter.appendValue( + value, type, RequestLogger.DEFAULT_REQUEST_LOGGER_MAX_VALUE_LENGTH, valueBuilder); + } + span.setAttribute(key, valueBuilder.toString()); + } + } + + private void addServerAddressAndPortToSpan(Span span, Node coordinator) { + EndPoint endPoint = coordinator.getEndPoint(); + if (endPoint instanceof DefaultEndPoint) { + InetSocketAddress address = ((DefaultEndPoint) endPoint).resolve(); + span.setAttribute(SERVER_ADDRESS, address.getHostString()); + span.setAttribute(SERVER_PORT, address.getPort()); + } else if (endPoint instanceof SniEndPoint && proxyAddressField != null) { + SniEndPoint sniEndPoint = (SniEndPoint) endPoint; + Object object = null; + try { + object = proxyAddressField.get(sniEndPoint); + } catch (Exception e) { + this.LOG.trace( + "Error when accessing the private field proxyAddress of SniEndPoint using reflection."); + } + if (object instanceof InetSocketAddress) { + InetSocketAddress address = (InetSocketAddress) object; + span.setAttribute(SERVER_ADDRESS, address.getHostString()); + span.setAttribute(SERVER_PORT, address.getPort()); + } + } + } + + /** + * This depends on the implementation of {@link + * CqlRequestHandler.NodeResponseCallback#NodeResponseCallback(Statement, Node, Queue, + * DriverChannel, int, int, boolean, String) NodeResponseCallback} + * + * @param nodePrefix s0|1716164115|0 + * @return the request prefix, like s0|1716164115 + */ + private static String nodePrefixToRequestPrefix(String nodePrefix) { + int lastSeparatorIndex = nodePrefix.lastIndexOf("|"); + return nodePrefix.substring(0, lastSeparatorIndex); + } + + @Nullable + private Field getProxyAddressField() { Review Comment: We should not need to do that on our own code base. 
AFAIK, we should just add a getter in `SniEndPoint`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org