PakhomovAlexander commented on code in PR #1929:
URL: https://github.com/apache/ignite-3/pull/1929#discussion_r1172404883
##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java:
##########
@@ -56,99 +66,146 @@
public class LocalFileConfigurationStorage implements ConfigurationStorage {
private static final IgniteLogger LOG =
Loggers.forClass(LocalFileConfigurationStorage.class);
- /**
- * Path to config file.
- */
+ /** Path to config file. */
private final Path configPath;
- /**
- * Path to temporary configuration storage.
- */
+ /** Path to temporary configuration storage. */
private final Path tempConfigPath;
+ /** R/W lock to guard the latest configuration and config file. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- /**
- * Latest state of last applied configuration.
- */
+ /** Latest state of last applied configuration. */
private final Map<String, Serializable> latest = new ConcurrentHashMap<>();
- /**
- * Configuration changes listener.
- * */
+ /** Configuration tree generator. */
+ private final ConfigurationTreeGenerator generator;
+
+ /** Configuration changes listener. */
private final AtomicReference<ConfigurationStorageListener> lsnrRef = new
AtomicReference<>();
- private final ExecutorService threadPool = Executors.newFixedThreadPool(2,
new NamedThreadFactory("loc-cfg-file", LOG));
+ /** Thread pool for configuration updates notifications. */
+ private final ExecutorService notificationsThreadPool =
Executors.newFixedThreadPool(
+ 2, new NamedThreadFactory("cfg-file", LOG)
+ );
+
+ /** Thread pool for configuration updates. */
+ private final ExecutorService workerThreadPool =
Executors.newFixedThreadPool(
+ 2, new NamedThreadFactory("cfg-file-worker", LOG)
+ );
+ /** Tracks all running futures. */
private final InFlightFutures futureTracker = new InFlightFutures();
+ /** Last revision for configuration. */
private long lastRevision = 0L;
/**
* Constructor.
*
* @param configPath Path to node bootstrap configuration file.
+ * @param generator Configuration tree generator.
*/
- public LocalFileConfigurationStorage(Path configPath) {
+ public LocalFileConfigurationStorage(Path configPath,
ConfigurationTreeGenerator generator) {
this.configPath = configPath;
- tempConfigPath = configPath.resolveSibling(configPath.getFileName() +
".tmp");
+ this.generator = generator;
+ this.tempConfigPath =
configPath.resolveSibling(configPath.getFileName() + ".tmp");
+
checkAndRestoreConfigFile();
}
@Override
public CompletableFuture<Data> readDataOnRecovery() {
- return CompletableFuture.completedFuture(new
Data(Collections.emptyMap(), 0));
+ return async(() -> {
+ lock.writeLock().lock();
+ try {
+ SuperRoot superRoot = generator.createSuperRoot();
+ SuperRoot copiedSuperRoot = superRoot.copy();
+
+ Config hocon = readHoconFromFile();
+
HoconConverter.hoconSource(hocon.root()).descend(copiedSuperRoot);
+
+ Map<String, Serializable> flattenedUpdatesMap =
createFlattenedUpdatesMap(superRoot, copiedSuperRoot);
+ flattenedUpdatesMap.forEach((key, value) -> {
+ if (value != null) { // filter defaults
+ latest.put(key, value);
+ }
+ });
+
+ return new Data(flattenedUpdatesMap, lastRevision);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ });
+ }
+
+ private Config readHoconFromFile() {
+ checkAndRestoreConfigFile();
+
+ return ConfigFactory.parseFile(configPath.toFile(),
ConfigParseOptions.defaults().setAllowMissing(false));
}
@Override
public CompletableFuture<Map<String, ? extends Serializable>>
readAllLatest(String prefix) {
- lock.readLock().lock();
- try {
- checkAndRestoreConfigFile();
- Map<String, Serializable> map = latest.entrySet()
- .stream()
- .filter(entry -> entry.getKey().startsWith(prefix))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
- return CompletableFuture.completedFuture(map);
- } finally {
- lock.readLock().unlock();
- }
+ return async(() -> {
+ lock.readLock().lock();
+ try {
+ return latest.entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().startsWith(prefix))
+ .collect(toMap(Entry::getKey, Entry::getValue));
+ } finally {
+ lock.readLock().unlock();
+ }
+ });
}
@Override
public CompletableFuture<Serializable> readLatest(String key) {
- lock.readLock().lock();
- try {
- checkAndRestoreConfigFile();
- return CompletableFuture.completedFuture(latest.get(key));
- } finally {
- lock.readLock().unlock();
- }
+ return async(() -> {
+ lock.readLock().lock();
+ try {
+ return latest.get(key);
+ } finally {
+ lock.readLock().unlock();
+ }
+ });
}
@Override
public CompletableFuture<Boolean> write(Map<String, ? extends
Serializable> newValues, long ver) {
- lock.writeLock().lock();
- try {
- if (ver != lastRevision) {
- return CompletableFuture.completedFuture(false);
+ return async(() -> {
+ lock.writeLock().lock();
+ try {
+ if (ver != lastRevision) {
+ return false;
+ }
+
+ mergeAndSave(newValues);
+
+ sendNotificationAsync(new Data(newValues, lastRevision));
+
+ return true;
+ } finally {
+ lock.writeLock().unlock();
}
- checkAndRestoreConfigFile();
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19152
- //saveValues(newValues);
- latest.putAll(newValues);
- lastRevision++;
- runAsync(() -> lsnrRef.get().onEntriesChanged(new Data(newValues,
lastRevision)));
- return CompletableFuture.completedFuture(true);
- } finally {
- lock.writeLock().unlock();
- }
+ });
}
- private void runAsync(Runnable runnable) {
- CompletableFuture<Void> future = CompletableFuture.runAsync(runnable,
threadPool);
+ private void mergeAndSave(Map<String, ? extends Serializable> newValues) {
+ updateLatestState(newValues);
+ saveConfigFile();
+ lastRevision++;
Review Comment:
I don't follow. Here is the part of the `write` method that verifies that the
version is the same. How can it be a "new" revision?
```java
if (ver != lastRevision) {
return CompletableFuture.completedFuture(false);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]