PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/033a2fc2 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/033a2fc2 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/033a2fc2 Branch: refs/heads/master Commit: 033a2fc2a91052a6db94da55d87c173d4dbdabab Parents: 764eb8f Author: Sergey Soldatov <s...@apache.org> Authored: Mon Sep 25 19:57:49 2017 -0700 Committer: Sergey Soldatov <s...@apache.org> Committed: Wed Sep 27 12:09:25 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++- .../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++---- 2 files changed, 68 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/033a2fc2/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java index 76f45e2..cebb9ad 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java @@ -19,9 +19,13 @@ package org.apache.phoenix.end2end; import java.io.IOException; import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Properties; import java.util.Random; import org.apache.hadoop.hbase.client.Scan; @@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.phoenix.cache.GlobalCache; import 
org.apache.phoenix.cache.TenantCache; +import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Test; @@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Lists; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.fail; + @RunWith(Parameterized.class) public class HashJoinCacheIT extends HashJoinIT { @@ -426,7 +436,27 @@ public class HashJoinCacheIT extends HashJoinIT { public void testUpsertWithJoin() throws Exception { // TODO: We will enable this test once PHOENIX-3163 } - + + @Test + public void testExpiredCache() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1"); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); + String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); + String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + + tableName1 + " supp RIGHT JOIN " + tableName2 + + " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\""; + try { + PreparedStatement statement = conn.prepareStatement(query); + ResultSet rs = statement.executeQuery(); + rs.next(); + fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled"); + } catch (HashJoinCacheNotFoundException e) { + //Expected exception + } + } + public static class InvalidateHashCache extends SimpleRegionObserver { public static Random rand= new Random(); public static 
List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/033a2fc2/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index ce46a3e..28a42fa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -121,16 +122,24 @@ public class ServerCacheClient { public class ServerCache implements SQLCloseable { private final int size; private final byte[] id; - private final Set<HRegionLocation> servers; + private final Map<HRegionLocation, Long> servers; private ImmutableBytesWritable cachePtr; private MemoryChunk chunk; private File outputFile; + private long maxServerCacheTTL; public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr, ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException { + maxServerCacheTTL = services.getProps().getInt( + QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); this.id = id; - this.servers = new HashSet<HRegionLocation>(servers); + this.servers = new HashMap(); + long currentTime = System.currentTimeMillis(); + for(HRegionLocation loc : servers) { + this.servers.put(loc, currentTime); + } this.size = cachePtr.getLength(); if (storeCacheOnClient) { try { @@ -171,10 +180,28 @@ public class ServerCacheClient { public byte[] getId() { return id; } - - 
public boolean addServer(HRegionLocation loc) { - return this.servers.add(loc); - } + + public boolean addServer(HRegionLocation loc) { + if(this.servers.containsKey(loc)) { + return false; + } else { + this.servers.put(loc, System.currentTimeMillis()); + return true; + } + } + + public boolean isExpired(HRegionLocation loc) { + if(this.servers.containsKey(loc)) { + Long time = this.servers.get(loc); + if(System.currentTimeMillis() - time > maxServerCacheTTL) + return true; // cache was sent more than maxTTL ms ago, expecting that it's expired + } else { + return false; // Unknown region location. Need to send the cache. + } + return false; // should be on server yet. + } + + /** * Call to free up cache on region servers when no longer needed @@ -182,7 +209,7 @@ public class ServerCacheClient { @Override public void close() throws SQLException { try{ - removeServerCache(this, servers); + removeServerCache(this, servers.keySet()); }finally{ cachePtr = null; if (chunk != null) { @@ -305,8 +332,6 @@ public class ServerCacheClient { /** * Remove the cached table from all region servers - * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)}) - * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)}) * @throws SQLException * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added */ @@ -421,6 +446,9 @@ public class ServerCacheClient { byte[] tableName = pTable.getPhysicalName().getBytes(); table = services.getTable(tableName); HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion); + if(cache.isExpired(tableRegionLocation)) { + return false; + } if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) { success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), 
cacheFactory, txState);