ctubbsii commented on code in PR #5398: URL: https://github.com/apache/accumulo/pull/5398#discussion_r2029398114
########## core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * Factory for creating instances of the AccumuloProtocol. + * <p> + * This protocol includes a custom header to ensure compatibility between different versions of the + * protocol. 
+ */ +public class AccumuloProtocolFactory extends TCompactProtocol.Factory { + + private static final long serialVersionUID = 1L; + + private final boolean isClient; + + public static class AccumuloProtocol extends TCompactProtocol { + + static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII + static final byte PROTOCOL_VERSION = 1; + private static final boolean HEADER_HAS_TRACE = true; + + private final boolean isClient; + + private Span span = null; + private Scope scope = null; + private final Map<String,String> traceHeaders = new HashMap<>(); + + public AccumuloProtocol(TTransport transport, boolean isClient) { + super(transport); + this.isClient = isClient; + } + + /** + * For client calls, add RPC span and write the validation header + */ + @Override + public void writeMessageBegin(TMessage message) throws TException { + if (!this.isClient) { + super.writeMessageBegin(message); + } else { + span = TraceUtil.startClientRpcSpan(this.getClass(), message.name); + scope = span.makeCurrent(); + + try { + this.writeHeader(); + super.writeMessageBegin(message); + } catch (TException e) { + if (span != null) { + TraceUtil.setException(span, e, false); + } + if (scope != null) { + scope.close(); + } + if (span != null) { + span.end(); + } + throw e; + } + } + } + + /** + * Writes the Accumulo protocol header containing version and identification info + */ + private void writeHeader() throws TException { Review Comment: Just because this code is reused, and this is only used by the client, it would be good to change this: ```suggestion private void writeClientHeader() throws TException { ``` ########## core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * Factory for creating instances of the AccumuloProtocol. + * <p> + * This protocol includes a custom header to ensure compatibility between different versions of the + * protocol. 
+ */ +public class AccumuloProtocolFactory extends TCompactProtocol.Factory { + + private static final long serialVersionUID = 1L; + + private final boolean isClient; + + public static class AccumuloProtocol extends TCompactProtocol { + + static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII + static final byte PROTOCOL_VERSION = 1; + private static final boolean HEADER_HAS_TRACE = true; + + private final boolean isClient; + + private Span span = null; + private Scope scope = null; + private final Map<String,String> traceHeaders = new HashMap<>(); + + public AccumuloProtocol(TTransport transport, boolean isClient) { + super(transport); + this.isClient = isClient; + } + + /** + * For client calls, add RPC span and write the validation header + */ + @Override + public void writeMessageBegin(TMessage message) throws TException { + if (!this.isClient) { + super.writeMessageBegin(message); + } else { + span = TraceUtil.startClientRpcSpan(this.getClass(), message.name); + scope = span.makeCurrent(); + + try { + this.writeHeader(); + super.writeMessageBegin(message); + } catch (TException e) { + if (span != null) { + TraceUtil.setException(span, e, false); + } + if (scope != null) { + scope.close(); + } + if (span != null) { + span.end(); + } + throw e; + } + } + } + + /** + * Writes the Accumulo protocol header containing version and identification info + */ + private void writeHeader() throws TException { + super.writeI32(MAGIC_NUMBER); + super.writeByte(PROTOCOL_VERSION); + + if (span == null || !span.getSpanContext().isValid()) { + super.writeBool(!HEADER_HAS_TRACE); + return; + } + + super.writeBool(HEADER_HAS_TRACE); + traceHeaders.clear(); + + TraceUtil.injectTraceContext(traceHeaders); + + super.writeI32(traceHeaders.size()); + + for (Map.Entry<String,String> entry : traceHeaders.entrySet()) { + super.writeString(entry.getKey()); + super.writeString(entry.getValue()); + } + } + + @Override + public void writeMessageEnd() throws TException { + try { + 
super.writeMessageEnd(); + } finally { + if (scope != null) { + scope.close(); + span.end(); + } Review Comment: If this can be reused, then these need to be set back to null before the next begin message: ```suggestion if (scope != null) { scope.close(); span.end(); } scope = null; span = null; ``` I'm actually not sure how thread-safe this protocol needs to be, but I think this is okay, so long as they are reset before the next message. ########## core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * Factory for creating instances of the AccumuloProtocol. 
+ * <p> + * This protocol includes a custom header to ensure compatibility between different versions of the + * protocol. + */ +public class AccumuloProtocolFactory extends TCompactProtocol.Factory { + + private static final long serialVersionUID = 1L; + + private final boolean isClient; + + public static class AccumuloProtocol extends TCompactProtocol { + + static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII + static final byte PROTOCOL_VERSION = 1; + private static final boolean HEADER_HAS_TRACE = true; + + private final boolean isClient; + + private Span span = null; + private Scope scope = null; + private final Map<String,String> traceHeaders = new HashMap<>(); + + public AccumuloProtocol(TTransport transport, boolean isClient) { + super(transport); + this.isClient = isClient; + } + + /** + * For client calls, add RPC span and write the validation header + */ + @Override + public void writeMessageBegin(TMessage message) throws TException { + if (!this.isClient) { + super.writeMessageBegin(message); + } else { + span = TraceUtil.startClientRpcSpan(this.getClass(), message.name); + scope = span.makeCurrent(); + + try { + this.writeHeader(); + super.writeMessageBegin(message); + } catch (TException e) { + if (span != null) { + TraceUtil.setException(span, e, false); + } + if (scope != null) { + scope.close(); + } + if (span != null) { + span.end(); + } + throw e; + } + } + } + + /** + * Writes the Accumulo protocol header containing version and identification info + */ + private void writeHeader() throws TException { + super.writeI32(MAGIC_NUMBER); + super.writeByte(PROTOCOL_VERSION); + + if (span == null || !span.getSpanContext().isValid()) { + super.writeBool(!HEADER_HAS_TRACE); + return; + } + + super.writeBool(HEADER_HAS_TRACE); + traceHeaders.clear(); + + TraceUtil.injectTraceContext(traceHeaders); Review Comment: This seems like the map construction could be deferred to the TraceUtil: ```java var serializedTraceContext = 
TraceUtil.serializeContext(Context.current()); super.writeString(serializedTraceContext); ``` The same would be true for the deserialize (instead of `TraceUtil.extractTraceContext(headers)`): ```java Context extractedContext = TraceUtil.deserializeContext(serializedTraceContext); ``` I think this is better because the protocol shouldn't be so tightly coupled to the serialization/deserialization of the trace Context. It just needs to write and read it. ########## core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * Factory for creating instances of the AccumuloProtocol. 
+ * <p> + * This protocol includes a custom header to ensure compatibility between different versions of the + * protocol. + */ +public class AccumuloProtocolFactory extends TCompactProtocol.Factory { + + private static final long serialVersionUID = 1L; + + private final boolean isClient; + + public static class AccumuloProtocol extends TCompactProtocol { + + static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII + static final byte PROTOCOL_VERSION = 1; + private static final boolean HEADER_HAS_TRACE = true; + + private final boolean isClient; + + private Span span = null; + private Scope scope = null; + private final Map<String,String> traceHeaders = new HashMap<>(); + + public AccumuloProtocol(TTransport transport, boolean isClient) { + super(transport); + this.isClient = isClient; + } + + /** + * For client calls, add RPC span and write the validation header + */ + @Override + public void writeMessageBegin(TMessage message) throws TException { + if (!this.isClient) { + super.writeMessageBegin(message); + } else { + span = TraceUtil.startClientRpcSpan(this.getClass(), message.name); + scope = span.makeCurrent(); + + try { + this.writeHeader(); + super.writeMessageBegin(message); + } catch (TException e) { + if (span != null) { + TraceUtil.setException(span, e, false); + } + if (scope != null) { + scope.close(); + } + if (span != null) { + span.end(); + } + throw e; + } + } + } + + /** + * Writes the Accumulo protocol header containing version and identification info + */ + private void writeHeader() throws TException { + super.writeI32(MAGIC_NUMBER); + super.writeByte(PROTOCOL_VERSION); + + if (span == null || !span.getSpanContext().isValid()) { + super.writeBool(!HEADER_HAS_TRACE); + return; + } + + super.writeBool(HEADER_HAS_TRACE); Review Comment: It is a bit confusing to have a fixed boolean constant whose value is either used or its opposite is used based on some condition. 
It makes much more sense to just check the value directly, and skip the whole constant: ```suggestion final boolean headerHasTrace = span != null && span.getSpanContext().isValid(); super.writeBool(headerHasTrace); if (!headerHasTrace) { return; } ``` ########## core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java: ########## @@ -191,22 +194,13 @@ public static <T> Callable<T> unwrap(Callable<T> c) { public static <T> T wrapService(final T instance) { InvocationHandler handler = (obj, method, args) -> { - if (args == null || args.length < 1 || args[0] == null || !(args[0] instanceof TInfo)) { - try { - return method.invoke(instance, args); - } catch (InvocationTargetException e) { - throw e.getCause(); - } - } - Span span = startServerRpcSpan(instance.getClass(), method.getName(), (TInfo) args[0]); - try (Scope scope = span.makeCurrent()) { + Span span = Span.current(); // should be set by protocol + try { Review Comment: So, I think that what the old code was doing when it called `startServerRpcSpan` was that it was always creating a new "inner span" inside the outer span that the user might have created in the client. That was useful because it helped us distinguish a child or inner span that was just associated with the RPC call, rather than be grouped with all the other things the client code might have been doing around the RPC call. This code, however, seems to just grab the current span the user created, and attach an exception to it if an RPC exception occurred. I think we should do the other thing. If there's an exception, it should be attached to the inner/child span for the RPC, not the outer span the user created. ########## core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * Factory for creating instances of the AccumuloProtocol. + * <p> + * This protocol includes a custom header to ensure compatibility between different versions of the + * protocol. 
+ */ +public class AccumuloProtocolFactory extends TCompactProtocol.Factory { + + private static final long serialVersionUID = 1L; + + private final boolean isClient; + + public static class AccumuloProtocol extends TCompactProtocol { + + static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII + static final byte PROTOCOL_VERSION = 1; + private static final boolean HEADER_HAS_TRACE = true; + + private final boolean isClient; + + private Span span = null; + private Scope scope = null; + private final Map<String,String> traceHeaders = new HashMap<>(); + + public AccumuloProtocol(TTransport transport, boolean isClient) { + super(transport); + this.isClient = isClient; + } + + /** + * For client calls, add RPC span and write the validation header + */ + @Override + public void writeMessageBegin(TMessage message) throws TException { + if (!this.isClient) { + super.writeMessageBegin(message); + } else { + span = TraceUtil.startClientRpcSpan(this.getClass(), message.name); + scope = span.makeCurrent(); + + try { + this.writeHeader(); + super.writeMessageBegin(message); + } catch (TException e) { + if (span != null) { + TraceUtil.setException(span, e, false); + } + if (scope != null) { + scope.close(); + } + if (span != null) { + span.end(); + } + throw e; + } + } + } + + /** + * Writes the Accumulo protocol header containing version and identification info + */ + private void writeHeader() throws TException { + super.writeI32(MAGIC_NUMBER); + super.writeByte(PROTOCOL_VERSION); + + if (span == null || !span.getSpanContext().isValid()) { + super.writeBool(!HEADER_HAS_TRACE); + return; + } + + super.writeBool(HEADER_HAS_TRACE); + traceHeaders.clear(); + + TraceUtil.injectTraceContext(traceHeaders); + + super.writeI32(traceHeaders.size()); + + for (Map.Entry<String,String> entry : traceHeaders.entrySet()) { + super.writeString(entry.getKey()); + super.writeString(entry.getValue()); + } + } + + @Override + public void writeMessageEnd() throws TException { + try { + 
super.writeMessageEnd(); + } finally { + if (scope != null) { + scope.close(); + span.end(); + } + } + } + + /** + * For server calls, validate the header + */ + @Override + public TMessage readMessageBegin() throws TException { + if (!this.isClient) { + this.validateHeader(); + } + + return super.readMessageBegin(); + } + + /** + * Reads and validates the Accumulo protocol header + * + * @throws TException if the header is invalid or incompatible + */ + void validateHeader() throws TException { + final int magic = super.readI32(); + if (magic != MAGIC_NUMBER) { + throw new TException("Invalid Accumulo protocol: magic number mismatch. Expected: 0x" + + Integer.toHexString(MAGIC_NUMBER) + ", got: 0x" + Integer.toHexString(magic)); + } + + final byte version = super.readByte(); + validateProtocolVersion(version); + + final boolean hasTrace = super.readBool(); + + if (hasTrace) { + final int numHeaders = super.readI32(); + + final Map<String,String> headers = new HashMap<>(numHeaders); + for (int i = 0; i < numHeaders; i++) { + String key = super.readString(); + String value = super.readString(); + headers.put(key, value); + } + + if (!headers.isEmpty()) { + Context extractedContext = TraceUtil.extractTraceContext(headers); + + // Create server span with extracted context as parent + span = TraceUtil.startServerRpcSpanFromContext(this.getClass(), "handleMessage", Review Comment: `handleMessage` is a bit generic. Perhaps `handleRpcMessage` might be more clear when looking at the traces? ########## core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java: ########## @@ -131,48 +176,6 @@ public static void setException(Span span, Throwable e, boolean rethrown) { } } - /** - * Obtain {@link org.apache.accumulo.core.clientImpl.thrift.TInfo} for the current context. 
This - * is used to send the current trace information to a remote process - */ - public static TInfo traceInfo() { - TInfo tinfo = new TInfo(); - W3CTraceContextPropagator.getInstance().inject(Context.current(), tinfo, TInfo::putToHeaders); - return tinfo; - } - - /** - * Returns a newly created Context from the TInfo object sent by a remote process. The Context can - * then be used in this process to continue the tracing. The Context is used like: - * - * <pre> - * Context remoteCtx = getContext(tinfo); - * Span span = tracer.spanBuilder(name).setParent(remoteCtx).startSpan() - * </pre> - * - * @param tinfo tracing information serialized over Thrift - */ - private static Context getContext(TInfo tinfo) { - return W3CTraceContextPropagator.getInstance().extract(Context.current(), tinfo, - new TextMapGetter<TInfo>() { - @Override - public Iterable<String> keys(TInfo carrier) { - if (carrier.getHeaders() == null) { - return null; - } - return carrier.getHeaders().keySet(); - } - - @Override - public String get(TInfo carrier, String key) { - if (carrier.getHeaders() == null) { - return null; - } - return carrier.getHeaders().get(key); - } - }); - } - Review Comment: These two old methods were basically serialize/deserialize. We can replace them with new serialize/deserialize methods, even though the type is not TInfo, but some other serialized form (String). ########## core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * Factory for creating instances of the AccumuloProtocol. + * <p> + * This protocol includes a custom header to ensure compatibility between different versions of the + * protocol. 
+ */ +public class AccumuloProtocolFactory extends TCompactProtocol.Factory { + + private static final long serialVersionUID = 1L; + + private final boolean isClient; + + public static class AccumuloProtocol extends TCompactProtocol { + + static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII + static final byte PROTOCOL_VERSION = 1; + private static final boolean HEADER_HAS_TRACE = true; + + private final boolean isClient; + + private Span span = null; + private Scope scope = null; + private final Map<String,String> traceHeaders = new HashMap<>(); + + public AccumuloProtocol(TTransport transport, boolean isClient) { + super(transport); + this.isClient = isClient; + } + + /** + * For client calls, add RPC span and write the validation header + */ + @Override + public void writeMessageBegin(TMessage message) throws TException { + if (!this.isClient) { + super.writeMessageBegin(message); + } else { + span = TraceUtil.startClientRpcSpan(this.getClass(), message.name); + scope = span.makeCurrent(); + + try { + this.writeHeader(); Review Comment: ```suggestion writeClientHeader(); ``` ########## core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * Factory for creating instances of the AccumuloProtocol. + * <p> + * This protocol includes a custom header to ensure compatibility between different versions of the + * protocol. + */ +public class AccumuloProtocolFactory extends TCompactProtocol.Factory { + + private static final long serialVersionUID = 1L; + + private final boolean isClient; + + public static class AccumuloProtocol extends TCompactProtocol { + + static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII + static final byte PROTOCOL_VERSION = 1; + private static final boolean HEADER_HAS_TRACE = true; + + private final boolean isClient; + + private Span span = null; + private Scope scope = null; + private final Map<String,String> traceHeaders = new HashMap<>(); + + public AccumuloProtocol(TTransport transport, boolean isClient) { + super(transport); + this.isClient = isClient; + } + + /** + * For client calls, add RPC span and write the validation header + */ + @Override + public void writeMessageBegin(TMessage message) throws TException { + if (!this.isClient) { + super.writeMessageBegin(message); + } else { + span = TraceUtil.startClientRpcSpan(this.getClass(), message.name); + scope = span.makeCurrent(); + + try { + this.writeHeader(); + super.writeMessageBegin(message); + } catch (TException e) { + if (span != null) { + TraceUtil.setException(span, e, false); + } + if (scope != 
null) { + scope.close(); + } + if (span != null) { + span.end(); + } + throw e; + } + } + } + + /** + * Writes the Accumulo protocol header containing version and identification info + */ + private void writeHeader() throws TException { + super.writeI32(MAGIC_NUMBER); + super.writeByte(PROTOCOL_VERSION); + + if (span == null || !span.getSpanContext().isValid()) { + super.writeBool(!HEADER_HAS_TRACE); + return; + } + + super.writeBool(HEADER_HAS_TRACE); + traceHeaders.clear(); + + TraceUtil.injectTraceContext(traceHeaders); + + super.writeI32(traceHeaders.size()); + + for (Map.Entry<String,String> entry : traceHeaders.entrySet()) { + super.writeString(entry.getKey()); + super.writeString(entry.getValue()); + } + } + + @Override + public void writeMessageEnd() throws TException { + try { + super.writeMessageEnd(); + } finally { + if (scope != null) { + scope.close(); + span.end(); + } + } + } + + /** + * For server calls, validate the header + */ + @Override + public TMessage readMessageBegin() throws TException { + if (!this.isClient) { + this.validateHeader(); + } + + return super.readMessageBegin(); + } + + /** + * Reads and validates the Accumulo protocol header + * + * @throws TException if the header is invalid or incompatible + */ + void validateHeader() throws TException { + final int magic = super.readI32(); + if (magic != MAGIC_NUMBER) { + throw new TException("Invalid Accumulo protocol: magic number mismatch. 
Expected: 0x" + + Integer.toHexString(MAGIC_NUMBER) + ", got: 0x" + Integer.toHexString(magic)); + } + + final byte version = super.readByte(); + validateProtocolVersion(version); + + final boolean hasTrace = super.readBool(); + + if (hasTrace) { + final int numHeaders = super.readI32(); + + final Map<String,String> headers = new HashMap<>(numHeaders); + for (int i = 0; i < numHeaders; i++) { + String key = super.readString(); + String value = super.readString(); + headers.put(key, value); + } + + if (!headers.isEmpty()) { + Context extractedContext = TraceUtil.extractTraceContext(headers); + + // Create server span with extracted context as parent + span = TraceUtil.startServerRpcSpanFromContext(this.getClass(), "handleMessage", + extractedContext); + scope = span.makeCurrent(); + } + } + } + + /** + * @throws TException if the given protocol version is incompatible with the current version + */ + private void validateProtocolVersion(byte protocolVersion) throws TException { + if (protocolVersion != PROTOCOL_VERSION) { + throw new TException("Incompatible protocol version. Version seen: " + protocolVersion + + ", expected version: " + PROTOCOL_VERSION); Review Comment: This requires us to keep the protocol version updated whenever we change the thrift RPC... however, I think we need to proactively prevent communication with any potentially incompatible version of Accumulo. We generally try to support rolling upgrades for bugfix releases, but for all other upgrades, we expect the whole cluster to be shut down and restarted with the new version. With that in mind, I think we should serialize Constants.VERSION to the protocol, and we should validate by matching on the first two elements of the Accumulo version to ensure it matches the server's own first two elements of its Constants.VERSION. 
We can still have the protocol version in case we change how things are serialized in the protocol (such as adding another header field prior to the serialized trace context or we want to change the boolean to an int or something)... but we won't need to bump it very often unless we change the protocol itself. We shouldn't need to bump it just because of an Accumulo version release, but we still need to check the Accumulo version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org