yifan-c commented on code in PR #346:
URL: https://github.com/apache/cassandra-sidecar/pull/346#discussion_r3231844024


##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java:
##########
@@ -117,6 +125,12 @@ protected void handleInternal(RoutingContext context,
             }
 
             
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+            // Fire-and-forget on a worker thread — notifying the restore 
system should not
+            // block the event loop or delay the HTTP response.
+            executorPools.service().executeBlocking(() -> {
+                notifyPhaseSignalMaybe(job);
+                return null;
+            });

Review Comment:
   nit: it can be simplified to one line.
   
   ```suggestion
               executorPools.service().runBlocking(() -> 
notifyPhaseSignalMaybe(job));
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java:
##########
@@ -147,4 +161,38 @@ protected UpdateRestoreJobRequestPayload 
extractParamsOrThrow(RoutingContext con
             throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid 
request payload", decodeException);
         }
     }
+
+    private void notifyPhaseSignalMaybe(RestoreJob job)
+    {
+        if (job.status != RestoreJobStatus.IMPORT_READY && job.status != 
RestoreJobStatus.STAGE_READY)
+        {
+            return;
+        }
+
+        try
+        {
+            // The job returned from update() is sparse (only contains updated 
fields).
+            // Re-read the full job from DB to get all required fields 
(keyspace, consistency level, etc.).
+            RestoreJob fullJob = restoreJobDatabaseAccessor.find(job.jobId);

Review Comment:
   We should be able to skip this lookup. 
   In the scope of the handler, it actually has the job object retrieved from 
database. See line#96, i.e. `        RoutingContextUtils.getAsFuture(context, 
SC_RESTORE_JOB)`. Note, the job object is before the update. 



##########
server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerGroup.java:
##########
@@ -105,7 +105,7 @@ void removeJobInternal(RestoreJob restoreJob)
      *
      * @param restoreJob restore job to update
      */
-    void updateRestoreJob(RestoreJob restoreJob)
+    public void updateRestoreJob(RestoreJob restoreJob)

Review Comment:
   can potentially revert back to package-private, if `updateRestoreJob` does 
not need to be visible to the handler. 



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java:
##########
@@ -147,4 +161,38 @@ protected UpdateRestoreJobRequestPayload 
extractParamsOrThrow(RoutingContext con
             throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid 
request payload", decodeException);
         }
     }
+
+    private void notifyPhaseSignalMaybe(RestoreJob job)
+    {
+        if (job.status != RestoreJobStatus.IMPORT_READY && job.status != 
RestoreJobStatus.STAGE_READY)
+        {
+            return;
+        }
+
+        try
+        {
+            // The job returned from update() is sparse (only contains updated 
fields).
+            // Re-read the full job from DB to get all required fields 
(keyspace, consistency level, etc.).
+            RestoreJob fullJob = restoreJobDatabaseAccessor.find(job.jobId);
+            if (fullJob == null)
+            {
+                logger.warn("Cannot find restore job for phase signal 
notification. jobId={}", job.jobId);
+                return;
+            }
+
+            if (fullJob.status == RestoreJobStatus.IMPORT_READY)
+            {
+                restoreJobManagerGroup.updateRestoreJob(fullJob);
+            }
+            else if (fullJob.status == RestoreJobStatus.STAGE_READY)
+            {
+                restoreJobDiscoverer.processJobNow(fullJob);

Review Comment:
   I think for both `IMPORT_READY` and `STAGE_READY`, we can call 
`processJobNow`



##########
server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererPhaseSignalIntTest.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore.jobdiscoverer;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import 
org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
+import 
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.db.RestoreRange;
+import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor;
+import org.apache.cassandra.sidecar.restore.RestoreJobTestUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.testing.TestTokenSupplier;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.cassandra.testing.IClusterExtension;
+
+import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.assertRestoreRange;
+import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob;
+import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests verifying that phase signals (STAGE_READY, IMPORT_READY) 
sent via
+ * UpdateRestoreJobHandler immediately trigger processing without waiting for 
the discovery loop.
+ */
+class RestoreJobDiscovererPhaseSignalIntTest extends IntegrationTestBase

Review Comment:
   Please create the test in `integration-tests`, using 
`SharedClusterIntegrationTestBase`; as name suggests, the same in-jvm cassandra 
cluster is shared among test cases in the file.
   
   `IntegrationTestBase` is deprecating, it consumes a lot of resources, as 
each test starts up and tears down in-jvm cluster. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to