yifan-c commented on code in PR #198: URL: https://github.com/apache/cassandra-sidecar/pull/198#discussion_r1962766807
########## CHANGES.txt: ########## @@ -8,7 +9,7 @@ * Sidecar schema initialization can be executed on multiple thread (CASSSIDECAR-200) * Make sidecar operations resilient to down Cassandra nodes (CASSSIDECAR-201) * Fix Cassandra instance not found error (CASSSIDECAR-192) - * Implemented Schema Reporter for Integration with DataHub (CASSSIDECAR-191) + * Implement Schema Reporter for Integration with DataHub (CASSSIDECAR-191) Review Comment: Do not change the existing entries in the `CHANGES.txt` ########## server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java: ########## @@ -97,13 +97,29 @@ public CassandraAdapterDelegate delegate(@NotNull String host) throws NoSuchCass } /** - * Iterate through the local instances and call the function on the first available instance, i.e. no CassandraUnavailableException - * or OperationUnavailableException is thrown for the operations + * Iterate through the local instances and run the {@link Consumer} on the first available one, + * so no {@link CassandraUnavailableException} or {@link OperationUnavailableException} is thrown for the operations + * + * @param consumer a {@link Consumer} that processes {@link InstanceMetadata} and returns no result + * @throws CassandraUnavailableException if all local instances were exhausted + */ + public void runOnFirstAvailableInstance(Consumer<InstanceMetadata> consumer) throws CassandraUnavailableException + { + callOnFirstAvailableInstance(metadata -> + { + consumer.accept(metadata); + return null; + }); + } Review Comment: Can you remove this method? It is unnecessary. It does not retrieve anything from instance metadata fetcher, who is supposed to "fetch something". I have suggested a different implementation in the new handler w/o using this method. ########## server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java: ########## @@ -63,6 +63,9 @@ public class BasicPermissions public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE); public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE); + // Permissions related to Schema Reporting + public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:REPORT", CLUSTER_SCOPE); Review Comment: I do not think an extra permission is required. The permission needed in order to publish to DataHub is `SCHEMA:READ`, which already exists. cc: @sarankk ########## server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java: ########## @@ -902,7 +912,8 @@ public IdentifiersProvider identifiersProvider(@NotNull InstanceMetadataFetcher @NotNull protected String initialize() { - return fetcher.callOnFirstAvailableInstance(i -> i.delegate().storageOperations().clusterName()); + return fetcher.callOnFirstAvailableInstance(instance -> + instance.delegate().storageOperations().clusterName()); Review Comment: Please revert the unrelated change. ########## server/src/test/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcherTest.java: ########## @@ -21,7 +21,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.List; - +import com.google.common.collect.ImmutableList; Review Comment: Revert the changes in this file once you remove `runOnFirstAvailableInstance` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.Set; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.datahub.SchemaReporter; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * An implementation of {@link AbstractHandler} used to trigger an immediate, + * synchronous conversion and report of the current schema + */ +@Singleton +public class ReportSchemaHandler extends AbstractHandler<Void> implements AccessProtected +{ + @NotNull + private final SchemaReporter schemaReporter; + + /** + * Constructs a new instance of {@link ReportSchemaHandler} using the provided instances + * of {@link InstanceMetadataFetcher}, {@link ExecutorPools}, and {@link SchemaReporter} + * + * @param metadata the metadata fetcher + * @param executor executor pools for blocking executions + * @param reporter executor pools for blocking executions Review Comment: copy-paste error ########## server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.Set; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.datahub.SchemaReporter; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * An implementation of {@link AbstractHandler} used to trigger an immediate, + * synchronous conversion and report of the current schema + */ +@Singleton +public class ReportSchemaHandler extends AbstractHandler<Void> implements AccessProtected +{ + @NotNull + private final SchemaReporter schemaReporter; + + /** + * Constructs a new instance of {@link ReportSchemaHandler} using the provided instances + * of {@link InstanceMetadataFetcher}, {@link ExecutorPools}, and {@link SchemaReporter} + * + * @param metadata the metadata fetcher + * @param executor executor pools for blocking executions + * @param reporter executor pools for blocking executions + */ + @Inject + public ReportSchemaHandler(@NotNull InstanceMetadataFetcher metadata, + @NotNull ExecutorPools executor, + @NotNull SchemaReporter reporter) + { + super(metadata, executor, null); + + schemaReporter = reporter; + } + + /** + * {@inheritDoc} + */ + @Override + @NotNull + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.REPORT_SCHEMA.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + @Nullable + protected Void extractParamsOrThrow(@NotNull RoutingContext context) + { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected void handleInternal(@NotNull RoutingContext context, + @NotNull HttpServerRequest http, + @NotNull String host, + @NotNull SocketAddress address, + @Nullable Void request) + { + executorPools.service() + .runBlocking(() -> metadataFetcher.runOnFirstAvailableInstance(instance -> + schemaReporter.process(instance.delegate().metadata()))) + .onSuccess(context::json) Review Comment: The previous block does not return any value. What does the response json look like? ########## server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.Set; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.datahub.SchemaReporter; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * An implementation of {@link AbstractHandler} used to trigger an immediate, + * synchronous conversion and report of the current schema + */ +@Singleton +public class ReportSchemaHandler extends AbstractHandler<Void> implements AccessProtected +{ + @NotNull + private final SchemaReporter schemaReporter; + + /** + * Constructs a new instance of {@link ReportSchemaHandler} using the provided instances + * of {@link InstanceMetadataFetcher}, {@link ExecutorPools}, and {@link SchemaReporter} + * + * @param metadata the metadata fetcher + * @param executor executor pools for blocking executions + * @param reporter executor pools for blocking executions + */ + @Inject + public ReportSchemaHandler(@NotNull InstanceMetadataFetcher metadata, + @NotNull ExecutorPools executor, + @NotNull SchemaReporter reporter) + { + super(metadata, executor, null); + + schemaReporter = reporter; + } + + /** + * {@inheritDoc} + */ + @Override + @NotNull + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.REPORT_SCHEMA.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + @Nullable + protected Void extractParamsOrThrow(@NotNull RoutingContext context) + { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected void handleInternal(@NotNull RoutingContext context, + @NotNull HttpServerRequest http, + @NotNull String host, + @NotNull SocketAddress address, + @Nullable Void request) + { + executorPools.service() + .runBlocking(() -> metadataFetcher.runOnFirstAvailableInstance(instance -> + schemaReporter.process(instance.delegate().metadata()))) + .onSuccess(context::json) + .onFailure(throwable -> processFailure(throwable, context, host, address, request)); + } Review Comment: ```suggestion Metadata metadata = metadataFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().metadata()); executorPools.service() .runBlocking(() -> { schemaReporter.process(metadata); }) .onSuccess(v -> context.json(OK_STATUS)) .onFailure(cause -> processFailure(cause, context, host, address, request)); ``` ########## server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Metadata; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.Callback; +import datahub.client.MetadataWriteResponse; +import datahub.shaded.findbugs.annotations.SuppressFBWarnings; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.datahub.EmitterFactory; +import org.apache.cassandra.sidecar.datahub.JsonEmitter; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ReportSchemaHandler} + */ +@ExtendWith(VertxExtension.class) +final class ReportSchemaHandlerTest +{ + private static final String CLUSTER = "cluster"; + private static final String DIRECTORY = "/tmp"; + private static final int IDENTIFIER = 42; + private static final String LOCALHOST = "127.0.0.1"; + private static final int PORT = 9042; + private static final String ENDPOINT = "/api/v1/report-schema"; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + + private static final class ThrowingEmitter extends JsonEmitter + { + @Override + @NotNull + public synchronized Future<MetadataWriteResponse> emit(@NotNull MetadataChangeProposal proposal, + @Nullable Callback callback) throws IOException + { + throw new IOException(); + } + } + + private final class ReportSchemaHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + @NotNull + public InstancesMetadata instancesMetadata() + { + Metadata metadata = mock(Metadata.class); + when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); + + StorageOperations operations = mock(StorageOperations.class); + when(operations.clusterName()).thenReturn(CLUSTER); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + when(delegate.storageOperations()).thenReturn(operations); + when(delegate.metadata()).thenReturn(metadata); + + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.stagingDir()).thenReturn(DIRECTORY); + when(instanceMetadata.id()).thenReturn(IDENTIFIER); + when(instanceMetadata.host()).thenReturn(LOCALHOST); + when(instanceMetadata.port()).thenReturn(PORT); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata instances = mock(InstancesMetadata.class); + when(instances.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(instances.instanceFromId(IDENTIFIER)).thenReturn(instanceMetadata); + when(instances.instanceFromHost(LOCALHOST)).thenReturn(instanceMetadata); + return instances; + } + + @Provides + @Singleton + @NotNull + public EmitterFactory emitterFactory() + { + return () -> emitter; + } + } + + private final Injector injector = Guice.createInjector(Modules.override(new MainModule()).with( + Modules.override(new TestModule()).with( + new ReportSchemaHandlerTestModule()))); + private WebClient client; + private Server server; + private JsonEmitter emitter; + + @BeforeEach + void before() throws InterruptedException + { + client = WebClient.create(injector.getInstance(Vertx.class)); + server = injector.getInstance(Server.class); + + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(result -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @AfterEach + @SuppressWarnings("ResultOfMethodCallIgnored") + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED") + void after() throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + server.close() + .onSuccess(future -> latch.countDown()); + latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @Test + @SuppressWarnings("deprecation") + void testSuccess(@NotNull VertxTestContext context) throws IOException + { + String expected = IOUtils.readFully("/datahub/empty_cluster.json"); + emitter = new JsonEmitter(); + + client.get(server.actualPort(), LOCALHOST, ENDPOINT) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> + { + assertThat(response.statusCode()) + .isEqualTo(HttpResponseStatus.OK.code()); + assertThat(emitter.content()) + .isEqualTo(expected); + context.completeNow(); + })); + } + + @Test + @SuppressWarnings("deprecation") Review Comment: remove ########## server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Metadata; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.Callback; +import datahub.client.MetadataWriteResponse; +import datahub.shaded.findbugs.annotations.SuppressFBWarnings; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.datahub.EmitterFactory; +import org.apache.cassandra.sidecar.datahub.JsonEmitter; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ReportSchemaHandler} + */ +@ExtendWith(VertxExtension.class) +final class ReportSchemaHandlerTest +{ + private static final String CLUSTER = "cluster"; + private static final String DIRECTORY = "/tmp"; + private static final int IDENTIFIER = 42; + private static final String LOCALHOST = "127.0.0.1"; + private static final int PORT = 9042; + private static final String ENDPOINT = "/api/v1/report-schema"; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + + private static final class ThrowingEmitter extends JsonEmitter + { + @Override + @NotNull + public synchronized Future<MetadataWriteResponse> emit(@NotNull MetadataChangeProposal proposal, + @Nullable Callback callback) throws IOException + { + throw new IOException(); + } + } + + private final class ReportSchemaHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + @NotNull + public InstancesMetadata instancesMetadata() + { + Metadata metadata = mock(Metadata.class); + when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); + + StorageOperations operations = mock(StorageOperations.class); + when(operations.clusterName()).thenReturn(CLUSTER); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + when(delegate.storageOperations()).thenReturn(operations); + when(delegate.metadata()).thenReturn(metadata); + + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.stagingDir()).thenReturn(DIRECTORY); + when(instanceMetadata.id()).thenReturn(IDENTIFIER); + when(instanceMetadata.host()).thenReturn(LOCALHOST); + when(instanceMetadata.port()).thenReturn(PORT); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata instances = mock(InstancesMetadata.class); + when(instances.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(instances.instanceFromId(IDENTIFIER)).thenReturn(instanceMetadata); + when(instances.instanceFromHost(LOCALHOST)).thenReturn(instanceMetadata); + return instances; + } + + @Provides + @Singleton + @NotNull + public EmitterFactory emitterFactory() + { + return () -> emitter; + } + } + + private final Injector injector = Guice.createInjector(Modules.override(new MainModule()).with( + Modules.override(new TestModule()).with( + new ReportSchemaHandlerTestModule()))); + private WebClient client; + private Server server; + private JsonEmitter emitter; + + @BeforeEach + void before() throws InterruptedException + { + client = WebClient.create(injector.getInstance(Vertx.class)); + server = injector.getInstance(Server.class); + + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(result -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @AfterEach + @SuppressWarnings("ResultOfMethodCallIgnored") + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED") + void after() throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + server.close() + .onSuccess(future -> latch.countDown()); + latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @Test + @SuppressWarnings("deprecation") Review Comment: remove ########## server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Metadata; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.Callback; +import datahub.client.MetadataWriteResponse; +import datahub.shaded.findbugs.annotations.SuppressFBWarnings; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.datahub.EmitterFactory; +import org.apache.cassandra.sidecar.datahub.JsonEmitter; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ReportSchemaHandler} + */ +@ExtendWith(VertxExtension.class) +final class ReportSchemaHandlerTest +{ + private static final String CLUSTER = "cluster"; + private static final String DIRECTORY = "/tmp"; + private static final int IDENTIFIER = 42; + private static final String LOCALHOST = "127.0.0.1"; + private static final int PORT = 9042; + private static final String ENDPOINT = "/api/v1/report-schema"; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + + private static final class ThrowingEmitter extends JsonEmitter + { + @Override + @NotNull + public synchronized Future<MetadataWriteResponse> emit(@NotNull MetadataChangeProposal proposal, + @Nullable Callback callback) throws IOException + { + throw new IOException(); + } + } + + private final class ReportSchemaHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + @NotNull + public InstancesMetadata instancesMetadata() + { + Metadata metadata = mock(Metadata.class); + when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); + + StorageOperations operations = mock(StorageOperations.class); + when(operations.clusterName()).thenReturn(CLUSTER); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + when(delegate.storageOperations()).thenReturn(operations); + when(delegate.metadata()).thenReturn(metadata); + + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.stagingDir()).thenReturn(DIRECTORY); + when(instanceMetadata.id()).thenReturn(IDENTIFIER); + when(instanceMetadata.host()).thenReturn(LOCALHOST); + when(instanceMetadata.port()).thenReturn(PORT); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata instances = mock(InstancesMetadata.class); + when(instances.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(instances.instanceFromId(IDENTIFIER)).thenReturn(instanceMetadata); + when(instances.instanceFromHost(LOCALHOST)).thenReturn(instanceMetadata); + return instances; + } + + @Provides + @Singleton + @NotNull + public EmitterFactory emitterFactory() + { + return () -> emitter; + } + } + + private final Injector injector = Guice.createInjector(Modules.override(new MainModule()).with( + Modules.override(new TestModule()).with( + new ReportSchemaHandlerTestModule()))); + private WebClient client; + private Server server; + private JsonEmitter emitter; + + @BeforeEach + void before() throws InterruptedException + { + client = WebClient.create(injector.getInstance(Vertx.class)); + server = injector.getInstance(Server.class); + + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(result -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @AfterEach + @SuppressWarnings("ResultOfMethodCallIgnored") + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED") Review Comment: remove the unnecessary annotations. ########## server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Metadata; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.Callback; +import datahub.client.MetadataWriteResponse; +import datahub.shaded.findbugs.annotations.SuppressFBWarnings; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.datahub.EmitterFactory; +import org.apache.cassandra.sidecar.datahub.JsonEmitter; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ReportSchemaHandler} + */ +@ExtendWith(VertxExtension.class) +final class ReportSchemaHandlerTest +{ + private static final String CLUSTER = "cluster"; + private static final String DIRECTORY = "/tmp"; + private static final int IDENTIFIER = 42; + private static final String LOCALHOST = "127.0.0.1"; + private static final int PORT = 9042; + private static final String ENDPOINT = "/api/v1/report-schema"; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + + private static final class ThrowingEmitter extends JsonEmitter + { + @Override + @NotNull + public synchronized Future<MetadataWriteResponse> emit(@NotNull MetadataChangeProposal proposal, + @Nullable Callback callback) throws IOException + { + throw new IOException(); + } + } + + private final class ReportSchemaHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + @NotNull + public InstancesMetadata instancesMetadata() + { + Metadata metadata = mock(Metadata.class); + when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); + + StorageOperations operations = mock(StorageOperations.class); + when(operations.clusterName()).thenReturn(CLUSTER); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + when(delegate.storageOperations()).thenReturn(operations); + when(delegate.metadata()).thenReturn(metadata); + + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.stagingDir()).thenReturn(DIRECTORY); + when(instanceMetadata.id()).thenReturn(IDENTIFIER); + when(instanceMetadata.host()).thenReturn(LOCALHOST); + when(instanceMetadata.port()).thenReturn(PORT); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata instances = mock(InstancesMetadata.class); + when(instances.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(instances.instanceFromId(IDENTIFIER)).thenReturn(instanceMetadata); + when(instances.instanceFromHost(LOCALHOST)).thenReturn(instanceMetadata); + return instances; + } + + @Provides + @Singleton + @NotNull + public EmitterFactory emitterFactory() + { + return () -> emitter; + } + } + + private final Injector injector = Guice.createInjector(Modules.override(new MainModule()).with( + Modules.override(new TestModule()).with( + new ReportSchemaHandlerTestModule()))); + private WebClient client; + private Server server; + private JsonEmitter emitter; + + @BeforeEach + void before() throws InterruptedException + { + client = WebClient.create(injector.getInstance(Vertx.class)); + server = injector.getInstance(Server.class); + + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(result -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @AfterEach + @SuppressWarnings("ResultOfMethodCallIgnored") + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED") + void after() throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + server.close() + .onSuccess(future -> latch.countDown()); + latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); Review Comment: close the client too. ########## server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Metadata; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.Callback; +import datahub.client.MetadataWriteResponse; +import datahub.shaded.findbugs.annotations.SuppressFBWarnings; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.datahub.EmitterFactory; +import org.apache.cassandra.sidecar.datahub.JsonEmitter; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ReportSchemaHandler} + */ +@ExtendWith(VertxExtension.class) +final class ReportSchemaHandlerTest +{ + private static final String CLUSTER = "cluster"; + private static final String DIRECTORY = "/tmp"; + private static final int IDENTIFIER = 42; + private static final String LOCALHOST = "127.0.0.1"; + private static final int PORT = 9042; + private static final String ENDPOINT = "/api/v1/report-schema"; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + + private static final class ThrowingEmitter extends JsonEmitter + { + @Override + @NotNull + public synchronized Future<MetadataWriteResponse> emit(@NotNull MetadataChangeProposal proposal, + @Nullable Callback callback) throws IOException + { + throw new IOException(); + } + } + + private final class ReportSchemaHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + @NotNull + public InstancesMetadata instancesMetadata() + { + Metadata metadata = mock(Metadata.class); + when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); + + StorageOperations operations = mock(StorageOperations.class); + when(operations.clusterName()).thenReturn(CLUSTER); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + when(delegate.storageOperations()).thenReturn(operations); + when(delegate.metadata()).thenReturn(metadata); + + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.stagingDir()).thenReturn(DIRECTORY); + when(instanceMetadata.id()).thenReturn(IDENTIFIER); + when(instanceMetadata.host()).thenReturn(LOCALHOST); + when(instanceMetadata.port()).thenReturn(PORT); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata instances = mock(InstancesMetadata.class); + when(instances.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(instances.instanceFromId(IDENTIFIER)).thenReturn(instanceMetadata); + when(instances.instanceFromHost(LOCALHOST)).thenReturn(instanceMetadata); + return instances; + } + + @Provides + @Singleton + @NotNull + public EmitterFactory emitterFactory() + { + return () -> emitter; + } + } + + private final Injector injector = Guice.createInjector(Modules.override(new MainModule()).with( + Modules.override(new TestModule()).with( + new ReportSchemaHandlerTestModule()))); + private WebClient client; + private Server server; + private JsonEmitter emitter; + + @BeforeEach + void before() throws InterruptedException + { + client = WebClient.create(injector.getInstance(Vertx.class)); + server = injector.getInstance(Server.class); + + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(result -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @AfterEach + @SuppressWarnings("ResultOfMethodCallIgnored") + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED") + void after() throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + server.close() + .onSuccess(future -> latch.countDown()); + latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @Test + @SuppressWarnings("deprecation") + void testSuccess(@NotNull VertxTestContext context) throws IOException + { + String expected = IOUtils.readFully("/datahub/empty_cluster.json"); + emitter = new JsonEmitter(); + + client.get(server.actualPort(), LOCALHOST, ENDPOINT) Review Comment: add an assertion that `emitter.content` is empty, before making the http request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org