Author: mriou
Date: Tue Oct 24 11:00:12 2006
New Revision: 467408

URL: http://svn.apache.org/viewvc?view=rev&rev=467408
Log:
Started using XStream instead of Java serialization to persist the jacob state. 
Not active for now (switching to something else).

Modified:
    
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java

Modified: 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java?view=diff&rev=467408&r1=467407&r2=467408
==============================================================================
--- 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
 (original)
+++ 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
 Tue Oct 24 11:00:12 2006
@@ -36,17 +36,7 @@
 import org.apache.ode.utils.ArrayUtils;
 import org.apache.ode.utils.ObjectPrinter;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.Serializable;
+import java.io.*;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -58,6 +48,18 @@
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
+import com.thoughtworks.xstream.core.util.CustomObjectOutputStream;
+import com.thoughtworks.xstream.core.ReferenceByIdMarshallingStrategy;
+import com.thoughtworks.xstream.core.ReferenceByIdMarshaller;
+import com.thoughtworks.xstream.io.StatefulWriter;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.converters.ConverterLookup;
+import com.thoughtworks.xstream.converters.DataHolder;
+import com.thoughtworks.xstream.converters.Converter;
+import com.thoughtworks.xstream.mapper.Mapper;
+
 /**
  * A fast, in-memory {@link org.apache.ode.jacob.soup.ExecutionQueue}
  * implementation.
@@ -117,7 +119,7 @@
     /**
      * Returns the index map held in {@code _index}.
      *
      * @return the {@code _index} map itself (not a copy), keyed by arbitrary
      *         objects with a list of {@code IndexedObject}s per key
      */
     public Map<Object, LinkedList<IndexedObject>> getIndex() {
         return _index;
     }
-    
+
     public void add(CommChannel channel) {
         if (__log.isTraceEnabled())
             __log.trace(ObjectPrinter.stringifyMethodEnter("add", new Object[] 
{ "channel", channel }));
@@ -301,6 +303,7 @@
         flush();
 
         ExecutionQueueOutputStream sos = new ExecutionQueueOutputStream(oos);
+//        XQXMLOutputStream sos = createObjectOutputStream(new 
OutputStreamWriter(oos));
 
         sos.writeInt(_objIdCounter);
         sos.writeInt(_currentCycle);
@@ -783,13 +786,117 @@
                 resolved = obj;
             }
 
-            
             if (resolved != null && resolved instanceof IndexedObject)
                 index((IndexedObject) resolved);
-            
+
             return resolved;
         }
     }
+
+    /**
+     * Creates an {@link XQXMLOutputStream} whose objects are marshalled as XML
+     * through XStream onto the supplied writer.
+     * <p>
+     * The writer is wrapped in a pretty-printing, stateful XML writer and an
+     * enclosing <code>object-stream</code> node is opened immediately; that
+     * node is ended when the returned stream's callback is closed.
+     *
+     * @param writer destination for the generated XML
+     * @return a stream that forwards every written object to XStream
+     * @throws IOException if the output stream cannot be constructed
+     */
+    private XQXMLOutputStream createObjectOutputStream(Writer writer) throws IOException {
+        final StatefulWriter xmlOut = new StatefulWriter(new PrettyPrintWriter(writer));
+        xmlOut.startNode("object-stream", null);
+
+        final XStream marshalling = new XStream();
+        marshalling.setMode(XStream.ID_REFERENCES);
+        marshalling.setMarshallingStrategy(new XQMarshallingStrategy());
+
+        CustomObjectOutputStream.StreamCallback callback = new CustomObjectOutputStream.StreamCallback() {
+            public void writeToStream(Object object) {
+                marshalling.marshal(object, xmlOut);
+            }
+
+            // Field-level writes are only meaningful inside a writeObject call,
+            // which this top-level callback never is.
+            public void writeFieldsToStream(Map fields) throws NotActiveException {
+                throw new NotActiveException("not in call to writeObject");
+            }
+
+            public void defaultWriteObject() throws NotActiveException {
+                throw new NotActiveException("not in call to writeObject");
+            }
+
+            public void flush() {
+                xmlOut.flush();
+            }
+
+            public void close() {
+                // Nothing left to do once the underlying writer is closed.
+                if (xmlOut.state() == StatefulWriter.STATE_CLOSED) {
+                    return;
+                }
+                xmlOut.endNode();
+                xmlOut.close();
+            }
+        };
+
+        return new XQXMLOutputStream(callback);
+    }
+
+    /**
+     * Object output stream that hands objects to an XStream callback after
+     * running them through {@link #replaceObject(Object)}, recording the id
+     * of every channel it replaces along the way.
+     */
+    private class XQXMLOutputStream extends CustomObjectOutputStream {
+        /** Ids of the channels that have been replaced by a {@code ChannelRef}. */
+        private final Set<Object> _serializedChannels = new HashSet<Object>();
+
+        /**
+         * @param callback receiver of the objects written to this stream
+         * @throws IOException if the underlying stream cannot be set up
+         */
+        public XQXMLOutputStream(StreamCallback callback) throws IOException {
+            super(callback);
+            enableReplaceObject(true);
+        }
+
+        /**
+         * @return the ids of all channels replaced so far; the live set, not a copy
+         */
+        public Set<Object> getSerializedChannels() {
+            return _serializedChannels;
+        }
+
+        /**
+         * Writes a compact descriptor (a <code>true</code> flag followed by the
+         * class name) for serializable classes, and a <code>false</code> flag
+         * followed by the default descriptor for everything else.
+         */
+        protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
+            if (Serializable.class.isAssignableFrom(desc.forClass())) {
+                writeBoolean(true);
+                writeUTF(desc.getName());
+            } else {
+                writeBoolean(false);
+                super.writeClassDescriptor(desc);
+            }
+        }
+
+        /**
+         * Applies {@link #replaceObject(Object)} to the object before passing
+         * it on to the superclass.
+         */
+        protected void writeObjectOverride(Object object) throws IOException {
+            super.writeObjectOverride(replaceObject(object));
+        }
+
+        /**
+         * Use this method to spy on any channels that are being serialized to
+         * this stream.
+         *
+         * @param obj the object about to be written; may be <code>null</code>
+         * @return <code>null</code> if <code>obj</code> is <code>null</code> or
+         *         not serializable, a <code>ChannelRef</code> if it is a channel,
+         *         its replacement if the replacement map knows one, otherwise
+         *         <code>obj</code> itself
+         * @throws IOException declared for compatibility with the overridden method
+         */
+        protected Object replaceObject(Object obj) throws IOException {
+            // instanceof is false for null, so a null reference is passed through
+            // as null instead of failing on obj.getClass().
+            if (!(obj instanceof Serializable))
+                return null;
+
+            if (obj instanceof org.apache.ode.jacob.Channel) {
+                CommChannel commChannel = (CommChannel) ChannelFactory.getBackend((Channel) obj);
+                _serializedChannels.add(commChannel.getId());
+                return new ChannelRef(commChannel.getType(), (Integer) commChannel.getId());
+            } else if (_replacementMap != null && _replacementMap.isReplaceable(obj)) {
+                Object replacement = _replacementMap.getReplacement(obj);
+                if (__log.isDebugEnabled())
+                    __log.debug("ReplacmentMap: getReplacement(" + obj + ") = " + replacement);
+                return replacement;
+            }
+
+            return obj;
+        }
+    }
+
+    private class XQMarshallingStrategy extends 
ReferenceByIdMarshallingStrategy {
+        public void marshal(HierarchicalStreamWriter writer, Object obj, 
ConverterLookup converterLookup, Mapper mapper, DataHolder dataHolder) {
+            new XQMarshaller(writer, converterLookup, mapper)
+                .start(obj, dataHolder);
+        }
+    }
+
+    /**
+     * Marshaller instantiated by {@link XQMarshallingStrategy}. For now
+     * {@link #convert(Object, Converter)} only delegates to the superclass.
+     */
+    private class XQMarshaller extends ReferenceByIdMarshaller {
+        public XQMarshaller(HierarchicalStreamWriter writer, ConverterLookup lookup, Mapper mapper) {
+            super(writer, lookup, mapper);
+        }
+
+        public void convert(Object object, Converter converter) {
+            // TODO call replacement logic
+            super.convert(object, converter);
+        }
+    }
+
 
     private static final class ChannelRef implements Externalizable {
         private Class _type;


Reply via email to