sashapolo commented on code in PR #1626:
URL: https://github.com/apache/ignite-3/pull/1626#discussion_r1097377262
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -433,21 +434,46 @@ public void stop() throws Exception {
* @return The latest schema version.
*/
private int latestSchemaVersion(UUID tblId) {
- try (Cursor<Entry> cur =
metastorageMgr.prefix(schemaHistPrefix(tblId))) {
- int lastVer = INITIAL_SCHEMA_VERSION;
+ var latestVersionFuture = new CompletableFuture<Integer>();
- for (Entry ent : cur) {
- String key = new String(ent.key(), StandardCharsets.UTF_8);
+ metastorageMgr.prefix(schemaHistPrefix(tblId)).subscribe(new
Subscriber<>() {
+ private int lastVer = INITIAL_SCHEMA_VERSION;
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ // Request unlimited demand.
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(Entry item) {
+ String key = new String(item.key(), StandardCharsets.UTF_8);
int descVer = extractVerFromSchemaKey(key);
if (descVer > lastVer) {
lastVer = descVer;
}
}
- return lastVer;
- } catch (NodeStoppingException e) {
- throw new IgniteException(e.traceId(), e.code(), e.getMessage(),
e);
+ @Override
+ public void onError(Throwable throwable) {
+ latestVersionFuture.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ latestVersionFuture.complete(lastVer);
+ }
+ });
+
+ try {
+ return latestVersionFuture.get(10, TimeUnit.SECONDS);
Review Comment:
I can't do it in this PR, that's too many changes. And this code was
synchronous before
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]