ctubbsii commented on code in PR #5398:
URL: https://github.com/apache/accumulo/pull/5398#discussion_r2029398114


##########
core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between 
different versions of the
+ * protocol.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isClient;
+
+  public static class AccumuloProtocol extends TCompactProtocol {
+
+    static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+    static final byte PROTOCOL_VERSION = 1;
+    private static final boolean HEADER_HAS_TRACE = true;
+
+    private final boolean isClient;
+
+    private Span span = null;
+    private Scope scope = null;
+    private final Map<String,String> traceHeaders = new HashMap<>();
+
+    public AccumuloProtocol(TTransport transport, boolean isClient) {
+      super(transport);
+      this.isClient = isClient;
+    }
+
+    /**
+     * For client calls, add RPC span and write the validation header
+     */
+    @Override
+    public void writeMessageBegin(TMessage message) throws TException {
+      if (!this.isClient) {
+        super.writeMessageBegin(message);
+      } else {
+        span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+        scope = span.makeCurrent();
+
+        try {
+          this.writeHeader();
+          super.writeMessageBegin(message);
+        } catch (TException e) {
+          if (span != null) {
+            TraceUtil.setException(span, e, false);
+          }
+          if (scope != null) {
+            scope.close();
+          }
+          if (span != null) {
+            span.end();
+          }
+          throw e;
+        }
+      }
+    }
+
+    /**
+     * Writes the Accumulo protocol header containing version and 
identification info
+     */
+    private void writeHeader() throws TException {

Review Comment:
   Because this code is reused, and this method is only used by the client, it 
would be good to rename it:
   
   ```suggestion
       private void writeClientHeader() throws TException {
   ```



##########
core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between 
different versions of the
+ * protocol.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isClient;
+
+  public static class AccumuloProtocol extends TCompactProtocol {
+
+    static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+    static final byte PROTOCOL_VERSION = 1;
+    private static final boolean HEADER_HAS_TRACE = true;
+
+    private final boolean isClient;
+
+    private Span span = null;
+    private Scope scope = null;
+    private final Map<String,String> traceHeaders = new HashMap<>();
+
+    public AccumuloProtocol(TTransport transport, boolean isClient) {
+      super(transport);
+      this.isClient = isClient;
+    }
+
+    /**
+     * For client calls, add RPC span and write the validation header
+     */
+    @Override
+    public void writeMessageBegin(TMessage message) throws TException {
+      if (!this.isClient) {
+        super.writeMessageBegin(message);
+      } else {
+        span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+        scope = span.makeCurrent();
+
+        try {
+          this.writeHeader();
+          super.writeMessageBegin(message);
+        } catch (TException e) {
+          if (span != null) {
+            TraceUtil.setException(span, e, false);
+          }
+          if (scope != null) {
+            scope.close();
+          }
+          if (span != null) {
+            span.end();
+          }
+          throw e;
+        }
+      }
+    }
+
+    /**
+     * Writes the Accumulo protocol header containing version and 
identification info
+     */
+    private void writeHeader() throws TException {
+      super.writeI32(MAGIC_NUMBER);
+      super.writeByte(PROTOCOL_VERSION);
+
+      if (span == null || !span.getSpanContext().isValid()) {
+        super.writeBool(!HEADER_HAS_TRACE);
+        return;
+      }
+
+      super.writeBool(HEADER_HAS_TRACE);
+      traceHeaders.clear();
+
+      TraceUtil.injectTraceContext(traceHeaders);
+
+      super.writeI32(traceHeaders.size());
+
+      for (Map.Entry<String,String> entry : traceHeaders.entrySet()) {
+        super.writeString(entry.getKey());
+        super.writeString(entry.getValue());
+      }
+    }
+
+    @Override
+    public void writeMessageEnd() throws TException {
+      try {
+        super.writeMessageEnd();
+      } finally {
+        if (scope != null) {
+          scope.close();
+          span.end();
+        }

Review Comment:
   If this can be reused, then these need to be set back to null before the 
next begin message:
   
   ```suggestion
           if (scope != null) {
             scope.close();
             span.end();
           }
           scope = null;
           span = null;
   ```
   
   I'm actually not sure how thread-safe this protocol needs to be, but I think 
this is okay, so long as they are reset before the next message.



##########
core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between 
different versions of the
+ * protocol.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isClient;
+
+  public static class AccumuloProtocol extends TCompactProtocol {
+
+    static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+    static final byte PROTOCOL_VERSION = 1;
+    private static final boolean HEADER_HAS_TRACE = true;
+
+    private final boolean isClient;
+
+    private Span span = null;
+    private Scope scope = null;
+    private final Map<String,String> traceHeaders = new HashMap<>();
+
+    public AccumuloProtocol(TTransport transport, boolean isClient) {
+      super(transport);
+      this.isClient = isClient;
+    }
+
+    /**
+     * For client calls, add RPC span and write the validation header
+     */
+    @Override
+    public void writeMessageBegin(TMessage message) throws TException {
+      if (!this.isClient) {
+        super.writeMessageBegin(message);
+      } else {
+        span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+        scope = span.makeCurrent();
+
+        try {
+          this.writeHeader();
+          super.writeMessageBegin(message);
+        } catch (TException e) {
+          if (span != null) {
+            TraceUtil.setException(span, e, false);
+          }
+          if (scope != null) {
+            scope.close();
+          }
+          if (span != null) {
+            span.end();
+          }
+          throw e;
+        }
+      }
+    }
+
+    /**
+     * Writes the Accumulo protocol header containing version and 
identification info
+     */
+    private void writeHeader() throws TException {
+      super.writeI32(MAGIC_NUMBER);
+      super.writeByte(PROTOCOL_VERSION);
+
+      if (span == null || !span.getSpanContext().isValid()) {
+        super.writeBool(!HEADER_HAS_TRACE);
+        return;
+      }
+
+      super.writeBool(HEADER_HAS_TRACE);
+      traceHeaders.clear();
+
+      TraceUtil.injectTraceContext(traceHeaders);

Review Comment:
   It seems like the map construction could be delegated to TraceUtil:
   
   ```java
     var serializedTraceContext = TraceUtil.serializeContext(Context.current());
   super.writeString(serializedTraceContext);
   ```
   
   The same would be true for the deserialize (instead of 
`TraceUtil.extractTraceContext(headers)`):
   
   ```java
     Context extractedContext = 
TraceUtil.deserializeContext(serializedTraceContext);
   ```
   
   I think this is better because the protocol shouldn't be so tightly coupled 
to the serialization/deserialization of the trace Context. It just needs to 
write and read it.



##########
core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between 
different versions of the
+ * protocol.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isClient;
+
+  public static class AccumuloProtocol extends TCompactProtocol {
+
+    static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+    static final byte PROTOCOL_VERSION = 1;
+    private static final boolean HEADER_HAS_TRACE = true;
+
+    private final boolean isClient;
+
+    private Span span = null;
+    private Scope scope = null;
+    private final Map<String,String> traceHeaders = new HashMap<>();
+
+    public AccumuloProtocol(TTransport transport, boolean isClient) {
+      super(transport);
+      this.isClient = isClient;
+    }
+
+    /**
+     * For client calls, add RPC span and write the validation header
+     */
+    @Override
+    public void writeMessageBegin(TMessage message) throws TException {
+      if (!this.isClient) {
+        super.writeMessageBegin(message);
+      } else {
+        span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+        scope = span.makeCurrent();
+
+        try {
+          this.writeHeader();
+          super.writeMessageBegin(message);
+        } catch (TException e) {
+          if (span != null) {
+            TraceUtil.setException(span, e, false);
+          }
+          if (scope != null) {
+            scope.close();
+          }
+          if (span != null) {
+            span.end();
+          }
+          throw e;
+        }
+      }
+    }
+
+    /**
+     * Writes the Accumulo protocol header containing version and 
identification info
+     */
+    private void writeHeader() throws TException {
+      super.writeI32(MAGIC_NUMBER);
+      super.writeByte(PROTOCOL_VERSION);
+
+      if (span == null || !span.getSpanContext().isValid()) {
+        super.writeBool(!HEADER_HAS_TRACE);
+        return;
+      }
+
+      super.writeBool(HEADER_HAS_TRACE);

Review Comment:
   It is a bit confusing to have a fixed boolean constant whose value is either 
used or its opposite is used based on some condition. It makes much more sense 
to just check the value directly, and skip the whole constant:
   
   ```suggestion
         final boolean headerHasTrace = span != null && 
span.getSpanContext().isValid();
         super.writeBool(headerHasTrace);
   
         if (!headerHasTrace) {
           return;
         }
   
   ```



##########
core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java:
##########
@@ -191,22 +194,13 @@ public static <T> Callable<T> unwrap(Callable<T> c) {
 
   public static <T> T wrapService(final T instance) {
     InvocationHandler handler = (obj, method, args) -> {
-      if (args == null || args.length < 1 || args[0] == null || !(args[0] 
instanceof TInfo)) {
-        try {
-          return method.invoke(instance, args);
-        } catch (InvocationTargetException e) {
-          throw e.getCause();
-        }
-      }
-      Span span = startServerRpcSpan(instance.getClass(), method.getName(), 
(TInfo) args[0]);
-      try (Scope scope = span.makeCurrent()) {
+      Span span = Span.current(); // should be set by protocol
+      try {

Review Comment:
   So, I think that what the old code was doing when it called 
`startServerRpcSpan` was that it was always creating a new "inner span" inside 
the outer span that the user might have created in the client. That was 
useful because it helped us distinguish a child or inner span that was just 
associated with the RPC call, rather than having it grouped with all the other 
things the client code might have been doing around the RPC call.
   
   This code, however, seems to just grab the current span the user created, 
and attach an exception to it if an RPC exception occurred. I think we should 
do the other thing. If there's an exception, it should be attached to the 
inner/child span for the RPC, not the outer span the user created.



##########
core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between 
different versions of the
+ * protocol.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isClient;
+
+  public static class AccumuloProtocol extends TCompactProtocol {
+
+    static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+    static final byte PROTOCOL_VERSION = 1;
+    private static final boolean HEADER_HAS_TRACE = true;
+
+    private final boolean isClient;
+
+    private Span span = null;
+    private Scope scope = null;
+    private final Map<String,String> traceHeaders = new HashMap<>();
+
+    public AccumuloProtocol(TTransport transport, boolean isClient) {
+      super(transport);
+      this.isClient = isClient;
+    }
+
+    /**
+     * For client calls, add RPC span and write the validation header
+     */
+    @Override
+    public void writeMessageBegin(TMessage message) throws TException {
+      if (!this.isClient) {
+        super.writeMessageBegin(message);
+      } else {
+        span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+        scope = span.makeCurrent();
+
+        try {
+          this.writeHeader();
+          super.writeMessageBegin(message);
+        } catch (TException e) {
+          if (span != null) {
+            TraceUtil.setException(span, e, false);
+          }
+          if (scope != null) {
+            scope.close();
+          }
+          if (span != null) {
+            span.end();
+          }
+          throw e;
+        }
+      }
+    }
+
+    /**
+     * Writes the Accumulo protocol header containing version and 
identification info
+     */
+    private void writeHeader() throws TException {
+      super.writeI32(MAGIC_NUMBER);
+      super.writeByte(PROTOCOL_VERSION);
+
+      if (span == null || !span.getSpanContext().isValid()) {
+        super.writeBool(!HEADER_HAS_TRACE);
+        return;
+      }
+
+      super.writeBool(HEADER_HAS_TRACE);
+      traceHeaders.clear();
+
+      TraceUtil.injectTraceContext(traceHeaders);
+
+      super.writeI32(traceHeaders.size());
+
+      for (Map.Entry<String,String> entry : traceHeaders.entrySet()) {
+        super.writeString(entry.getKey());
+        super.writeString(entry.getValue());
+      }
+    }
+
+    @Override
+    public void writeMessageEnd() throws TException {
+      try {
+        super.writeMessageEnd();
+      } finally {
+        if (scope != null) {
+          scope.close();
+          span.end();
+        }
+      }
+    }
+
+    /**
+     * For server calls, validate the header
+     */
+    @Override
+    public TMessage readMessageBegin() throws TException {
+      if (!this.isClient) {
+        this.validateHeader();
+      }
+
+      return super.readMessageBegin();
+    }
+
+    /**
+     * Reads and validates the Accumulo protocol header
+     *
+     * @throws TException if the header is invalid or incompatible
+     */
+    void validateHeader() throws TException {
+      final int magic = super.readI32();
+      if (magic != MAGIC_NUMBER) {
+        throw new TException("Invalid Accumulo protocol: magic number 
mismatch. Expected: 0x"
+            + Integer.toHexString(MAGIC_NUMBER) + ", got: 0x" + 
Integer.toHexString(magic));
+      }
+
+      final byte version = super.readByte();
+      validateProtocolVersion(version);
+
+      final boolean hasTrace = super.readBool();
+
+      if (hasTrace) {
+        final int numHeaders = super.readI32();
+
+        final Map<String,String> headers = new HashMap<>(numHeaders);
+        for (int i = 0; i < numHeaders; i++) {
+          String key = super.readString();
+          String value = super.readString();
+          headers.put(key, value);
+        }
+
+        if (!headers.isEmpty()) {
+          Context extractedContext = TraceUtil.extractTraceContext(headers);
+
+          // Create server span with extracted context as parent
+          span = TraceUtil.startServerRpcSpanFromContext(this.getClass(), 
"handleMessage",

Review Comment:
   `handleMessage` is a bit generic. Perhaps `handleRpcMessage` might be more 
clear when looking at the traces?



##########
core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java:
##########
@@ -131,48 +176,6 @@ public static void setException(Span span, Throwable e, 
boolean rethrown) {
     }
   }
 
-  /**
-   * Obtain {@link org.apache.accumulo.core.clientImpl.thrift.TInfo} for the 
current context. This
-   * is used to send the current trace information to a remote process
-   */
-  public static TInfo traceInfo() {
-    TInfo tinfo = new TInfo();
-    W3CTraceContextPropagator.getInstance().inject(Context.current(), tinfo, 
TInfo::putToHeaders);
-    return tinfo;
-  }
-
-  /**
-   * Returns a newly created Context from the TInfo object sent by a remote 
process. The Context can
-   * then be used in this process to continue the tracing. The Context is used 
like:
-   *
-   * <pre>
-   * Context remoteCtx = getContext(tinfo);
-   * Span span = tracer.spanBuilder(name).setParent(remoteCtx).startSpan()
-   * </pre>
-   *
-   * @param tinfo tracing information serialized over Thrift
-   */
-  private static Context getContext(TInfo tinfo) {
-    return W3CTraceContextPropagator.getInstance().extract(Context.current(), 
tinfo,
-        new TextMapGetter<TInfo>() {
-          @Override
-          public Iterable<String> keys(TInfo carrier) {
-            if (carrier.getHeaders() == null) {
-              return null;
-            }
-            return carrier.getHeaders().keySet();
-          }
-
-          @Override
-          public String get(TInfo carrier, String key) {
-            if (carrier.getHeaders() == null) {
-              return null;
-            }
-            return carrier.getHeaders().get(key);
-          }
-        });
-  }
-

Review Comment:
   These two old methods were basically serialize/deserialize. We can replace 
them with new serialize/deserialize methods, even though the serialized type is 
no longer TInfo but some other form (String).



##########
core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between 
different versions of the
+ * protocol.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isClient;
+
+  public static class AccumuloProtocol extends TCompactProtocol {
+
+    static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+    static final byte PROTOCOL_VERSION = 1;
+    private static final boolean HEADER_HAS_TRACE = true;
+
+    private final boolean isClient;
+
+    private Span span = null;
+    private Scope scope = null;
+    private final Map<String,String> traceHeaders = new HashMap<>();
+
+    public AccumuloProtocol(TTransport transport, boolean isClient) {
+      super(transport);
+      this.isClient = isClient;
+    }
+
+    /**
+     * For client calls, add RPC span and write the validation header
+     */
+    @Override
+    public void writeMessageBegin(TMessage message) throws TException {
+      if (!this.isClient) {
+        super.writeMessageBegin(message);
+      } else {
+        span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+        scope = span.makeCurrent();
+
+        try {
+          this.writeHeader();

Review Comment:
   ```suggestion
             writeClientHeader();
   ```



##########
core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between different versions of the
+ * protocol.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isClient;
+
+  public static class AccumuloProtocol extends TCompactProtocol {
+
+    static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+    static final byte PROTOCOL_VERSION = 1;
+    private static final boolean HEADER_HAS_TRACE = true;
+
+    private final boolean isClient;
+
+    private Span span = null;
+    private Scope scope = null;
+    private final Map<String,String> traceHeaders = new HashMap<>();
+
+    public AccumuloProtocol(TTransport transport, boolean isClient) {
+      super(transport);
+      this.isClient = isClient;
+    }
+
+    /**
+     * For client calls, add RPC span and write the validation header
+     */
+    @Override
+    public void writeMessageBegin(TMessage message) throws TException {
+      if (!this.isClient) {
+        super.writeMessageBegin(message);
+      } else {
+        span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+        scope = span.makeCurrent();
+
+        try {
+          this.writeHeader();
+          super.writeMessageBegin(message);
+        } catch (TException e) {
+          if (span != null) {
+            TraceUtil.setException(span, e, false);
+          }
+          if (scope != null) {
+            scope.close();
+          }
+          if (span != null) {
+            span.end();
+          }
+          throw e;
+        }
+      }
+    }
+
+    /**
+     * Writes the Accumulo protocol header containing version and identification info
+     */
+    private void writeHeader() throws TException {
+      super.writeI32(MAGIC_NUMBER);
+      super.writeByte(PROTOCOL_VERSION);
+
+      if (span == null || !span.getSpanContext().isValid()) {
+        super.writeBool(!HEADER_HAS_TRACE);
+        return;
+      }
+
+      super.writeBool(HEADER_HAS_TRACE);
+      traceHeaders.clear();
+
+      TraceUtil.injectTraceContext(traceHeaders);
+
+      super.writeI32(traceHeaders.size());
+
+      for (Map.Entry<String,String> entry : traceHeaders.entrySet()) {
+        super.writeString(entry.getKey());
+        super.writeString(entry.getValue());
+      }
+    }
+
+    @Override
+    public void writeMessageEnd() throws TException {
+      try {
+        super.writeMessageEnd();
+      } finally {
+        if (scope != null) {
+          scope.close();
+          span.end();
+        }
+      }
+    }
+
+    /**
+     * For server calls, validate the header
+     */
+    @Override
+    public TMessage readMessageBegin() throws TException {
+      if (!this.isClient) {
+        this.validateHeader();
+      }
+
+      return super.readMessageBegin();
+    }
+
+    /**
+     * Reads and validates the Accumulo protocol header
+     *
+     * @throws TException if the header is invalid or incompatible
+     */
+    void validateHeader() throws TException {
+      final int magic = super.readI32();
+      if (magic != MAGIC_NUMBER) {
+        throw new TException("Invalid Accumulo protocol: magic number mismatch. Expected: 0x"
+            + Integer.toHexString(MAGIC_NUMBER) + ", got: 0x" + Integer.toHexString(magic));
+      }
+
+      final byte version = super.readByte();
+      validateProtocolVersion(version);
+
+      final boolean hasTrace = super.readBool();
+
+      if (hasTrace) {
+        final int numHeaders = super.readI32();
+
+        final Map<String,String> headers = new HashMap<>(numHeaders);
+        for (int i = 0; i < numHeaders; i++) {
+          String key = super.readString();
+          String value = super.readString();
+          headers.put(key, value);
+        }
+
+        if (!headers.isEmpty()) {
+          Context extractedContext = TraceUtil.extractTraceContext(headers);
+
+          // Create server span with extracted context as parent
+          span = TraceUtil.startServerRpcSpanFromContext(this.getClass(), "handleMessage",
+              extractedContext);
+          scope = span.makeCurrent();
+        }
+      }
+    }
+
+    /**
+     * @throws TException if the given protocol version is incompatible with the current version
+     */
+    private void validateProtocolVersion(byte protocolVersion) throws TException {
+      if (protocolVersion != PROTOCOL_VERSION) {
+        throw new TException("Incompatible protocol version. Version seen: " + protocolVersion
+            + ", expected version: " + PROTOCOL_VERSION);

Review Comment:
   This requires us to keep the protocol version updated whenever we change the 
thrift RPC... however, I think we need to proactively prevent communication 
with any potentially incompatible version of Accumulo.
   
   We generally try to support rolling upgrades for bugfix releases, but for 
all other upgrades, we expect the whole cluster to be shut down and restarted 
with the new version. With that in mind, I think we should serialize 
Constants.VERSION to the protocol, and we should validate by matching on the 
first two elements of the Accumulo version to ensure it matches the server's 
own first two elements of its Constants.VERSION.
   
   We can still have the protocol version in case we change how things are 
serialized in the protocol (such as adding another header field prior to the 
serialized trace context or we want to change the boolean to an int or 
something)... but we won't need to bump it very often unless we change the 
protocol itself. We shouldn't need to bump it just because of an Accumulo 
version release, but we still need to check the Accumulo version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to