sashapolo commented on code in PR #1626:
URL: https://github.com/apache/ignite-3/pull/1626#discussion_r1098329991
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -432,22 +433,48 @@ public void stop() throws Exception {
* @param tblId Table id.
* @return The latest schema version.
*/
+ // TODO: Make this method async, see
https://issues.apache.org/jira/browse/IGNITE-18732
private int latestSchemaVersion(UUID tblId) {
- try (Cursor<Entry> cur =
metastorageMgr.prefix(schemaHistPrefix(tblId))) {
- int lastVer = INITIAL_SCHEMA_VERSION;
+ var latestVersionFuture = new CompletableFuture<Integer>();
- for (Entry ent : cur) {
- String key = new String(ent.key(), StandardCharsets.UTF_8);
+ metastorageMgr.prefix(schemaHistPrefix(tblId)).subscribe(new
Subscriber<>() {
+ private int lastVer = INITIAL_SCHEMA_VERSION;
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ // Request unlimited demand.
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(Entry item) {
+ String key = new String(item.key(), StandardCharsets.UTF_8);
int descVer = extractVerFromSchemaKey(key);
if (descVer > lastVer) {
lastVer = descVer;
}
}
- return lastVer;
- } catch (NodeStoppingException e) {
- throw new IgniteException(e.traceId(), e.code(), e.getMessage(),
e);
+ @Override
+ public void onError(Throwable throwable) {
+ latestVersionFuture.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ latestVersionFuture.complete(lastVer);
+ }
+ });
+
+ try {
+ return latestVersionFuture.get(10, TimeUnit.SECONDS);
Review Comment:
The TODO is located on line 436.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]