GEODE-1272: Don't deserialize PDX objects when creating an index. This change sets the flag that prevents deserialization of PDX objects while populating an index that is defined on a partitioned region. Previously, we were setting this flag in the member that initially created the index, but not in the other members that receive the IndexCreationMessage.
This closes #318 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/341a359e Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/341a359e Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/341a359e Branch: refs/heads/develop Commit: 341a359e00a9ba2f9c74e1dc4fee38fc53f78b3c Parents: 8312196 Author: Dan Smith <upthewatersp...@apache.org> Authored: Wed Dec 14 17:08:36 2016 -0800 Committer: Dan Smith <upthewatersp...@apache.org> Committed: Mon Dec 19 15:32:03 2016 -0800 ---------------------------------------------------------------------- .../query/internal/index/IndexManager.java | 3 + .../cache/PartitionedRegionQueryDUnitTest.java | 177 +++++++++++++++++-- 2 files changed, 163 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/341a359e/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java index 27f239e..6cf9a3f 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java @@ -891,6 +891,8 @@ public class IndexManager { } boolean throwException = false; HashMap<String, Exception> exceptionsMap = new HashMap<String, Exception>(); + boolean oldReadSerialized = DefaultQuery.getPdxReadSerialized(); + DefaultQuery.setPdxReadSerialized(true); try { Iterator entryIter = ((LocalRegion) region).getBestIterator(true); while (entryIter.hasNext()) { @@ -931,6 +933,7 @@ public class IndexManager { throw new MultiIndexCreationException(exceptionsMap); } } finally { + 
DefaultQuery.setPdxReadSerialized(oldReadSerialized); notifyAfterUpdate(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/341a359e/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java index a14e9ee..eb918bc 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java @@ -14,10 +14,11 @@ */ package org.apache.geode.internal.cache; -import java.io.IOException; - import org.apache.geode.DataSerializable; import org.apache.geode.cache.query.Struct; +import org.apache.geode.pdx.PdxReader; +import org.apache.geode.pdx.PdxSerializable; +import org.apache.geode.pdx.PdxWriter; import org.apache.geode.test.dunit.DUnitEnv; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.junit.experimental.categories.Category; @@ -26,20 +27,22 @@ import org.junit.Test; import static org.junit.Assert.*; import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; -import java.sql.Date; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.IntStream; import 
org.apache.geode.cache.Cache; @@ -66,7 +69,6 @@ import org.apache.geode.cache.query.TypeMismatchException; import org.apache.geode.cache.query.internal.DefaultQuery; import org.apache.geode.cache.query.internal.index.IndexManager; import org.apache.geode.cache.query.internal.index.PartitionedIndex; -import org.apache.geode.cache30.CacheTestCase; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.DistributionMessageObserver; @@ -89,9 +91,6 @@ import org.apache.geode.test.dunit.VM; @Category(DistributedTest.class) public class PartitionedRegionQueryDUnitTest extends JUnit4CacheTestCase { - /** - * @param name - */ public PartitionedRegionQueryDUnitTest() { super(); // TODO Auto-generated constructor stub @@ -160,10 +159,99 @@ public class PartitionedRegionQueryDUnitTest extends JUnit4CacheTestCase { } @Test + public void testHashIndexDoesNotDeserializePdxObjects() { + SerializableRunnableIF createIndex = () -> { + Cache cache = getCache(); + cache.getQueryService().createHashIndex("ContractDocumentIndex", "document", "/region"); + }; + String queryString = "select assetId,document from /region where document='B' limit 1000"; + + PdxAssetFactory value = i -> new PdxNotDeserializableAsset(i, Integer.toString(i)); + createIndexDoesNotDerializePdxObjects(createIndex, queryString, value); + } + + @Test + public void tesRangeIndexDoesNotDeserializePdxObjects() { + SerializableRunnableIF createIndex = () -> { + Cache cache = getCache(); + cache.getQueryService().createIndex("ContractDocumentIndex", "ref", + "/region r, r.references ref"); + }; + String queryString = + "select r.assetId,r.document from /region r, r.references ref where ref='B_2' limit 1000"; + PdxAssetFactory value = i -> new PdxNotDeserializableAsset(i, Integer.toString(i)); + createIndexDoesNotDerializePdxObjects(createIndex, queryString, value); + } + + @Test + public void 
tesRangeIndexWithPdxObjects() { + SerializableRunnableIF createIndex = () -> { + Cache cache = getCache(); + cache.getQueryService().createIndex("ContractDocumentIndex", "ref", + "/region r, r.references ref"); + }; + String queryString = "select r from /region r, r.references ref where ref='B_2' limit 1000"; + + PdxAssetFactory value = i -> new PdxAsset(i, Integer.toString(i)); + createIndexDoesNotDerializePdxObjects(createIndex, queryString, value); + } + + private void createIndexDoesNotDerializePdxObjects(final SerializableRunnableIF createIndex, + final String queryString, PdxAssetFactory valueSupplier) { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + + SerializableRunnableIF createPR = () -> { + Cache cache = getCache(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setTotalNumBuckets(10); + cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(paf.create()) + .create("region"); + }; + vm0.invoke(createPR); + vm1.invoke(createPR); + + // Do Puts. 
These objects can't be deserialized because they throw + // and exception from the constructor + vm0.invoke(() -> { + Cache cache = getCache(); + Region region = cache.getRegion("region"); + region.put(0, new PdxNotDeserializableAsset(0, "B")); + region.put(10, new PdxNotDeserializableAsset(1, "B")); + region.put(1, new PdxNotDeserializableAsset(1, "B")); + IntStream.range(11, 100).forEach(i -> region.put(i, valueSupplier.getAsset(i))); + }); + + // If this tries to deserialize the assets, it will fail + vm0.invoke(createIndex); + + vm0.invoke(() -> { + QueryService qs = getCache().getQueryService(); + SelectResults<Struct> results = (SelectResults) qs.newQuery(queryString).execute(); + + assertEquals(3, results.size()); + final Index index = qs.getIndex(getCache().getRegion("region"), "ContractDocumentIndex"); + assertEquals(1, index.getStatistics().getTotalUses()); + }); + } + + @Test + public void testFailureToCreateIndexOnLocalNodeThrowsException() { + VM vmToFailCreationOn = Host.getHost(0).getVM(0); + failToCreateIndexOnNode(vmToFailCreationOn); + } + + @Test public void testFailureToCreateIndexOnRemoteNodeThrowsException() { + VM vmToFailCreationOn = Host.getHost(0).getVM(1); + failToCreateIndexOnNode(vmToFailCreationOn); + } + + private void failToCreateIndexOnNode(final VM vmToFailCreationOn) { Host host = Host.getHost(0); VM vm0 = host.getVM(0); - VM vm1 = host.getVM(-1); + VM vm1 = host.getVM(1); SerializableRunnableIF createPR = () -> { Cache cache = getCache(); @@ -178,7 +266,8 @@ public class PartitionedRegionQueryDUnitTest extends JUnit4CacheTestCase { vm0.invoke(() -> { Cache cache = getCache(); Region region = cache.getRegion("region"); - IntStream.range(1, 10).forEach(i -> region.put(i, new NotDeserializableAsset())); + IntStream.range(1, 10) + .forEach(i -> region.put(i, new NotDeserializableAsset(vmToFailCreationOn.getPid()))); }); vm0.invoke(() -> { @@ -196,7 +285,6 @@ public class PartitionedRegionQueryDUnitTest extends JUnit4CacheTestCase { 
final Index index = cache.getQueryService().getIndex(region, "ContractDocumentIndex"); assertEquals(null, index); }); - } /** @@ -1141,27 +1229,82 @@ public class PartitionedRegionQueryDUnitTest extends JUnit4CacheTestCase { } + public interface PdxAssetFactory extends Serializable { + PdxAsset getAsset(int i); + } + + public static class PdxNotDeserializableAsset extends PdxAsset { + public int assetId; + public String document; + public Collection<String> references = new ArrayList<String>(); + + public PdxNotDeserializableAsset() { + throw new RuntimeException("Preventing Deserialization of Asset"); + } + + public PdxNotDeserializableAsset(final int assetId, final String document) { + super(assetId, document); + } + + @Override + public void fromData(final PdxReader reader) { + throw new RuntimeException("Not allowing us to deserialize one of these"); + } + } + + public static class PdxAsset implements PdxSerializable { + public int assetId; + public String document; + public Collection<String> references = new ArrayList<String>(); + + public PdxAsset() { + + } + + public PdxAsset(final int assetId, final String document) { + this.assetId = assetId; + this.document = document; + references.add(document + "_1"); + references.add(document + "_2"); + references.add(document + "_3"); + } + + @Override + public void toData(final PdxWriter writer) { + writer.writeString("document", document); + writer.writeInt("assetId", assetId); + writer.writeObject("references", references); + } + + @Override + public void fromData(final PdxReader reader) { + this.document = reader.readString("document"); + this.assetId = reader.readInt("assetId"); + this.references = (Collection<String>) reader.readObject("references"); + } + } + public static class NotDeserializableAsset implements DataSerializable { - private int allowedPid; + private int disallowedPid; public NotDeserializableAsset() { } - public NotDeserializableAsset(final int allowedPid) { - this.allowedPid = allowedPid; + 
public NotDeserializableAsset(final int disallowedPid) { + this.disallowedPid = disallowedPid; } @Override public void toData(final DataOutput out) throws IOException { - out.writeInt(allowedPid); + out.writeInt(disallowedPid); } @Override public void fromData(final DataInput in) throws IOException, ClassNotFoundException { - allowedPid = in.readInt(); - if (allowedPid != DUnitEnv.get().getPid()) { + disallowedPid = in.readInt(); + if (disallowedPid == DUnitEnv.get().getPid()) { throw new IOException("Cannot deserialize"); } }