This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-it.git
The following commit(s) were added to refs/heads/master by this push: new 45a9fbd SLING-9472 - Check queue state via http to author 45a9fbd is described below commit 45a9fbda740f3025cbc094f486ce86f2f9dfd22a Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Tue Jun 30 11:41:21 2020 +0200 SLING-9472 - Check queue state via http to author --- .../sling/distribution/journal/it/Client.java | 132 +++++++++++++++++++++ .../journal/it/DistributionTestBase.java | 94 +-------------- .../journal/it/DistributionTestSupport.java | 12 +- .../journal/it/tests/PublisherReceiveTest.java | 13 +- .../it/tests/StagedDistributionFailureTest.java | 35 +++--- .../journal/it/tests/StagedDistributionTest.java | 23 ++-- src/test/resources/logback.xml | 2 + 7 files changed, 175 insertions(+), 136 deletions(-) diff --git a/src/test/java/org/apache/sling/distribution/journal/it/Client.java b/src/test/java/org/apache/sling/distribution/journal/it/Client.java new file mode 100644 index 0000000..f85009a --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/it/Client.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.it; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; + +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +public class Client { + private static final String ADMIN_USER = "admin"; + private static final String ADMIN_PASSWORD = "admin"; + private static final int TIMEOUT_SECONDS = 60; + + protected static Logger LOG = LoggerFactory.getLogger(Client.class); + + public static void waitNumQueues(int num) { + await("Waiting for " + num + " queues to be present") + .pollInterval(1, TimeUnit.SECONDS) + .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) + .until(Client::numQueues, equalTo(num)); + } + + public static void waitSumQueueSizes(int num) { + await("Wait for all queues to have size sum " + num) + .pollInterval(1, TimeUnit.SECONDS) + .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) + .ignoreExceptions() + .until(Client::sumQueueSizes, equalTo(num)); + } + + public static int numQueues() { + return getQueues(DistributionTestBase.PUB1_AGENT).size(); + } + + public static int sumQueueSizes() { + Map<String, Integer> queues = Client.getQueues(DistributionTestBase.PUB1_AGENT); + + return queues.values().stream().mapToInt(Integer::valueOf).sum(); + } + + public static Map<String, Integer> getQueues(String agentName) { + String uri = String.format("http://localhost:%s/libs/sling/distribution/services/agents/%s/queues.2.json", + 8181, + agentName); + JsonObject root = getJson(uri).getAsJsonObject(); + Map<String, Integer> queues = new HashMap<>(); + root.entrySet().stream() + .filter(entry -> !entry.getKey().equals("items")) + .filter(entry -> !entry.getKey().equals("sling:resourceType")) + .forEach(entry -> { + queues.put(entry.getKey(), getItemsCount(entry)); + }); + LOG.info("Queue sizes {}", queues); + return queues; + } + + private static int getItemsCount(Entry<String, JsonElement> entry) { + return entry.getValue().getAsJsonObject().get("itemsCount").getAsInt(); + } + + public static List<String> getSubNodes(String uri) { + LOG.info("Trying to get queue from {}", uri); + JsonElement root = getJson(uri); + List<String> result = new ArrayList<>(); + JsonArray items = root.getAsJsonObject().get("items").getAsJsonArray(); + items.forEach(item -> result.add(item.getAsString())); + return result; + } + + private static JsonElement getJson(String uri) { + try (CloseableHttpClient client = createHttpClient()) { + HttpGet httpGet = new HttpGet(uri); + CloseableHttpResponse response = client.execute(httpGet); + InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), Charset.forName("utf-8")); + return new JsonParser().parse(reader); + } catch (Exception e) { + throw new RuntimeException("Cannot get json for uri " + uri, e); + } + } + + protected static CloseableHttpClient createHttpClient() { + return HttpClientBuilder.create() + .setDefaultCredentialsProvider(credentialsProvider()).build(); + } + + private static CredentialsProvider credentialsProvider() { + CredentialsProvider provider = new BasicCredentialsProvider(); + UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(ADMIN_USER, ADMIN_PASSWORD); + provider.setCredentials(AuthScope.ANY, credentials); + return provider; + } +} diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java index 01464e2..209e98d 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java @@ -19,24 +19,18 @@ package org.apache.sling.distribution.journal.it; import static org.awaitility.Awaitility.await; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration; import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; import javax.inject.Inject; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.methods.CloseableHttpResponse; @@ -53,12 +47,8 @@ import org.apache.sling.distribution.DistributionRequestType; import org.apache.sling.distribution.DistributionResponse; import org.apache.sling.distribution.Distributor; import org.apache.sling.distribution.SimpleDistributionRequest; -import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.it.kafka.KafkaLocal; -import org.awaitility.Duration; -import org.hamcrest.Matcher; -import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.ops4j.pax.exam.Configuration; @@ -78,11 +68,7 @@ public class DistributionTestBase extends DistributionTestSupport { private static KafkaLocal kafka; private static final String RESOURCE_TYPE = "sling:Folder"; - private static final String PUB1_AGENT = "agent1"; - - @Inject - @Filter(value = "(name=agent1)", timeout = 40000L) - DistributionAgent agent; + public static final String PUB1_AGENT = "agent1"; @Inject @Filter @@ -127,14 +113,14 @@ public class DistributionTestBase extends DistributionTestSupport { } - public static TestContainer startPublishInstance(int httpPort, String agentName, boolean editable, String stageAgentName) { + public static TestContainer startPublishInstance(int httpPort, String agentName, boolean editable, boolean stagingPrecondition) { ExamSystem testSystem; try { String workdir = String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), "publish-" + httpPort + "-" + UUID.randomUUID().toString()); Option[] config = CoreOptions.options( // new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), // defaultOsgiConfigs(), // - publishOsgiConfigs(agentName, editable, stageAgentName), // + publishOsgiConfigs(agentName, editable, stagingPrecondition), // CoreOptions.workingDirectory(workdir) ); @@ -144,6 +130,7 @@ public class DistributionTestBase extends DistributionTestSupport { } TestContainer container = PaxExamRuntime.createContainer(testSystem); container.start(); + LOG.info("Container with port {} started.", httpPort); return container; } @@ -162,16 +149,6 @@ public class DistributionTestBase extends DistributionTestSupport { return response.isSuccessful(); } - private List<String> queueNames() { - List<String> queueNames = new ArrayList<>(); - agent.getQueueNames().forEach(queueNames::add); - return queueNames; - } - - protected int getQueueItems(String queueName) { - return agent.getQueue(queueName).getStatus().getItemsCount(); - } - @SuppressWarnings({ "deprecation" }) private ResourceResolver createResolver() { try { @@ -215,67 +192,4 @@ public class DistributionTestBase extends DistributionTestSupport { .until(() -> tryGetPath(httpPort, path), equalTo(200)); } - public Iterable<String> waitSubQueues(String... queues) { - Matcher<String>[] matchers = containsNames(queues); - Iterable<String> queueNames = await().atMost(Duration.ONE_MINUTE) - .pollInterval(Duration.FIVE_SECONDS) - .until(this::queueNames, containsInAnyOrder(matchers)); - LOG.info("Subscriber Queues: " + String.join(", ", queueNames)); - return queueNames; - } - - @SuppressWarnings("unchecked") - private Matcher<String>[] containsNames(String... queues) { - return Stream.of(queues) - .map(name -> Matchers.containsString(name)) - .toArray(Matcher[]::new); - } - - protected void waitEmptySubQueues() { - List<String> names = queueNames(); - for (String name : names) { - await("Queue " + name + "empty") - .atMost(60, TimeUnit.SECONDS) - .until(() -> getQueueItems(name), equalTo(0)); - } - } - - - static protected void waitQueueItems(int httpPort, String agentName, int count) { - await("Waiting for number of items in queue.") - .atMost(Duration.ONE_MINUTE) - .pollInterval(Duration.FIVE_SECONDS) - .until(() -> tryGetQueueItems(httpPort, agentName), equalTo(count)); - LOG.info("Items count {} for agent {}", count, agentName + "-" + httpPort); - - } - - static private int tryGetQueueItems(int httpPort, String agentName) { - String url = String.format("http://localhost:%s/libs/sling/distribution/services/agents/%s/queues.2.json", httpPort, agentName); - HttpGet httpGet = new HttpGet(url); - Header authHeader = null; - try (CloseableHttpClient client = HttpClients.createDefault()) { - authHeader = new BasicScheme().authenticate(new UsernamePasswordCredentials("admin", "admin"), httpGet, null); - httpGet.addHeader(authHeader); - - - CloseableHttpResponse response = client.execute(httpGet); - String text = IOUtils.toString(response.getEntity().getContent(), Charset.defaultCharset()); - if (text == null) { - return -1; - } - - String itemsCount = StringUtils.substringBetween(text, "itemsCount\":", ","); - if (itemsCount == null) { - return -1; - } - - return Integer.parseInt(itemsCount.trim()); - } catch (Throwable e) { - LOG.error("cannot get items count {}", url, e); - } - return -1; - } - - } diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java index f26f54f..545ddcc 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java @@ -118,6 +118,7 @@ public class DistributionTestSupport extends TestSupport { paxTinybundles(), SlingOptions.logback(), mavenBundle().groupId("org.slf4j").artifactId("log4j-over-slf4j").version(SlingOptions.versionResolver), + mvn("com.google.code.gson", "gson"), // The base sling Quickstart slingQuickstart(baseDirectory), @@ -298,13 +299,10 @@ public class DistributionTestSupport extends TestSupport { * OSGI configuration targeted to the publish instances only */ public static Option publishOsgiConfigs(String agentName) { - return publishOsgiConfigs(agentName, true, null); - + return publishOsgiConfigs(agentName, true, false); } - protected static Option publishOsgiConfigs(String agentName, boolean editable, String stage) { - - + protected static Option publishOsgiConfigs(String agentName, boolean editable, boolean stagingPrecondition) { return composite( factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory") .put("kind", "agent") @@ -313,9 +311,9 @@ public class DistributionTestSupport extends TestSupport { factoryConfiguration("org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory") .put("name", agentName) - .put("agentNames", new String[]{"agent1"}) + .put("agentNames", new String[]{ "agent1"}) .put("packageBuilder.target", "(name=journal)") - .put("precondition.target", stage != null ? "(name=staging)" : "(name=default)") + .put("precondition.target", stagingPrecondition ? "(name=staging)" : "(name=default)") .put("editable", editable) .put("announceDelay", "500") .asOption()); diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java index e1930c9..9d4e2ed 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java @@ -41,7 +41,6 @@ import org.apache.sling.api.resource.ResourceUtil; import org.apache.sling.distribution.DistributionRequest; import org.apache.sling.distribution.DistributionRequestType; import org.apache.sling.distribution.SimpleDistributionRequest; -import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.it.DistributionTestSupport; @@ -81,15 +80,7 @@ public class PublisherReceiveTest extends DistributionTestSupport { private DistributionPackageBuilder packageBuilder; @Inject - @Filter("(name=subscriber-agent1)") - DistributionAgent subscriber; - - @Inject MessagingProvider provider; - /* - @Inject - ServiceUserMapper serviceUserMapper; - */ @Configuration public Option[] configuration() { @@ -109,8 +100,8 @@ public class PublisherReceiveTest extends DistributionTestSupport { @Test public void testReceive() throws Exception { - Arrays.asList(bundleContext.getBundles()).stream() - .forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + bundle.getVersion())); + Arrays.asList(bundleContext.getBundles()).stream() + .forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + bundle.getVersion())); DistributionPackage pkg = createDistPackage(RESOURCE_PATH); PackageMessage pkgMsg = toPackageMessage(pkg, "agent1"); provider.createSender(TOPIC_PACKAGE).send(pkgMsg); diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java index 600eb0c..28756df 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java @@ -18,6 +18,9 @@ */ package org.apache.sling.distribution.journal.it.tests; +import java.io.IOException; + +import org.apache.sling.distribution.journal.it.Client; import org.apache.sling.distribution.journal.it.DistributionTestBase; import org.apache.sling.distribution.journal.it.ext.AfterOsgi; import org.apache.sling.distribution.journal.it.ext.BeforeOsgi; @@ -29,38 +32,38 @@ import org.ops4j.pax.exam.TestContainer; import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; import org.ops4j.pax.exam.spi.reactors.PerClass; -import java.io.IOException; - @RunWith(ExtPaxExam.class) @ExamReactorStrategy(PerClass.class) public class StagedDistributionFailureTest extends DistributionTestBase { - private static final String SUB1_AGENT = "subscriber"; - private static final String SUB2_AGENT = "subscriber"; - + private static final String SUB1_AGENT = PUB1_AGENT + "Subscriber"; + private static final String SUB2_AGENT = PUB1_AGENT + "Subscriber"; private static volatile TestContainer publish; private static volatile TestContainer golden_publish; - private static final String TEST_PATH = "/content/mytest"; @BeforeOsgi public static void beforeOsgi() throws Exception { beforeOsgiBase(); - publish = startPublishInstance(8182, SUB1_AGENT, false, SUB2_AGENT); + publish = startPublishInstance(8182, SUB1_AGENT, false, true); new Thread(StagedDistributionFailureTest::delayedStartGoldenSubscriber).start(); } - + /** * Wait for at least one item in publish queue before starting golden publish */ private static void delayedStartGoldenSubscriber() { - waitQueueItems(8182, SUB1_AGENT, 1); - LOG.info("Starting golden publish"); - golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null); + try { + Client.waitSumQueueSizes(1); + LOG.info("Starting golden publish"); + golden_publish = startPublishInstance(8183, SUB2_AGENT, true, false); + } catch (Exception e) { + LOG.error("Start of golden subscriber failed with: " + e.getMessage(), e); + } } @AfterOsgi @@ -78,8 +81,7 @@ public class StagedDistributionFailureTest extends DistributionTestBase { @Before public void before() { createPath(TEST_PATH); - - waitSubQueues(SUB1_AGENT); + Client.waitNumQueues(1); } /** @@ -90,13 +92,14 @@ public class StagedDistributionFailureTest extends DistributionTestBase { */ @Test public void testDistribute() { - distribute(TEST_PATH); - waitSubQueues(SUB1_AGENT, SUB2_AGENT); - waitEmptySubQueues(); + Client.waitNumQueues(2); + Client.waitSumQueueSizes(0); waitPath(8182, TEST_PATH); waitPath(8183, TEST_PATH); } + + } diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java index bbd103f..f47f027 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java @@ -18,28 +18,28 @@ */ package org.apache.sling.distribution.journal.it.tests; +import java.io.IOException; + +import org.apache.sling.distribution.journal.it.Client; import org.apache.sling.distribution.journal.it.DistributionTestBase; import org.apache.sling.distribution.journal.it.ext.AfterOsgi; import org.apache.sling.distribution.journal.it.ext.BeforeOsgi; import org.apache.sling.distribution.journal.it.ext.ExtPaxExam; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.ops4j.pax.exam.TestContainer; import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; import org.ops4j.pax.exam.spi.reactors.PerClass; -import java.io.IOException; - -@Ignore(value = "Switched off as this test does not seem to work on jenkins. Locally it works.") +//@Ignore(value = "Switched off as this test does not seem to work on jenkins. Locally it works.") @RunWith(ExtPaxExam.class) @ExamReactorStrategy(PerClass.class) public class StagedDistributionTest extends DistributionTestBase { - private static final String SUB1_AGENT = "subscriber-regular"; - private static final String SUB2_AGENT = "subscriber-golden"; - + private static final String SUB1_AGENT = PUB1_AGENT + "Subscriber"; + private static final String SUB2_AGENT = PUB1_AGENT + "Subscriber"; + private static TestContainer golden_publish; private static TestContainer publish; @@ -50,8 +50,8 @@ public class StagedDistributionTest extends DistributionTestBase { @BeforeOsgi public static void beforeOsgi() throws Exception { beforeOsgiBase(); - publish = startPublishInstance(8182, SUB1_AGENT, false, SUB2_AGENT); - golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null); + publish = startPublishInstance(8182, SUB1_AGENT, false, true); + golden_publish = startPublishInstance(8183, SUB2_AGENT, true, false); } @@ -69,8 +69,7 @@ public class StagedDistributionTest extends DistributionTestBase { @Before public void before() { createPath(TEST_PATH); - - waitSubQueues(SUB1_AGENT, SUB2_AGENT); + Client.waitNumQueues(2); } @Test @@ -78,7 +77,7 @@ public class StagedDistributionTest extends DistributionTestBase { distribute(TEST_PATH); - waitEmptySubQueues(); + Client.waitSumQueueSizes(0); waitPath(8182, TEST_PATH); waitPath(8183, TEST_PATH); diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 5f21dd4..57f012c 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -28,7 +28,9 @@ <appender-ref ref="console"/> </root> + <logger name="kafka" level="WARN"/> + <logger name="org.apache.sling.distribution.journal.kafka.KafkaJsonMessageSender" level="WARN"/> <logger name="org.apache.zookeeper" level="WARN"/> <logger name="org.apache.kafka.clients" level="WARN"/> <logger name="org.apache.sling.jcr.repoinit.impl" level="WARN"/>