Oops, that patch wasn't quite right. Here's a better one. With this
version, the Responder method to override would be the new
getLocal(Protocol remote) overload, rather than getLocal().
Index: lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
===================================================================
--- lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (revision 1349491)
+++ lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (working copy)
@@ -65,14 +65,11 @@
= new ConcurrentHashMap<MD5,Protocol>();
private final Protocol local;
- private final MD5 localHash;
protected final List<RPCPlugin> rpcMetaPlugins;
protected Responder(Protocol local) {
this.local = local;
- this.localHash = new MD5();
- localHash.bytes(local.getMD5());
- protocols.put(localHash, local);
+ protocols.put(new MD5(local.getMD5()), local);
this.rpcMetaPlugins =
new CopyOnWriteArrayList<RPCPlugin>();
}
@@ -84,6 +81,9 @@
/** Return the local protocol. */
public Protocol getLocal() { return local; }
+ /** Determine the local protocol from the remote. */
+ protected Protocol getLocal(Protocol remote) { return local; }
+
/**
* Adds a new plugin to manipulate per-call metadata. Plugins
* are executed in the order that they are added.
@@ -126,10 +126,10 @@
Message rm = remote.getMessages().get(messageName);
if (rm == null)
throw new AvroRuntimeException("No such remote message: "+messageName);
- Message m = getLocal().getMessages().get(messageName);
+ Message m = getLocal(remote).getMessages().get(messageName);
if (m == null)
throw new AvroRuntimeException("No message named "+messageName
- +" in "+getLocal());
+ +" in "+getLocal(remote));
Object request = readRequest(rm.getRequest(), m.getRequest(), in);
@@ -211,6 +211,9 @@
remote = Protocol.parse(request.clientProtocol.toString());
protocols.put(request.clientHash, remote);
}
+
+ Protocol local = getLocal(remote);
+ MD5 localHash = new MD5(local.getMD5());
HandshakeResponse response = new HandshakeResponse();
if (localHash.equals(request.serverHash)) {
response.match =
Doug
On Thu, Jun 14, 2012 at 10:10 AM, Doug Cutting <[email protected]> wrote:
> On Tue, Jun 12, 2012 at 6:09 PM, Christophe Taton <[email protected]> wrote:
>> In practice, I have a bunch of independent records, each of them carrying at
>> most one "extension field".
>>
>> I was especially hoping there would be a way to avoid serializing an
>> "extension" record twice (once from the record object into a bytes field,
>> and then a second time as a bytes field into the destination output
>> stream). Ideally, such an extension field should not require its content to
>> be bytes, but should accept any record object, so that it is encoded only
>> once.
>> As I understand it, Avro does not allow me to do this right now. Is this
>> correct?
>
> I think that can be done too if the schema for the extension field is
> known when the client opens a connection. This is a bit like
> org.apache.avro.mapred.Pair<K,V>, where in different files K and V can
> have different schemas. You'd construct a GenericRequestor passing a
> protocol that incorporates the particular extensions in use for that
> session. The server would then subclass GenericResponder overriding
> getLocal() to return the value of getRemote(), so that the remote
> protocol that contains the extensions is used to both read and write
> data. (You could also make this work with specific or reflect.) This
> way a different protocol would be used for each client session. The
> server's implementation of Responder#respond() would have to be
> implemented to handle these variations.
>
> The patch below would be required to make sure that Responder always
> uses the value of getLocal() so that you can meaningfully override it.
> If this sounds useful we can file a Jira.
>
> Doug
>
> Index: lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
> ===================================================================
> --- lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (revision 1349491)
> +++ lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (working copy)
> @@ -65,14 +65,11 @@
> = new ConcurrentHashMap<MD5,Protocol>();
>
> private final Protocol local;
> - private final MD5 localHash;
> protected final List<RPCPlugin> rpcMetaPlugins;
>
> protected Responder(Protocol local) {
> this.local = local;
> - this.localHash = new MD5();
> - localHash.bytes(local.getMD5());
> - protocols.put(localHash, local);
> + protocols.put(new MD5(local.getMD5()), local);
> this.rpcMetaPlugins =
> new CopyOnWriteArrayList<RPCPlugin>();
> }
> @@ -211,6 +208,11 @@
> remote = Protocol.parse(request.clientProtocol.toString());
> protocols.put(request.clientHash, remote);
> }
> +
> + if (connection != null && response.match != HandshakeMatch.NONE)
> + connection.setRemote(remote);
> +
> + MD5 localHash = new MD5(getLocal().getMD5());
> HandshakeResponse response = new HandshakeResponse();
> if (localHash.equals(request.serverHash)) {
> response.match =
> @@ -220,7 +222,7 @@
> remote == null ? HandshakeMatch.NONE : HandshakeMatch.CLIENT;
> }
> if (response.match != HandshakeMatch.BOTH) {
> - response.serverProtocol = local.toString();
> + response.serverProtocol = getLocal().toString();
> response.serverHash = localHash;
> }
>
> @@ -232,9 +234,6 @@
> }
> handshakeWriter.write(response, out);
>
> - if (connection != null && response.match != HandshakeMatch.NONE)
> - connection.setRemote(remote);
> -
> return remote;
> }