Author: mriou
Date: Tue Oct 24 11:00:12 2006
New Revision: 467408
URL: http://svn.apache.org/viewvc?view=rev&rev=467408
Log:
Started using XStream instead of Java serialization to persist the jacob state.
Not active for now (switching to something else).
Modified:
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
Modified:
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java?view=diff&rev=467408&r1=467407&r2=467408
==============================================================================
---
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
(original)
+++
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
Tue Oct 24 11:00:12 2006
@@ -36,17 +36,7 @@
import org.apache.ode.utils.ArrayUtils;
import org.apache.ode.utils.ObjectPrinter;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.Serializable;
+import java.io.*;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
@@ -58,6 +48,18 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import com.thoughtworks.xstream.core.util.CustomObjectOutputStream;
+import com.thoughtworks.xstream.core.ReferenceByIdMarshallingStrategy;
+import com.thoughtworks.xstream.core.ReferenceByIdMarshaller;
+import com.thoughtworks.xstream.io.StatefulWriter;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.converters.ConverterLookup;
+import com.thoughtworks.xstream.converters.DataHolder;
+import com.thoughtworks.xstream.converters.Converter;
+import com.thoughtworks.xstream.mapper.Mapper;
+
/**
* A fast, in-memory {@link org.apache.ode.jacob.soup.ExecutionQueue}
* implementation.
@@ -117,7 +119,7 @@
public Map<Object, LinkedList<IndexedObject>> getIndex() {
return _index;
}
-
+
public void add(CommChannel channel) {
if (__log.isTraceEnabled())
__log.trace(ObjectPrinter.stringifyMethodEnter("add", new Object[]
{ "channel", channel }));
@@ -301,6 +303,7 @@
flush();
ExecutionQueueOutputStream sos = new ExecutionQueueOutputStream(oos);
+// XQXMLOutputStream sos = createObjectOutputStream(new
OutputStreamWriter(oos));
sos.writeInt(_objIdCounter);
sos.writeInt(_currentCycle);
@@ -783,13 +786,117 @@
resolved = obj;
}
-
if (resolved != null && resolved instanceof IndexedObject)
index((IndexedObject) resolved);
-
+
return resolved;
}
}
+
+ private XQXMLOutputStream createObjectOutputStream(Writer writer) throws
IOException { // builds an XQXMLOutputStream whose callback marshals each written object through XStream onto the given writer
+ final StatefulWriter statefulWriter = new StatefulWriter(new
PrettyPrintWriter(writer));
+ statefulWriter.startNode("object-stream", null); // opens the root node; the matching endNode() is issued in close() below
+ final XStream xstream = new XStream();
+ xstream.setMode(XStream.ID_REFERENCES); // NOTE(review): presumably so repeated objects are written once and referenced by id - confirm against the XStream docs
+ xstream.setMarshallingStrategy(new XQMarshallingStrategy()); // marshalling goes through XQMarshaller (see XQMarshallingStrategy.marshal)
+ return new XQXMLOutputStream(new
CustomObjectOutputStream.StreamCallback() {
+ public void writeToStream(Object object) {
+ xstream.marshal(object, statefulWriter); // every object is appended under the "object-stream" root
+ }
+
+ public void writeFieldsToStream(Map fields) throws
NotActiveException {
+ throw new NotActiveException("not in call to writeObject"); // field-level writes are always rejected by this callback
+ }
+
+ public void defaultWriteObject() throws NotActiveException {
+ throw new NotActiveException("not in call to writeObject"); // always rejected, same as writeFieldsToStream
+ }
+
+ public void flush() {
+ statefulWriter.flush();
+ }
+
+ public void close() {
+ if (statefulWriter.state() != StatefulWriter.STATE_CLOSED) { // no-op when the writer is already closed
+ statefulWriter.endNode(); // closes the "object-stream" root opened above
+ statefulWriter.close();
+ }
+ }
+ });
+
+ }
+
+ private class XQXMLOutputStream extends CustomObjectOutputStream { // object stream over a StreamCallback that swaps channels and replaceable objects for stand-ins before writing
+ private Set<Object> _serializedChannels = new HashSet<Object>(); // ids of every channel that passed through replaceObject
+
+ public XQXMLOutputStream(StreamCallback callback) throws IOException {
+ super(callback);
+ enableReplaceObject(true);
+ }
+
+ public Set<Object> getSerializedChannels() { // returns the live set, not a copy
+ return _serializedChannels;
+ }
+
+ protected void writeClassDescriptor(ObjectStreamClass desc) throws
IOException {
+ if (Serializable.class.isAssignableFrom(desc.forClass())) {
+ writeBoolean(true); // flag: only the class name follows
+ writeUTF(desc.getName());
+ } else {
+ writeBoolean(false); // flag: the superclass writes the descriptor
+ super.writeClassDescriptor(desc);
+ }
+ }
+
+ protected void writeObjectOverride(Object object) throws IOException {
+ super.writeObjectOverride(replaceObject(object)); // replacement is applied explicitly on every write, so object may be null here
+ }
+
+ /**
+ * Records the id of each channel written to this stream and swaps
+ * channels and replaceable objects for their stand-ins.
+ *
+ * @param obj the object about to be written; may be null
+ * @return a ChannelRef for a channel, the replacement map's value for a replaceable object, null when obj is null or not Serializable, otherwise obj itself
+ * @throws IOException as declared by ObjectOutputStream#replaceObject
+ */
+ protected Object replaceObject(Object obj) throws IOException {
+ if (!(obj instanceof Serializable)) // instanceof is null-safe: a null obj now yields null instead of a NullPointerException
+ return null;
+
+ if (obj instanceof org.apache.ode.jacob.Channel) {
+ CommChannel commChannel = (CommChannel)
ChannelFactory.getBackend((Channel) obj);
+ _serializedChannels.add(commChannel.getId());
+ return new ChannelRef(commChannel.getType(), (Integer)
commChannel.getId());
+ } else if (_replacementMap != null &&
_replacementMap.isReplaceable(obj)) {
+ Object replacement = _replacementMap.getReplacement(obj);
+ if (__log.isDebugEnabled())
+ __log.debug("ReplacmentMap: getReplacement(" + obj + ") =
" + replacement);
+ return replacement;
+ }
+
+ return obj;
+ }
+ }
+
+ private class XQMarshallingStrategy extends
ReferenceByIdMarshallingStrategy { // marshalling strategy that marshals through an XQMarshaller
+ public void marshal(HierarchicalStreamWriter writer, Object obj,
ConverterLookup converterLookup, Mapper mapper, DataHolder dataHolder) {
+ new XQMarshaller(writer, converterLookup, mapper) // a fresh marshaller is created for every marshal() call
+ .start(obj, dataHolder);
+ }
+ }
+
+ private class XQMarshaller extends ReferenceByIdMarshaller { // marshaller hook; currently adds no behavior beyond its superclass
+ public XQMarshaller(HierarchicalStreamWriter hierarchicalStreamWriter,
ConverterLookup converterLookup, Mapper mapper) {
+ super(hierarchicalStreamWriter, converterLookup, mapper);
+ }
+
+ public void convert(Object object, Converter converter) {
+ // TODO call replacement logic (presumably the channel / replacement-map substitution done in XQXMLOutputStream.replaceObject - confirm); until then this only delegates
+ super.convert(object, converter);
+ }
+ }
+
private static final class ChannelRef implements Externalizable {
private Class _type;