PHOENIX-2940 Query the stats table and cache stats in the client

Side-steps the issue of how to safely compute statistics and tie
them to a PTable being constructed inside the regionserver. Stats
can be asynchronously updated on the client side, or synchronously
fetched if necessary without blocking other Phoenix clients.

Includes some unit tests and logging on stats-cache maintenance
to help us track usage going forward.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/81776445
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/81776445
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/81776445

Branch: refs/heads/4.x-HBase-1.1
Commit: 817764456b0b9fb372214dd2b8559c816d018999
Parents: 56ba373
Author: Josh Elser <els...@apache.org>
Authored: Mon Jun 20 13:28:27 2016 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Mon Jun 20 14:08:30 2016 -0400

----------------------------------------------------------------------
 .../apache/phoenix/rpc/PhoenixServerRpcIT.java  |  21 ---
 .../coprocessor/MetaDataEndpointImpl.java       |  25 +--
 .../org/apache/phoenix/execute/ScanPlan.java    |   5 +-
 .../phoenix/iterate/BaseResultIterators.java    |   7 +-
 .../phoenix/query/ConnectionQueryServices.java  |   7 +
 .../query/ConnectionQueryServicesImpl.java      |  62 ++----
 .../query/ConnectionlessQueryServicesImpl.java  |  26 ++-
 .../query/DelegateConnectionQueryServices.java  |   6 +
 .../org/apache/phoenix/query/QueryServices.java |   2 +
 .../phoenix/query/QueryServicesOptions.java     |   2 +
 .../apache/phoenix/query/TableStatsCache.java   | 187 +++++++++++++++++++
 .../apache/phoenix/schema/DelegateTable.java    |   6 -
 .../apache/phoenix/schema/MetaDataClient.java   |  25 +--
 .../java/org/apache/phoenix/schema/PTable.java  |   2 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |  76 ++------
 .../phoenix/filter/SkipScanBigFilterTest.java   |  12 +-
 .../PhoenixStatsCacheRemovalListenerTest.java   |  45 +++++
 .../java/org/apache/phoenix/util/TestUtil.java  |   4 +-
 phoenix-protocol/src/main/PTable.proto          |   4 +-
 19 files changed, 342 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index 4d3620f..75eebc2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -209,24 +208,4 @@ public class PhoenixServerRpcIT extends 
BaseOwnClusterHBaseManagedTimeIT {
                // verify index and data tables are on different servers
                assertNotEquals("Tables " + tableName1 + " and " + tableName2 + 
" should be on different region servers", serverName1, serverName2);
        }
-    
-    @Test
-    public void testMetadataQos() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = driver.connect(getUrl(), props);
-        try {
-               
ensureTablesOnDifferentRegionServers(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
 PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
-            // create the table 
-            conn.createStatement().execute(
-                    "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT 
NULL PRIMARY KEY, v VARCHAR)");
-            // query the table from another connection, so that SYSTEM.STATS 
will be used 
-            conn.createStatement().execute("SELECT * FROM 
"+DATA_TABLE_FULL_NAME);
-            // verify that that metadata queue is at least once 
-            
Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getMetadataRpcExecutor(), 
Mockito.atLeastOnce()).dispatch(Mockito.any(CallRunner.class));
-        }
-        finally {
-            conn.close();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index e77ff8b..93bb8e5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -198,8 +198,6 @@ import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
@@ -951,30 +949,13 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
               addColumnToTable(results, colName, famName, colKeyValues, 
columns, saltBucketNum != null);
           }
         }
-        PName physicalTableName = physicalTables.isEmpty() ? 
PNameFactory.newName(SchemaUtil.getPhysicalTableName(
-                Bytes.toBytes(SchemaUtil.getTableName(schemaName.getBytes(), 
tableName.getBytes())), isNamespaceMapped)
-                .getNameAsString()) : physicalTables.get(0);
-        PTableStats stats = PTableStats.EMPTY_STATS;
-        if (tenantId == null) {
-            HTableInterface statsHTable = null;
-            try {
-                statsHTable = ServerUtil.getHTableForCoprocessorScan(env,
-                        
SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
 env.getConfiguration())
-                                .getName());
-                stats = StatisticsUtil.readStatistics(statsHTable, 
physicalTableName.getBytes(), clientTimeStamp);
-                timeStamp = Math.max(timeStamp, stats.getTimestamp());
-            } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
-                
logger.warn(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
-                        env.getConfiguration()) + " not online yet?");
-            } finally {
-                if (statsHTable != null) statsHTable.close();
-            }
-        }
+        // Avoid querying the stats table because we're holding the rowLock 
here. Issuing an RPC to a remote
+        // server while holding this lock is a bad idea and likely to cause 
contention.
         return PTableImpl.makePTable(tenantId, schemaName, tableName, 
tableType, indexState, timeStamp, tableSeqNum,
                 pkName, saltBucketNum, columns, tableType == INDEX ? 
schemaName : null,
                 tableType == INDEX ? dataTableName : null, indexes, 
isImmutableRows, physicalTables, defaultFamilyName,
                 viewStatement, disableWAL, multiTenant, storeNulls, viewType, 
viewIndexId, indexType,
-                rowKeyOrderOptimizable, transactional, updateCacheFrequency, 
stats, baseColumnCount,
+                rowKeyOrderOptimizable, transactional, updateCacheFrequency, 
baseColumnCount,
                 indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, 
isAppendOnlySchema);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index cdd0703..c55a1cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -61,6 +61,7 @@ import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -123,7 +124,9 @@ public class ScanPlan extends BaseQueryPlan {
         if (perScanLimit == null || scan.getFilter() != null) {
             return false;
         }
-        GuidePostsInfo gpsInfo = 
table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
+        long scn = context.getConnection().getSCN() == null ? Long.MAX_VALUE : 
context.getConnection().getSCN();
+        PTableStats tableStats = 
context.getConnection().getQueryServices().getTableStats(table.getName().getBytes(),
 scn);
+        GuidePostsInfo gpsInfo = 
tableStats.getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
         long estRowSize = SchemaUtil.estimateRowSize(table);
         long estRegionSize;
         if (gpsInfo == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 57458ce..42fe0d9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -46,6 +46,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
@@ -357,7 +358,11 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
         physicalTableName = table.getPhysicalName().getBytes();
-        tableStats = useStats() ? new 
MetaDataClient(context.getConnection()).getTableStats(table) : 
PTableStats.EMPTY_STATS;
+        Long currentSCN = context.getConnection().getSCN();
+        if (null == currentSCN) {
+          currentSCN = HConstants.LATEST_TIMESTAMP;
+        }
+        tableStats = useStats() ? 
context.getConnection().getQueryServices().getTableStats(physicalTableName, 
currentSCN) : PTableStats.EMPTY_STATS;
         // Used to tie all the scans together during logging
         scanId = UUID.randomUUID().toString();
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 6ed0b74..7154d58 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.PFunction;
@@ -132,4 +133,10 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
 
     public MetaDataMutationResult dropSchema(List<Mutation> schemaMetaData, 
String schemaName) throws SQLException;
 
+    /**
+     * Removes cache {@link PTableStats} for the table with the given name. If 
no cached stats are present, this does nothing.
+     *
+     * @param tableName The table to remove stats for
+     */
+    void invalidateStats(ImmutableBytesPtr tableName);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 01dfcc4..b09621e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -51,6 +51,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
@@ -180,7 +181,6 @@ import 
org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableProperty;
 import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
@@ -216,8 +216,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -230,8 +228,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private static final Logger logger = 
LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
     private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100;
     private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 
1000;
-    // Max number of cached table stats for view or shared index physical 
tables
-    private static final int MAX_TABLE_STATS_CACHE_ENTRIES = 512;
     protected final Configuration config;
     private final ConnectionInfo connectionInfo;
     // Copy of config.getProps(), but read-only to prevent synchronization 
that we
@@ -239,7 +235,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private final ReadOnlyProps props;
     private final String userName;
     private final 
ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
-    private final Cache<ImmutableBytesPtr, PTableStats> tableStatsCache;
+    private final TableStatsCache tableStatsCache;
 
     // Cache the latest meta data here for future connections
     // writes guarded by "latestMetaDataLock"
@@ -340,13 +336,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         // find the HBase version and use that to determine the 
KeyValueBuilder that should be used
         String hbaseVersion = VersionInfo.getVersion();
         this.kvBuilder = KeyValueBuilder.get(hbaseVersion);
-        long halfStatsUpdateFreq = config.getLong(
-                QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB,
-                QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS) / 2;
-        tableStatsCache = CacheBuilder.newBuilder()
-                .maximumSize(MAX_TABLE_STATS_CACHE_ENTRIES)
-                .expireAfterWrite(halfStatsUpdateFreq, TimeUnit.MILLISECONDS)
-                .build();
         this.returnSequenceValues = 
props.getBoolean(QueryServices.RETURN_SEQUENCE_VALUES_ATTRIB, 
QueryServicesOptions.DEFAULT_RETURN_SEQUENCE_VALUES);
         this.renewLeaseEnabled = config.getBoolean(RENEW_LEASE_ENABLED, 
DEFAULT_RENEW_LEASE_ENABLED);
         this.renewLeasePoolSize = config.getInt(RENEW_LEASE_THREAD_POOL_SIZE, 
DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE);
@@ -358,6 +347,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             list.add(queue);
         }
         connectionQueues = ImmutableList.copyOf(list);
+        // A little bit of a smell to leak `this` here, but should not be a 
problem
+        this.tableStatsCache = new TableStatsCache(this, config);
     }
 
     @Override
@@ -3537,35 +3528,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     @Override
     public PTableStats getTableStats(final byte[] physicalName, final long 
clientTimeStamp) throws SQLException {
         try {
-            return tableStatsCache.get(new ImmutableBytesPtr(physicalName), 
new Callable<PTableStats>() {
-                @Override
-                public PTableStats call() throws Exception {
-                    /*
-                     *  The shared view index case is tricky, because we don't 
have
-                     *  table metadata for it, only an HBase table. We do have 
stats,
-                     *  though, so we'll query them directly here and cache 
them so
-                     *  we don't keep querying for them.
-                     */
-                    HTableInterface statsHTable = 
ConnectionQueryServicesImpl.this
-                            
.getTable(SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
-                                    
ConnectionQueryServicesImpl.this.getProps()).getName());
-                    try {
-                        return StatisticsUtil.readStatistics(statsHTable, 
physicalName, clientTimeStamp);
-                    } catch (IOException e) {
-                        logger.warn("Unable to read from stats table", e);
-                        // Just cache empty stats. We'll try again after some 
time anyway.
-                        return PTableStats.EMPTY_STATS;
-                    } finally {
-                        try {
-                            statsHTable.close();
-                        } catch (IOException e) {
-                            // Log, but continue. We have our stats anyway now.
-                            logger.warn("Unable to close stats table", e);
-                        }
-                    }
-                }
-
-            });
+            return tableStatsCache.get(new ImmutableBytesPtr(physicalName));
         } catch (ExecutionException e) {
             throw ServerUtil.parseServerException(e);
         }
@@ -3926,4 +3889,19 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
+    /**
+     * Manually adds {@link PTableStats} for a table to the client-side cache. 
Not a
+     * {@link ConnectionQueryServices} method. Exposed for testing purposes.
+     *
+     * @param tableName Table name
+     * @param stats Stats instance
+     */
+    public void addTableStats(ImmutableBytesPtr tableName, PTableStats stats) {
+        this.tableStatsCache.put(Objects.requireNonNull(tableName), stats);
+    }
+
+    @Override
+    public void invalidateStats(ImmutableBytesPtr tableName) {
+        this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index b92c358..f373de2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
@@ -50,6 +51,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -112,6 +114,7 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     private volatile boolean initialized;
     private volatile SQLException initializationException;
     private final Map<String, List<HRegionLocation>> tableSplits = 
Maps.newHashMap();
+    private final TableStatsCache tableStatsCache;
     
     public ConnectionlessQueryServicesImpl(QueryServices services, 
ConnectionInfo connInfo, Properties info) {
         super(services);
@@ -138,6 +141,7 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
         config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
         TransactionManager txnManager = new TransactionManager(config);
         this.txSystemClient = new InMemoryTxSystemClient(txnManager);
+        this.tableStatsCache = new TableStatsCache(this, config);
     }
 
     private PMetaData newEmptyMetaData() {
@@ -516,7 +520,11 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
 
     @Override
     public PTableStats getTableStats(byte[] physicalName, long 
clientTimeStamp) {
-        return PTableStats.EMPTY_STATS;
+        PTableStats stats = 
tableStatsCache.getCache().getIfPresent(physicalName);
+        if (null == stats) {
+          return PTableStats.EMPTY_STATS;
+        }
+        return stats;
     }
 
     @Override
@@ -629,4 +637,20 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     public MetaDataMutationResult dropSchema(List<Mutation> schemaMetaData, 
String schemaName) {
         return new MetaDataMutationResult(MutationCode.SCHEMA_ALREADY_EXISTS, 
0, null);
     }
+
+    /**
+     * Manually adds {@link PTableStats} for a table to the client-side cache. 
Not a
+     * {@link ConnectionQueryServices} method. Exposed for testing purposes.
+     *
+     * @param tableName Table name
+     * @param stats Stats instance
+     */
+    public void addTableStats(ImmutableBytesPtr tableName, PTableStats stats) {
+        this.tableStatsCache.put(Objects.requireNonNull(tableName), stats);
+    }
+
+    @Override
+    public void invalidateStats(ImmutableBytesPtr tableName) {
+        this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index bc6148b..953c73d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.PFunction;
@@ -341,4 +342,9 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
     public MetaDataMutationResult dropSchema(List<Mutation> schemaMetaData, 
String schemaName) throws SQLException {
         return getDelegate().dropSchema(schemaMetaData, schemaName);
     }
+
+    @Override
+    public void invalidateStats(ImmutableBytesPtr tableName) {
+        getDelegate().invalidateStats(tableName);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index b66b67b..1917893 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -161,6 +161,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String RUN_UPDATE_STATS_ASYNC = 
"phoenix.update.stats.command.async";
     public static final String STATS_SERVER_POOL_SIZE = 
"phoenix.stats.pool.size";
     public static final String COMMIT_STATS_ASYNC = 
"phoenix.stats.commit.async";
+    // Maximum size in bytes taken up by cached table stats in the client
+    public static final String STATS_MAX_CACHE_SIZE = 
"phoenix.stats.cache.maxSize";
 
     public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = 
"phoenix.sequence.saltBuckets";
     public static final String COPROCESSOR_PRIORITY_ATTRIB = 
"phoenix.coprocessor.priority";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index c3cdbde..0e7dce9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -189,6 +189,8 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = true;
     public static final boolean DEFAULT_COMMIT_STATS_ASYNC = true;
     public static final int DEFAULT_STATS_POOL_SIZE = 4;
+    // Maximum size (in bytes) that cached table stats should take upm
+    public static final long DEFAULT_STATS_MAX_CACHE_SIZE = 256 * 1024 * 1024;
 
     public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/query/TableStatsCache.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/TableStatsCache.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/TableStatsCache.java
new file mode 100644
index 0000000..287886e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/TableStatsCache.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+
+/**
+ * "Client-side" cache for storing {@link PTableStats} for Phoenix tables. 
Intended to decouple
+ * Phoenix from a specific version of Guava's cache.
+ */
+public class TableStatsCache {
+    private static final Logger logger = 
LoggerFactory.getLogger(TableStatsCache.class);
+
+    private final ConnectionQueryServices queryServices;
+    private final LoadingCache<ImmutableBytesPtr, PTableStats> cache;
+
+    public TableStatsCache(ConnectionQueryServices queryServices, 
Configuration config) {
+        this.queryServices = Objects.requireNonNull(queryServices);
+        // Number of millis to expire cache values after write
+        final long statsUpdateFrequency = config.getLong(
+                QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB,
+                QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS);
+        // Maximum number of entries (tables) to store in the cache at one time
+        final long maxTableStatsCacheSize = config.getLong(
+                QueryServices.STATS_MAX_CACHE_SIZE,
+                QueryServicesOptions.DEFAULT_STATS_MAX_CACHE_SIZE);
+        cache = CacheBuilder.newBuilder()
+                // Expire entries a given amount of time after they were 
written
+                .expireAfterWrite(statsUpdateFrequency, TimeUnit.MILLISECONDS)
+                // Maximum total weight (size in bytes) of stats entries
+                .maximumWeight(maxTableStatsCacheSize)
+                // Defer actual size to the PTableStats.getEstimatedSize()
+                .weigher(new Weigher<ImmutableBytesPtr, PTableStats>() {
+                    @Override public int weigh(ImmutableBytesPtr key, 
PTableStats stats) {
+                        return stats.getEstimatedSize();
+                    }
+                })
+                // Log removals at TRACE for debugging
+                .removalListener(new PhoenixStatsCacheRemovalListener())
+                // Automatically load the cache when entries are missing
+                .build(new StatsLoader());
+    }
+
+    /**
+     * {@link CacheLoader} implementation for the Phoenix Table Stats cache.
+     */
+    protected class StatsLoader extends CacheLoader<ImmutableBytesPtr, 
PTableStats> {
+        @Override
+        public PTableStats load(ImmutableBytesPtr tableName) throws Exception {
+            @SuppressWarnings("deprecation")
+            HTableInterface statsHTable = 
queryServices.getTable(SchemaUtil.getPhysicalName(
+                    PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
+                            queryServices.getProps()).getName());
+            final byte[] tableNameBytes = tableName.copyBytesIfNecessary();
+            try {
+                PTableStats stats = StatisticsUtil.readStatistics(statsHTable, 
tableNameBytes,
+                        Long.MAX_VALUE);
+                traceStatsUpdate(tableNameBytes, stats);
+                return stats;
+            } catch (IOException e) {
+                logger.warn("Unable to read from stats table", e);
+                // Just cache empty stats. We'll try again after some time 
anyway.
+                return PTableStats.EMPTY_STATS;
+            } finally {
+                try {
+                    statsHTable.close();
+                } catch (IOException e) {
+                    // Log, but continue. We have our stats anyway now.
+                    logger.warn("Unable to close stats table", e);
+                }
+            }
+        }
+
+        /**
+         * Logs a trace message for newly inserted entries to the stats cache.
+         */
+        void traceStatsUpdate(byte[] tableName, PTableStats stats) {
+            logger.trace("Updating local TableStats cache (id={}) for {}, 
size={}bytes",
+                  new Object[] {Objects.hashCode(TableStatsCache.this), 
Bytes.toString(tableName),
+                  stats.getEstimatedSize()});
+        }
+    }
+
+    /**
+     * Returns the underlying cache. Try to use the provided methods instead 
of accessing the cache
+     * directly.
+     */
+    LoadingCache<ImmutableBytesPtr, PTableStats> getCache() {
+        return cache;
+    }
+
+    /**
+     * Returns the PTableStats for the given <code>tableName</code, using the 
provided
+     * <code>valueLoader</code> if no such mapping exists.
+     *
+     * @see com.google.common.cache.LoadingCache#get(Object)
+     */
+    public PTableStats get(ImmutableBytesPtr tableName) throws 
ExecutionException {
+        return getCache().get(tableName);
+    }
+
+    /**
+     * Cache the given <code>stats</code> to the cache for the given 
<code>tableName</code>.
+     *
+     * @see com.google.common.cache.Cache#put(Object, Object)
+     */
+    public void put(ImmutableBytesPtr tableName, PTableStats stats) {
+        getCache().put(Objects.requireNonNull(tableName), 
Objects.requireNonNull(stats));
+    }
+
+    /**
+     * Removes the mapping for <code>tableName</code> if it exists.
+     *
+     * @see com.google.common.cache.Cache#invalidate(Object)
+     */
+    public void invalidate(ImmutableBytesPtr tableName) {
+        getCache().invalidate(Objects.requireNonNull(tableName));
+    }
+
+    /**
+     * Removes all mappings from the cache.
+     *
+     * @see com.google.common.cache.Cache#invalidateAll()
+     */
+    public void invalidateAll() {
+        getCache().invalidateAll();
+    }
+
+    /**
+     * A {@link RemovalListener} implementation to track evictions from the 
table stats cache.
+     */
+    static class PhoenixStatsCacheRemovalListener implements
+            RemovalListener<ImmutableBytesPtr, PTableStats> {
+        @Override
+        public void onRemoval(RemovalNotification<ImmutableBytesPtr, 
PTableStats> notification) {
+            final RemovalCause cause = notification.getCause();
+            if (wasEvicted(cause)) {
+                ImmutableBytesPtr ptr = notification.getKey();
+                String tableName = new String(ptr.get(), ptr.getOffset(), 
ptr.getLength());
+                logger.trace("Cached stats for {} with size={}bytes was 
evicted due to cause={}",
+                        new Object[] {tableName, 
notification.getValue().getEstimatedSize(),
+                                cause});
+            }
+        }
+
+        boolean wasEvicted(RemovalCause cause) {
+            // This is actually a method on RemovalCause but isn't exposed
+            return RemovalCause.EXPLICIT != cause && RemovalCause.REPLACED != 
cause;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index ddd2de0..3ee012f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.schema.stats.PTableStats;
 
 public class DelegateTable implements PTable {
     @Override
@@ -226,11 +225,6 @@ public class DelegateTable implements PTable {
         return delegate.getIndexType();
     }
 
-    @Override
-    public PTableStats getTableStats() {
-        return delegate.getTableStats();
-    }
-
     private final PTable delegate;
 
     public DelegateTable(PTable delegate) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 3a4010b..dce9a69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1024,10 +1024,6 @@ public class MetaDataClient {
                         public PName getPhysicalName() {
                             return physicalNames.get(index);
                         }
-                        @Override
-                        public PTableStats getTableStats() {
-                            return PTableStats.EMPTY_STATS;
-                        }
                     };
                     rowCount += updateStatisticsInternal(name, 
indexLogicalTable, updateStatisticsStmt.getProps());
                 }
@@ -3582,24 +3578,9 @@ public class MetaDataClient {
         String physicalName = 
table.getPhysicalName().toString().replace(QueryConstants.NAMESPACE_SEPARATOR,
                 QueryConstants.NAME_SEPARATOR);
         if (isView && table.getViewType() != ViewType.MAPPED) {
-            try {
-                return connection.getTable(new PTableKey(null, 
physicalName)).getTableStats();
-            } catch (TableNotFoundException e) {
-                // Possible when the table timestamp == current timestamp - 1.
-                // This would be most likely during the initial index build of 
a view index
-                // where we're doing an upsert select from the tenant specific 
table.
-                // TODO: would we want to always load the physical table in 
updateCache in
-                // this case too, as we might not update the view with all of 
it's indexes?
-                String physicalSchemaName = 
SchemaUtil.getSchemaNameFromFullName(physicalName);
-                String physicalTableName = 
SchemaUtil.getTableNameFromFullName(physicalName);
-                MetaDataMutationResult result = updateCache(null, 
physicalSchemaName, physicalTableName, false);
-                if (result.getTable() == null) {
-                    throw new TableNotFoundException(physicalSchemaName, 
physicalTableName);
-                }
-                return result.getTable().getTableStats();
-            }
-        }
-        return table.getTableStats();
+              return 
connection.getQueryServices().getTableStats(Bytes.toBytes(physicalName), 
getCurrentScn());
+        }
+        return 
connection.getQueryServices().getTableStats(table.getName().getBytes(), 
getCurrentScn());
     }
 
     private void throwIfLastPKOfParentIsFixedLength(PTable parent, String 
viewSchemaName, String viewName, ColumnDef col) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index ca48fcb..344dc2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.schema.stats.PTableStats;
 
 
@@ -327,7 +328,6 @@ public interface PTable extends PMetaDataEntity {
     PTableKey getKey();
 
     IndexType getIndexType();
-    PTableStats getTableStats();
     int getBaseColumnCount();
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index acb61a3..76bc0d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -28,8 +28,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -40,8 +38,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.generated.PGuidePostsProtos;
-import org.apache.phoenix.coprocessor.generated.PGuidePostsProtos.PGuidePosts;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -51,9 +47,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
-import org.apache.phoenix.schema.stats.GuidePostsInfo;
-import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.schema.stats.PTableStatsImpl;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
@@ -75,7 +68,6 @@ import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.sun.istack.NotNull;
 
 import org.apache.tephra.TxConstants;
 
@@ -131,7 +123,6 @@ public class PTableImpl implements PTable {
     private Short viewIndexId;
     private int estimatedSize;
     private IndexType indexType;
-    private PTableStats tableStats = PTableStats.EMPTY_STATS;
     private int baseColumnCount;
     private boolean rowKeyOrderOptimizable; // TODO: remove when required that 
tables have been upgrade for PHOENIX-2067
     private boolean hasColumnsRequiringUpgrade; // TODO: remove when required 
that tables have been upgrade for PHOENIX-2067
@@ -183,7 +174,7 @@ public class PTableImpl implements PTable {
         }
         this.families = families;
         init(tenantId, this.schemaName, this.tableName, PTableType.INDEX, 
state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
-            PTableStats.EMPTY_STATS, this.schemaName, parentTableName, 
indexes, isImmutableRows, physicalNames, defaultFamilyName,
+            this.schemaName, parentTableName, indexes, isImmutableRows, 
physicalNames, defaultFamilyName,
             null, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, 
indexType, baseColumnCount, rowKeyOrderOptimizable,
             isTransactional, updateCacheFrequency, indexDisableTimestamp, 
isNamespaceMpped, null, false);
     }
@@ -228,7 +219,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), 
table.getBucketNum(), getColumnsToClone(table), parentSchemaName, 
table.getParentTableName(),
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), 
table.rowKeyOrderOptimizable(), table.isTransactional(), 
table.getUpdateCacheFrequency(), 
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
@@ -238,7 +229,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), 
table.getBucketNum(), columns, table.getParentSchemaName(), 
table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), 
table.getPhysicalNames(), table.getDefaultFamilyName(), 
table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), 
table.rowKeyOrderOptimizable(), table.isTransactional(), 
table.getUpdateCacheFrequency(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
@@ -247,7 +238,7 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), 
table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), 
columns, table.getParentSchemaName(), table.getParentTableName(), 
table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
-                table.isMultiTenant(), table.getStoreNulls(), 
table.getViewType(), table.getViewIndexId(), table.getIndexType(), 
table.getTableStats(),
+                table.isMultiTenant(), table.getStoreNulls(), 
table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), 
                 table.isNamespaceMapped(), table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema());
     }
@@ -258,7 +249,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), 
columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                table.getIndexType(), table.getTableStats(), 
table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), 
+                table.getIndexType(), table.getBaseColumnCount(), 
table.rowKeyOrderOptimizable(), table.isTransactional(),
                 table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
     
@@ -268,7 +259,7 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), 
table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), 
columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
-                isWalDisabled, isMultitenant, storeNulls, table.getViewType(), 
table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
+                isWalDisabled, isMultitenant, storeNulls, table.getViewType(), 
table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), 
                 isNamespaceMapped, table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema());
     }
@@ -280,7 +271,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), 
table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), 
table.rowKeyOrderOptimizable(), table.isTransactional(), 
table.getUpdateCacheFrequency(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
@@ -290,18 +281,18 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), 
table.getBucketNum(), getColumnsToClone(table),
                 table.getParentSchemaName(), table.getParentTableName(), 
table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(), table.getTableStats(),
+                table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
                 table.getBaseColumnCount(), rowKeyOrderOptimizable, 
table.isTransactional(), table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
                 table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
-    public static PTableImpl makePTable(PTable table, PTableStats stats) 
throws SQLException {
+    public static PTableImpl makePTable(PTable table) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), 
table.getTableName(), table.getType(), table.getIndexState(), 
table.getTimeStamp(),
                 table.getSequenceNumber(), table.getPKName(), 
table.getBucketNum(), getColumnsToClone(table),
                 table.getParentSchemaName(), table.getParentTableName(), 
table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), 
table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(), stats,
+                table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), 
table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), 
                 table.isNamespaceMapped(), table.getAutoPartitionSeqName(), 
table.isAppendOnlySchema());
     }
@@ -316,7 +307,7 @@ public class PTableImpl implements PTable {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, 
timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, 
defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, 
viewIndexId,
-                indexType, PTableStats.EMPTY_STATS, 
QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, 
isTransactional,
+                indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, 
rowKeyOrderOptimizable, isTransactional,
                 updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, 
autoPartitionSeqName, isAppendOnlySchema);
     }
 
@@ -326,13 +317,13 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName 
defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, 
ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean 
isTransactional, long updateCacheFrequency,
-            @NotNull PTableStats stats, int baseColumnCount, long 
indexDisableTimestamp, boolean isNamespaceMapped, 
+            int baseColumnCount, long indexDisableTimestamp, boolean 
isNamespaceMapped,
             String autoPartitionSeqName, boolean isAppendOnlySchema)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, 
timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, 
isImmutableRows, physicalNames,
                 defaultFamilyName, viewExpression, disableWAL, multiTenant, 
storeNulls, viewType, viewIndexId,
-                indexType, stats, baseColumnCount, rowKeyOrderOptimizable, 
isTransactional, updateCacheFrequency, 
+                indexType, baseColumnCount, rowKeyOrderOptimizable, 
isTransactional, updateCacheFrequency, 
                 indexDisableTimestamp, isNamespaceMapped, 
autoPartitionSeqName, isAppendOnlySchema);
     }
 
@@ -341,10 +332,10 @@ public class PTableImpl implements PTable {
             PName parentSchemaName, PName parentTableName, List<PTable> 
indexes, boolean isImmutableRows,
             List<PName> physicalNames, PName defaultFamilyName, String 
viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, 
IndexType indexType,
-            PTableStats stats, int baseColumnCount, boolean 
rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, 
+            int baseColumnCount, boolean rowKeyOrderOptimizable, boolean 
isTransactional, long updateCacheFrequency,
             long indexDisableTimestamp, boolean isNamespaceMapped, String 
autoPartitionSeqName, boolean isAppendOnlySchema) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, 
sequenceNumber, pkName, bucketNum, columns,
-                stats, schemaName, parentTableName, indexes, isImmutableRows, 
physicalNames, defaultFamilyName,
+                schemaName, parentTableName, indexes, isImmutableRows, 
physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, 
viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
                 isTransactional, updateCacheFrequency, indexDisableTimestamp, 
isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema);
     }
@@ -376,7 +367,7 @@ public class PTableImpl implements PTable {
     }
 
     private void init(PName tenantId, PName schemaName, PName tableName, 
PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
-            PName pkName, Integer bucketNum, List<PColumn> columns, 
PTableStats stats, PName parentSchemaName, PName parentTableName,
+            PName pkName, Integer bucketNum, List<PColumn> columns, PName 
parentSchemaName, PName parentTableName,
             List<PTable> indexes, boolean isImmutableRows, List<PName> 
physicalNames, PName defaultFamilyName, String viewExpression, boolean 
disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short 
viewIndexId,
             IndexType indexType , int baseColumnCount, boolean 
rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, 
long indexDisableTimestamp, 
@@ -411,7 +402,6 @@ public class PTableImpl implements PTable {
         this.viewIndexId = viewIndexId;
         this.indexType = indexType;
         this.isTransactional = isTransactional;
-        this.tableStats = stats;
         this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
         this.updateCacheFrequency = updateCacheFrequency;
         this.isNamespaceMapped = isNamespaceMapped;
@@ -506,7 +496,6 @@ public class PTableImpl implements PTable {
         estimatedSize += rowKeySchema.getEstimatedSize();
         Iterator<Map.Entry<PName,List<PColumn>>> iterator = 
familyMap.entrySet().iterator();
         PColumnFamily[] families = new PColumnFamily[familyMap.size()];
-        estimatedSize += this.tableStats.getEstimatedSize();
         ImmutableMap.Builder<String, PColumnFamily> familyByString = 
ImmutableMap.builder();
         ImmutableSortedMap.Builder<byte[], PColumnFamily> familyByBytes = 
ImmutableSortedMap
                 .orderedBy(Bytes.BYTES_COMPARATOR);
@@ -1062,17 +1051,6 @@ public class PTableImpl implements PTable {
       }
 
       boolean isImmutableRows = table.getIsImmutableRows();
-      SortedMap<byte[], GuidePostsInfo> tableGuidePosts = new TreeMap<byte[], 
GuidePostsInfo>(Bytes.BYTES_COMPARATOR);
-      for (PTableProtos.PTableStats pTableStatsProto : 
table.getGuidePostsList()) {
-          PGuidePosts pGuidePosts = pTableStatsProto.getPGuidePosts();
-          int maxLength = pGuidePosts.getMaxLength();
-          int guidePostsCount = pGuidePosts.getEncodedGuidePostsCount();
-            GuidePostsInfo info = new 
GuidePostsInfo(pGuidePosts.getByteCountsList(),
-                    new 
ImmutableBytesWritable(pGuidePosts.getEncodedGuidePosts().toByteArray()),
-                    pGuidePosts.getRowCountsList(), maxLength, 
guidePostsCount);
-          tableGuidePosts.put(pTableStatsProto.getKey().toByteArray(), info);
-      }
-      PTableStats stats = new PTableStatsImpl(tableGuidePosts, 
table.getStatsTimeStamp());
       PName dataTableName = null;
       if (table.hasDataTableNameBytes()) {
         dataTableName = 
PNameFactory.newName(table.getDataTableNameBytes().toByteArray());
@@ -1130,7 +1108,7 @@ public class PTableImpl implements PTable {
       try {
         PTableImpl result = new PTableImpl();
         result.init(tenantId, schemaName, tableName, tableType, indexState, 
timeStamp, sequenceNumber, pkName,
-            (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, 
schemaName,dataTableName, indexes,
+            (bucketNum == NO_SALTING) ? null : bucketNum, columns, 
schemaName,dataTableName, indexes,
             isImmutableRows, physicalNames, defaultFamilyName, viewStatement, 
disableWAL,
             multiTenant, storeNulls, viewType, viewIndexId, indexType, 
baseColumnCount, rowKeyOrderOptimizable,
             isTransactional, updateCacheFrequency, indexDisableTimestamp, 
isNamespaceMapped, autoParititonSeqName, isAppendOnlySchema);
@@ -1186,21 +1164,6 @@ public class PTableImpl implements PTable {
       }
       builder.setIsImmutableRows(table.isImmutableRows());
 
-      for (Map.Entry<byte[], GuidePostsInfo> entry : 
table.getTableStats().getGuidePosts().entrySet()) {
-         PTableProtos.PTableStats.Builder statsBuilder = 
PTableProtos.PTableStats.newBuilder();
-         statsBuilder.setKey(ByteStringer.wrap(entry.getKey()));
-         
statsBuilder.setGuidePostsCount(entry.getValue().getGuidePostsCount());
-         PGuidePostsProtos.PGuidePosts.Builder guidePstsBuilder = 
PGuidePostsProtos.PGuidePosts.newBuilder();
-            
guidePstsBuilder.setEncodedGuidePosts(ByteStringer.wrap(entry.getValue().getGuidePosts().get()));
-            
guidePstsBuilder.addAllByteCounts(entry.getValue().getByteCounts());
-            guidePstsBuilder.addAllRowCounts(entry.getValue().getRowCounts());
-            guidePstsBuilder.setMaxLength(entry.getValue().getMaxLength());
-            
guidePstsBuilder.setEncodedGuidePostsCount(entry.getValue().getGuidePostsCount());
-            statsBuilder.setPGuidePosts(guidePstsBuilder);
-            builder.addGuidePosts(statsBuilder.build());
-        }
-      builder.setStatsTimeStamp(table.getTableStats().getTimestamp());
-
       if (table.getParentName() != null) {
         
builder.setDataTableNameBytes(ByteStringer.wrap(table.getParentTableName().getBytes()));
       }
@@ -1238,11 +1201,6 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public PTableStats getTableStats() {
-        return tableStats;
-    }
-
-    @Override
     public PName getParentSchemaName() {
         return parentSchemaName;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
index bb5f408..ad99514 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
@@ -32,10 +32,11 @@ import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsInfoBuilder;
@@ -654,7 +655,7 @@ public class SkipScanBigFilterTest extends 
BaseConnectionlessQueryTest {
         GuidePostsInfo info = gpWriter.build();
         final SortedMap<byte[], GuidePostsInfo> gpMap = 
Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
         gpMap.put(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, info);
-        PTable tableWithStats = PTableImpl.makePTable(table, new PTableStats() 
{
+        PTableStats stats = new PTableStats() {
 
             @Override
             public SortedMap<byte[], GuidePostsInfo> getGuidePosts() {
@@ -670,8 +671,11 @@ public class SkipScanBigFilterTest extends 
BaseConnectionlessQueryTest {
             public long getTimestamp() {
                 return table.getTimeStamp()+1;
             }
-        });
-        conn.unwrap(PhoenixConnection.class).addTable(tableWithStats, 
System.currentTimeMillis());
+        };
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        pConn.addTable(table, System.currentTimeMillis());
+        ((ConnectionlessQueryServicesImpl) pConn.getQueryServices())
+                .addTableStats(table.getName().getBytesPtr(), stats);
 
         String query = "SELECT count(1) cnt,\n" + 
                 "       coalesce(SUM(impressions), 0.0) AS \"impressions\",\n" 
+ 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheRemovalListenerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheRemovalListenerTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheRemovalListenerTest.java
new file mode 100644
index 0000000..b4e01a7
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixStatsCacheRemovalListenerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.phoenix.query.TableStatsCache.PhoenixStatsCacheRemovalListener;
+import org.junit.Test;
+
+import com.google.common.cache.RemovalCause;
+
+/**
+ * Test class around the PhoenixStatsCacheRemovalListener.
+ */
+public class PhoenixStatsCacheRemovalListenerTest {
+
+    @Test
+    public void nonEvictionsAreIgnored() {
+        // We don't care so much about cases where we trigger a removal or 
update of the stats
+        // for a table in the cache, but we would want to know about updates 
happening automatically
+        PhoenixStatsCacheRemovalListener listener = new 
PhoenixStatsCacheRemovalListener();
+        // User-driven removals or updates
+        assertFalse(listener.wasEvicted(RemovalCause.EXPLICIT));
+        assertFalse(listener.wasEvicted(RemovalCause.REPLACED));
+        // Automatic removals by the cache itself (per configuration)
+        assertTrue(listener.wasEvicted(RemovalCause.COLLECTED));
+        assertTrue(listener.wasEvicted(RemovalCause.EXPIRED));
+        assertTrue(listener.wasEvicted(RemovalCause.SIZE));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 872555c..0a11977 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -554,7 +554,9 @@ public class TestUtil {
         }
         pstmt.execute();
         TableRef tableRef = pstmt.getQueryPlan().getTableRef();
-        PTableStats tableStats = tableRef.getTable().getTableStats();
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        long scn = null == pconn.getSCN() ? Long.MAX_VALUE : pconn.getSCN();
+        PTableStats tableStats = 
pconn.getQueryServices().getTableStats(tableRef.getTable().getName().getBytes(),
 scn);
         return tableStats.getGuidePosts().values();
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/81776445/phoenix-protocol/src/main/PTable.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/PTable.proto 
b/phoenix-protocol/src/main/PTable.proto
index 2696430..7cb1495 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -72,7 +72,9 @@ message PTable {
   repeated PColumn columns = 9;
   repeated PTable indexes = 10;
   required bool isImmutableRows = 11;
-  repeated PTableStats guidePosts = 12;
+  // Do NOT reuse the tag '12'. Stats are no longer passed
+  // along with the PTable.
+  //repeated PTableStats guidePosts = 12;
   optional bytes dataTableNameBytes = 13;
   optional bytes defaultFamilyName = 14;
   required bool disableWAL = 15;

Reply via email to