keith-turner commented on a change in pull request #2324: URL: https://github.com/apache/accumulo/pull/2324#discussion_r747020389
########## File path: server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/CaffeineCache.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.store.impl; + +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCache; +import org.apache.accumulo.server.conf.store.PropCacheId; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.Ticker; + +public class CaffeineCache implements PropCache { + + public static final TimeUnit BASE_TIME_UNITS = TimeUnit.MINUTES; + public static final int REFRESH_MIN = 15; + public static final int EXPIRE_MIN = 60; + private static final Logger 
log = LoggerFactory.getLogger(CaffeineCache.class); + private static final Executor executor = + ThreadPools.createThreadPool(1, 20, 60, TimeUnit.SECONDS, "cache-refresh"); + + private final PropStoreMetrics metrics; + + private final LoadingCache<PropCacheId,VersionedProperties> cache; + + private CaffeineCache(final CacheLoader<PropCacheId,VersionedProperties> cacheLoader, + final PropStoreMetrics metrics, final Ticker ticker) { + this.metrics = metrics; + + if (ticker != null) { // build test instance with artificial clock. + cache = Caffeine.newBuilder().refreshAfterWrite(REFRESH_MIN, BASE_TIME_UNITS) + .expireAfterAccess(EXPIRE_MIN, BASE_TIME_UNITS).evictionListener(this::evictionNotifier) + .ticker(ticker).executor(executor).build(cacheLoader); + } else { + cache = Caffeine.newBuilder().refreshAfterWrite(REFRESH_MIN, BASE_TIME_UNITS) + .expireAfterAccess(EXPIRE_MIN, BASE_TIME_UNITS).evictionListener(this::evictionNotifier) + .executor(executor).build(cacheLoader); + } + } + + public PropStoreMetrics getMetrics() { + return metrics; + } + + void evictionNotifier(PropCacheId cacheId, VersionedProperties value, RemovalCause cause) { + log.debug("Evicted: ID: {} was evicted from cache. Reason: {}", cacheId, cause); + metrics.incrEviction(); + } + + @Override + public @Nullable VersionedProperties get(PropCacheId propCacheId) { + log.trace("Called get() for {}", propCacheId); + try { + return cache.get(propCacheId); + } catch (Exception ex) { + log.info("Failed to get properties " + propCacheId, ex); + metrics.incrZkError(); + return null; Review comment: Why not propagate this exception instead of returning null? Could do `throw new RuntimeException(ex)` ########## File path: server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.store.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCacheId; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.CacheLoader; + +public class ZooPropLoader implements CacheLoader<PropCacheId,VersionedProperties> { + + private static final Logger log = LoggerFactory.getLogger(ZooPropLoader.class); + + private final ZooReaderWriter zrw; + private final VersionedPropCodec propCodec; + // used to set watcher, does not react to events. 
+ private final PropStoreWatcher propStoreWatcher; + private final PropStoreMetrics metrics; + + public ZooPropLoader(final ZooReaderWriter zrw, final VersionedPropCodec propCodec, + final PropStoreWatcher propStoreWatcher, final PropStoreMetrics metrics) { + this.zrw = zrw; + this.propCodec = propCodec; + this.propStoreWatcher = propStoreWatcher; + this.metrics = metrics; + } + + @Override + public @Nullable VersionedProperties load(PropCacheId propCacheId) { + try { + log.trace("load called for {}", propCacheId); + metrics.updateLoadCounter(); + Stat stat = new Stat(); + var vProps = propCodec.fromBytes(zrw.getData(propCacheId.getPath(), propStoreWatcher, stat)); + if (stat.getVersion() != vProps.getDataVersion()) { Review comment: > The only place where the ZooKeeper stat and the version need to be both known is on the ZooKeeper writes / reads - and I tried to isolate that to the ZooPropLoader. ZooPropStore seems to also use the stat. > Code that uses versioned props does not need to worry about the version. The version only matters on store / reads and then the ZooKeeper specifics are isolated (I hope) to ZooPropLoader - so I think I was trying to achieve the separation that you are asking for - or I'm missing what you are saying? The separation is really nice; I'm not advocating any change to that. It just seems nicer to me if, within the ZooPropLoader and ZooPropStore code, the Stat is the authoritative source of the persisted version rather than the version stored in the value. So for a write to ZK: ```java zk.put(serializeEverythingButVersion(verProps), verProps.getVersion()) ``` and for a read from ZK, do the following.
```java Stat stat = new Stat(); byte[] data = zk.read(propPath, stat); var verProp = newVersionedProps(data, stat); ``` If the ZK stat is the authority within these two classes, then there is no need to: * check the version in the value against the stat in ZK after every read * make any assumptions about what the next version in ZK will be ########## File path: server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.accumulo.server.conf.store.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCache; +import org.apache.accumulo.server.conf.store.PropCacheId; +import org.apache.accumulo.server.conf.store.PropChangeListener; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZooPropStore implements PropStore, PropChangeListener { + + private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class); + + private final VersionedPropCodec propCodec; + + private final ReadyMonitor zkReadyMon; + + private final ServerContext context; + private final ZooReaderWriter zrw; + private final PropStoreWatcher propStoreWatcher; + private final PropCache cache; + private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); + + private Map<String,String> fixedProps = null; + + /** + * Create instance using ZooPropStore.Builder + * + * @param context + * the server context + * @param readyMonitor + * coordination utility for ZooKeeper connection status. 
+ * @param propStoreWatcher + * an extended ZooKeeper watcher + */ + private ZooPropStore(final ServerContext context, final ReadyMonitor readyMonitor, + final PropStoreWatcher propStoreWatcher) { + + this.context = context; + this.zrw = context.getZooReaderWriter(); + this.zkReadyMon = readyMonitor; + this.propStoreWatcher = propStoreWatcher; + + this.propCodec = context.getVersionedPropertiesCodec(); + + MetricsUtil.initializeProducers(cacheMetrics); + + ZooPropLoader propLoader = new ZooPropLoader(zrw, context.getVersionedPropertiesCodec(), + propStoreWatcher, cacheMetrics); + + cache = new CaffeineCache.Builder(propLoader, cacheMetrics).build(); + + try { + var path = ZooUtil.getRoot(context.getInstanceID()); + if (zrw.exists(path, propStoreWatcher)) { + log.debug("have a ZooKeeper connection"); + zkReadyMon.setReady(); + } else { + throw new IllegalStateException( + "Instance may not have been initialized, root node" + path + " does not exist"); + } + } catch (InterruptedException | KeeperException ex) { + throw new IllegalStateException("Failed to read root node from ZooKeeper", ex); + } + } + + /** + * Create initial system props for the instance. If the node already exists, no action is + * performed. + * + * @param context + * the server context. + * @param initProps + * map of k, v pairs of initial properties. + * @return true if props create, false if the node existed and initialization was skipped. + */ + public synchronized static boolean initSysProps(final ServerContext context, + final Map<String,String> initProps) { + PropCacheId sysPropsId = PropCacheId.forSystem(context.getInstanceID()); + return createInitialProps(context, sysPropsId, initProps); + } + + /** + * Create initial properties if they do not exist. If the node exists, initialization will be + * skipped. 
+ * + * @param context + * the system context + * @param propCacheId + * a prop id + * @param props + * initial properties + * @return true if props create, false if the node existed and initialization was skipped. + */ + public static boolean createInitialProps(final ServerContext context, + final PropCacheId propCacheId, Map<String,String> props) { + + try { + ZooReaderWriter zrw = context.getZooReaderWriter(); + if (zrw.exists(propCacheId.getPath())) { + return false; + } + VersionedProperties vProps = new VersionedProperties(props); + return zrw.putPersistentData(propCacheId.getPath(), + context.getVersionedPropertiesCodec().toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted creating node " + propCacheId, ex); + } catch (Exception ex) { + throw new IllegalStateException("Failed to create node " + propCacheId, ex); + } + } + + public PropStoreMetrics getMetrics() { + return cacheMetrics; + } + + // TODO - evaluate returning the props instead of boolean. + @Override + public boolean create(PropCacheId propCacheId, Map<String,String> props) + throws PropStoreException { + + VersionedProperties vProps = new VersionedProperties(props); + + try { + + var path = propCacheId.getPath(); + if (!zrw.putPersistentData(path, propCodec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL)) { + return false; + } + + Stat stat = zrw.getStatus(path, propStoreWatcher); Review comment: There may be a race condition here. Something could run between the putPersistentData call and getStatus call that changes the stat to make it differ. ########## File path: server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.store.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCache; +import org.apache.accumulo.server.conf.store.PropCacheId; +import org.apache.accumulo.server.conf.store.PropChangeListener; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZooPropStore implements PropStore, PropChangeListener { + + private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class); + + private final VersionedPropCodec propCodec; + + private final ReadyMonitor zkReadyMon; + + 
private final ServerContext context; + private final ZooReaderWriter zrw; + private final PropStoreWatcher propStoreWatcher; + private final PropCache cache; + private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); + + private Map<String,String> fixedProps = null; + + /** + * Create instance using ZooPropStore.Builder + * + * @param context + * the server context + * @param readyMonitor + * coordination utility for ZooKeeper connection status. + * @param propStoreWatcher + * an extended ZooKeeper watcher + */ + private ZooPropStore(final ServerContext context, final ReadyMonitor readyMonitor, + final PropStoreWatcher propStoreWatcher) { + + this.context = context; + this.zrw = context.getZooReaderWriter(); + this.zkReadyMon = readyMonitor; + this.propStoreWatcher = propStoreWatcher; + + this.propCodec = context.getVersionedPropertiesCodec(); + + MetricsUtil.initializeProducers(cacheMetrics); + + ZooPropLoader propLoader = new ZooPropLoader(zrw, context.getVersionedPropertiesCodec(), + propStoreWatcher, cacheMetrics); + + cache = new CaffeineCache.Builder(propLoader, cacheMetrics).build(); + + try { + var path = ZooUtil.getRoot(context.getInstanceID()); + if (zrw.exists(path, propStoreWatcher)) { + log.debug("have a ZooKeeper connection"); + zkReadyMon.setReady(); + } else { + throw new IllegalStateException( + "Instance may not have been initialized, root node" + path + " does not exist"); + } + } catch (InterruptedException | KeeperException ex) { + throw new IllegalStateException("Failed to read root node from ZooKeeper", ex); + } + } + + /** + * Create initial system props for the instance. If the node already exists, no action is + * performed. + * + * @param context + * the server context. + * @param initProps + * map of k, v pairs of initial properties. + * @return true if props create, false if the node existed and initialization was skipped. 
+ */ + public synchronized static boolean initSysProps(final ServerContext context, + final Map<String,String> initProps) { + PropCacheId sysPropsId = PropCacheId.forSystem(context.getInstanceID()); + return createInitialProps(context, sysPropsId, initProps); + } + + /** + * Create initial properties if they do not exist. If the node exists, initialization will be + * skipped. + * + * @param context + * the system context + * @param propCacheId + * a prop id + * @param props + * initial properties + * @return true if props create, false if the node existed and initialization was skipped. + */ + public static boolean createInitialProps(final ServerContext context, + final PropCacheId propCacheId, Map<String,String> props) { + + try { + ZooReaderWriter zrw = context.getZooReaderWriter(); + if (zrw.exists(propCacheId.getPath())) { + return false; + } + VersionedProperties vProps = new VersionedProperties(props); + return zrw.putPersistentData(propCacheId.getPath(), + context.getVersionedPropertiesCodec().toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted creating node " + propCacheId, ex); + } catch (Exception ex) { + throw new IllegalStateException("Failed to create node " + propCacheId, ex); + } + } + + public PropStoreMetrics getMetrics() { + return cacheMetrics; + } + + // TODO - evaluate returning the props instead of boolean. 
+ @Override + public boolean create(PropCacheId propCacheId, Map<String,String> props) + throws PropStoreException { + + VersionedProperties vProps = new VersionedProperties(props); + + try { + + var path = propCacheId.getPath(); + if (!zrw.putPersistentData(path, propCodec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL)) { + return false; + } + + Stat stat = zrw.getStatus(path, propStoreWatcher); + + if (stat.getVersion() != vProps.getNextVersion()) { + throw new PropStoreException("Invalid data version on create, have: " + + vProps.getNextVersion() + " received " + stat.getVersion(), + new IllegalStateException()); + } + + return true; + + } catch (IOException | KeeperException | InterruptedException ex) { + throw new PropStoreException("Failed to serialize properties for " + propCacheId, ex); + } + } + + @Override + public VersionedProperties get(final PropCacheId propCacheId) throws PropStoreException { + try { + + checkZkConnection(); // if ZK not connected, block, do not just return a cached value. Review comment: What happens if ZK is OK when checkZkConnection() is called and returns, but ZK becomes unavailable before the following two methods are called? ########## File path: server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.store.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCacheId; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.CacheLoader; + +public class ZooPropLoader implements CacheLoader<PropCacheId,VersionedProperties> { + + private static final Logger log = LoggerFactory.getLogger(ZooPropLoader.class); + + private final ZooReaderWriter zrw; + private final VersionedPropCodec propCodec; + // used to set watcher, does not react to events. 
+ private final PropStoreWatcher propStoreWatcher; + private final PropStoreMetrics metrics; + + public ZooPropLoader(final ZooReaderWriter zrw, final VersionedPropCodec propCodec, + final PropStoreWatcher propStoreWatcher, final PropStoreMetrics metrics) { + this.zrw = zrw; + this.propCodec = propCodec; + this.propStoreWatcher = propStoreWatcher; + this.metrics = metrics; + } + + @Override + public @Nullable VersionedProperties load(PropCacheId propCacheId) { + try { + log.trace("load called for {}", propCacheId); + Stat stat = new Stat(); + long startNanos = System.nanoTime(); + + var vProps = propCodec.fromBytes(zrw.getData(propCacheId.getPath(), propStoreWatcher, stat)); + + metrics.addLoadTime( + TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS)); + + if (stat.getVersion() != vProps.getDataVersion()) { + log.warn("data versions between decoded value {} and zk value {} do not match", + vProps.getDataVersion(), stat.getVersion()); + return null; + } + return vProps; + } catch (Exception ex) { + metrics.incrZkError(); + log.warn("Failed to load: {} from ZooKeeper, returning null", propCacheId, ex); + propStoreWatcher.signalZkChangeEvent(propCacheId); + return null; + } + } + + @Override + public CompletableFuture<? extends VersionedProperties> asyncLoad(PropCacheId propCacheId, + Executor executor) { + log.trace("asyncLoad called for key: {}", propCacheId); + return CacheLoader.super.asyncLoad(propCacheId, executor); + } + + @Override + public CompletableFuture<? extends VersionedProperties> asyncReload(PropCacheId propCacheId, + VersionedProperties oldValue, Executor executor) throws Exception { + log.trace("asyncReload called for key: {}", propCacheId); + metrics.incrRefresh(); + + return CompletableFuture.supplyAsync(() -> loadIfDifferentVersion(propCacheId, oldValue)); Review comment: `supplyAsync` may use a jvm wide shared executor service. 
It seems like this could let unrelated code in the JVM that uses the same shared pool interfere with this refresh work (and vice versa), but I am not sure about this. Passing the `executor` that `asyncReload` already receives as the second argument to `supplyAsync` would avoid the shared pool. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
