This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-4685 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-4685 by this push: new 8906ee2 GEODE-4685: cleaned up the serialization of the StreamingOperation to handle the case where the cache does not exist 8906ee2 is described below commit 8906ee2c97dad8b3fcf973e18af71fcf35ca3645 Author: Udo <ukohlme...@pivotal.io> AuthorDate: Fri Mar 2 10:09:43 2018 -0800 GEODE-4685: cleaned up the serialization of the StreamingOperation to handle the case where the cache does not exist --- .../internal/streaming/StreamingOperation.java | 91 +++++++++++++--------- .../PRQueryRemoteNodeExceptionDUnitTest.java | 2 +- 2 files changed, 56 insertions(+), 37 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java index 1ad5ec0..cb2cd45 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.geode.cache.CacheClosedException; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -66,7 +67,6 @@ import org.apache.geode.pdx.internal.TypeRegistry; * StreamingOperation is an abstraction for sending messages to multiple (or single) recipient * requesting a potentially large amount of data and receiving the reply with data chunked into * several messages. 
- * */ public abstract class StreamingOperation { private static final Logger logger = LogService.getLogger(); @@ -79,22 +79,25 @@ public abstract class StreamingOperation { public final InternalDistributedSystem sys; - /** Creates a new instance of StreamingOperation */ + /** + * Creates a new instance of StreamingOperation + */ public StreamingOperation(InternalDistributedSystem sys) { this.sys = sys; } /** * Returns normally if succeeded to get data, otherwise throws an exception - * * @throws InterruptedException TODO-javadocs */ public void getDataFromAll(Set recipients) throws org.apache.geode.cache.TimeoutException, InterruptedException { - if (Thread.interrupted()) + if (Thread.interrupted()) { throw new InterruptedException(); - if (recipients.isEmpty()) + } + if (recipients.isEmpty()) { return; + } StreamingProcessor processor = new StreamingProcessor(this.sys, recipients); DistributionMessage m = createRequestMessage(recipients, processor); @@ -115,30 +118,30 @@ public abstract class StreamingOperation { } } - /** Override in subclass to instantiate request message */ + /** + * Override in subclass to instantiate request message + */ protected abstract DistributionMessage createRequestMessage(Set recipients, - ReplyProcessor21 processor); + ReplyProcessor21 processor); /** * Called from separate thread when reply is processed. - * * @return false if should abort (region was destroyed or cache was closed) */ public boolean processChunk(List objects, InternalDistributedMember sender, int sequenceNum, - boolean lastInSequence) { + boolean lastInSequence) { return processData(objects, sender, sequenceNum, lastInSequence); } /** * Override in subclass to do something useful with the data. 
- * * @param sequenceNum the sequence of this data (0-based), in case ordering matters * @param lastInSequence true if this is the last chunk in the sequence * @return false to abort */ protected abstract boolean processData(List objects, InternalDistributedMember sender, - int sequenceNum, boolean lastInSequence); + int sequenceNum, boolean lastInSequence); public class StreamingProcessor extends ReplyProcessor21 { protected volatile boolean abort = false; @@ -150,7 +153,9 @@ public abstract class StreamingOperation { int msgsProcessed = 0; int numMsgs = 0; - /** Return true if this is the very last reply msg to process for this member */ + /** + * Return true if this is the very last reply msg to process for this member + */ protected synchronized boolean trackMessage(StreamingReplyMessage m) { this.msgsProcessed++; @@ -174,7 +179,7 @@ public abstract class StreamingOperation { public StreamingProcessor(final InternalDistributedSystem system, - InternalDistributedMember member) { + InternalDistributedMember member) { super(system, member); } @@ -213,12 +218,12 @@ public abstract class StreamingOperation { } if (isLast) { super.process(msg, false); // removes from members and cause us to - // ignore future messages received from that member + // ignore future messages received from that member } } finally { this.msgsBeingProcessed.decrementAndGet(); checkIfDone(); // check to see if decrementing msgsBeingProcessed requires signalling to - // proceed + // proceed } } @@ -316,7 +321,7 @@ public abstract class StreamingOperation { // for the next objects, disallow stream from allocating more storage do { outStream.disallowExpansion(CHUNK_FULL); // sets the mark where rollback occurs on - // CHUNK_FULL + // CHUNK_FULL nextObject = getNextReplyObject(); @@ -345,8 +350,8 @@ public abstract class StreamingOperation { break; // receiver no longer cares } outStream.reset(); // ready for reuse, assumes replyWithData - // does not queue the message but outStream has - // 
already been used + // does not queue the message but outStream has + // already been used } while (nextObject != Token.END_OF_STREAM); // } catch (CancelException e) { // // if cache is closed, we cannot send a reply (correct?) @@ -385,7 +390,7 @@ public abstract class StreamingOperation { // } protected void replyWithData(ClusterDistributionManager dm, HeapDataOutputStream outStream, - int numObjects, int msgNum, boolean lastMsg) { + int numObjects, int msgNum, boolean lastMsg) { StreamingReplyMessage.send(getSender(), this.processorId, null, dm, outStream, numObjects, msgNum, lastMsg); } @@ -421,39 +426,45 @@ public abstract class StreamingOperation { public static class StreamingReplyMessage extends ReplyMessage { - /** the number of this message */ + /** + * the number of this message + */ protected int msgNum; - /** whether this message is the last one in this series */ + /** + * whether this message is the last one in this series + */ protected boolean lastMsg; private transient HeapDataOutputStream chunkStream; // used only on sending side, null means - // abort + // abort private transient int numObjects; // used only on sending side private transient List objectList = null; // used only on receiving side private boolean pdxReadSerialized = false; // used to read PDX types in serialized form. private transient boolean isCanceled = false; // used only on receiving side and if - // messageProcessor is of type - // PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse + // messageProcessor is of type + // PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse /** * @param chunkStream the data to send back, if null then all the following parameters are - * ignored and any future replies from this member will be ignored, and the streaming of - * chunks is considered aborted by the receiver. - * + * ignored and any future replies from this member will be ignored, and the streaming of + * chunks is considered aborted by the receiver. 
* @param msgNum message number in this series (0-based) * @param lastMsg if this is the last message in this series */ public static void send(InternalDistributedMember recipient, int processorId, - ReplyException exception, DistributionManager dm, HeapDataOutputStream chunkStream, - int numObjects, int msgNum, boolean lastMsg) { + ReplyException exception, DistributionManager dm, + HeapDataOutputStream chunkStream, + int numObjects, int msgNum, boolean lastMsg) { send(recipient, processorId, exception, dm, chunkStream, numObjects, msgNum, lastMsg, false); } public static void send(InternalDistributedMember recipient, int processorId, - ReplyException exception, DistributionManager dm, HeapDataOutputStream chunkStream, - int numObjects, int msgNum, boolean lastMsg, boolean pdxReadSerialized) { + ReplyException exception, DistributionManager dm, + HeapDataOutputStream chunkStream, + int numObjects, int msgNum, boolean lastMsg, + boolean pdxReadSerialized) { StreamingReplyMessage replyMessage = new StreamingReplyMessage(); replyMessage.processorId = processorId; @@ -483,7 +494,9 @@ public abstract class StreamingOperation { return isCanceled; } - /** Return the objects in this chunk as a List, used only on receiving side */ + /** + * Return the objects in this chunk as a List, used only on receiving side + */ public List getObjects() { return this.objectList; } @@ -504,8 +517,14 @@ public abstract class StreamingOperation { this.pdxReadSerialized = in.readBoolean(); Version senderVersion = InternalDataSerializer.getVersionForDataStream(in); boolean isSenderAbove_8_1 = senderVersion.compareTo(Version.GFE_81) > 0; - InternalCache cache = (InternalCache) CacheFactory.getAnyInstance(); - Boolean initialPdxReadSerialized = cache.getPdxRegistry().getPdxReadSerializedOverride(); + InternalCache cache = null; + Boolean initialPdxReadSerialized = false; + try { + cache = (InternalCache) CacheFactory.getAnyInstance(); + initialPdxReadSerialized = 
cache.getPdxRegistry().getPdxReadSerializedOverride(); + } catch (CacheClosedException e) { + + } if (n == -1) { this.objectList = null; } else { @@ -513,7 +532,7 @@ public abstract class StreamingOperation { this.objectList = new ArrayList(n); // Check if the PDX types needs to be kept in serialized form. // This will make readObject() to return PdxInstance form. - if (this.pdxReadSerialized) { + if (this.pdxReadSerialized && cache != null) { cache.getPdxRegistry().setPdxReadSerializedOverride(true); } try { @@ -559,7 +578,7 @@ public abstract class StreamingOperation { } } } finally { - if (this.pdxReadSerialized) { + if (this.pdxReadSerialized && cache != null) { cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized); } } diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java index bdfbddc..48cfcd3 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java @@ -78,7 +78,7 @@ public class PRQueryRemoteNodeExceptionDUnitTest extends CacheTestCase { public void tearDown() throws Exception { disconnectAllFromDS(); invokeInEveryVM(() -> PRQueryDUnitHelper.setCache(null)); - invokeInEveryVM(() -> QueryObserverHolder.reset()); + invokeInEveryVM(QueryObserverHolder::reset); } @Override -- To stop receiving notification emails like this one, please contact u...@apache.org.