This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-it.git


The following commit(s) were added to refs/heads/master by this push:
     new 45a9fbd  SLING-9472 - Check queue state via http to author
45a9fbd is described below

commit 45a9fbda740f3025cbc094f486ce86f2f9dfd22a
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Tue Jun 30 11:41:21 2020 +0200

    SLING-9472 - Check queue state via http to author
---
 .../sling/distribution/journal/it/Client.java      | 132 +++++++++++++++++++++
 .../journal/it/DistributionTestBase.java           |  94 +--------------
 .../journal/it/DistributionTestSupport.java        |  12 +-
 .../journal/it/tests/PublisherReceiveTest.java     |  13 +-
 .../it/tests/StagedDistributionFailureTest.java    |  35 +++---
 .../journal/it/tests/StagedDistributionTest.java   |  23 ++--
 src/test/resources/logback.xml                     |   2 +
 7 files changed, 175 insertions(+), 136 deletions(-)

diff --git a/src/test/java/org/apache/sling/distribution/journal/it/Client.java 
b/src/test/java/org/apache/sling/distribution/journal/it/Client.java
new file mode 100644
index 0000000..f85009a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/Client.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class Client {
+    private static final String ADMIN_USER = "admin";
+    private static final String ADMIN_PASSWORD = "admin";
+    private static final int TIMEOUT_SECONDS = 60; 
+    
+    protected static Logger LOG = LoggerFactory.getLogger(Client.class);
+    
+    public static void waitNumQueues(int num) {
+        await("Waiting for " + num + " queues to be present")
+            .pollInterval(1,  TimeUnit.SECONDS)
+            .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+            .until(Client::numQueues, equalTo(num));
+    }
+    
+    public static void waitSumQueueSizes(int num) {
+        await("Wait for all queues to have size sum " + num)
+        .pollInterval(1, TimeUnit.SECONDS)
+        .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+        .ignoreExceptions()
+        .until(Client::sumQueueSizes, equalTo(num));
+    }
+    
+    public static int numQueues() {
+        return getQueues(DistributionTestBase.PUB1_AGENT).size();
+    }
+    
+    public static int sumQueueSizes() {
+        Map<String, Integer> queues = 
Client.getQueues(DistributionTestBase.PUB1_AGENT);
+        
+        return queues.values().stream().mapToInt(Integer::valueOf).sum();
+    }
+
+    public static Map<String, Integer> getQueues(String agentName) {
+        String uri = 
String.format("http://localhost:%s/libs/sling/distribution/services/agents/%s/queues.2.json";,
 
+                8181, 
+                agentName);
+        JsonObject root = getJson(uri).getAsJsonObject();
+        Map<String, Integer> queues = new HashMap<>();
+        root.entrySet().stream()
+            .filter(entry -> !entry.getKey().equals("items"))
+            .filter(entry -> !entry.getKey().equals("sling:resourceType"))
+            .forEach(entry -> {
+                queues.put(entry.getKey(), getItemsCount(entry));
+            });
+        LOG.info("Queue sizes {}", queues);
+        return queues;
+    }
+    
+    private static int getItemsCount(Entry<String, JsonElement> entry) {
+        return entry.getValue().getAsJsonObject().get("itemsCount").getAsInt();
+    }
+    
+    public static List<String> getSubNodes(String uri) {
+        LOG.info("Trying to get queue from {}", uri);
+        JsonElement root = getJson(uri);
+        List<String> result = new ArrayList<>();
+        JsonArray items = root.getAsJsonObject().get("items").getAsJsonArray();
+        items.forEach(item -> result.add(item.getAsString()));
+        return result;
+    }
+    
+    private static JsonElement getJson(String uri) {
+        try (CloseableHttpClient client =  createHttpClient()) {
+            HttpGet httpGet = new HttpGet(uri);
+            CloseableHttpResponse response = client.execute(httpGet);
+            InputStreamReader reader = new 
InputStreamReader(response.getEntity().getContent(), Charset.forName("utf-8"));
+            return new JsonParser().parse(reader);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot get json for uri " + uri, e);
+        }
+    }
+
+    protected static CloseableHttpClient createHttpClient() {
+        return HttpClientBuilder.create()
+                .setDefaultCredentialsProvider(credentialsProvider()).build();
+    }
+
+    private static CredentialsProvider credentialsProvider() {
+        CredentialsProvider provider = new BasicCredentialsProvider();
+        UsernamePasswordCredentials credentials = new 
UsernamePasswordCredentials(ADMIN_USER, ADMIN_PASSWORD);
+        provider.setCredentials(AuthScope.ANY, credentials);
+        return provider;
+    }
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
 
b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
index 01464e2..209e98d 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
@@ -19,24 +19,18 @@
 package org.apache.sling.distribution.journal.it;
 
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -53,12 +47,8 @@ import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.Distributor;
 import org.apache.sling.distribution.SimpleDistributionRequest;
-import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
-import org.awaitility.Duration;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.ops4j.pax.exam.Configuration;
@@ -78,11 +68,7 @@ public class DistributionTestBase extends 
DistributionTestSupport {
     private static KafkaLocal kafka;
 
     private static final String RESOURCE_TYPE = "sling:Folder";
-    private static final String PUB1_AGENT = "agent1";
-
-    @Inject
-    @Filter(value = "(name=agent1)", timeout = 40000L)
-    DistributionAgent agent;
+    public static final String PUB1_AGENT = "agent1";
 
     @Inject
     @Filter
@@ -127,14 +113,14 @@ public class DistributionTestBase extends 
DistributionTestSupport {
     }
 
 
-    public static TestContainer startPublishInstance(int httpPort, String 
agentName, boolean editable, String stageAgentName) {
+    public static TestContainer startPublishInstance(int httpPort, String 
agentName, boolean editable, boolean stagingPrecondition) {
         ExamSystem testSystem;
         try {
             String workdir = String.format("%s/target/paxexam/%s", 
PathUtils.getBaseDir(), "publish-" + httpPort + "-" + 
UUID.randomUUID().toString());
             Option[] config = CoreOptions.options( //
                     new 
DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), //
                     defaultOsgiConfigs(), //
-                    publishOsgiConfigs(agentName, editable, stageAgentName), //
+                    publishOsgiConfigs(agentName, editable, 
stagingPrecondition), //
                     CoreOptions.workingDirectory(workdir)
             );
 
@@ -144,6 +130,7 @@ public class DistributionTestBase extends 
DistributionTestSupport {
         }
         TestContainer container = PaxExamRuntime.createContainer(testSystem);
         container.start();
+        LOG.info("Container with port {} started.", httpPort);
         return container;
     }
 
@@ -162,16 +149,6 @@ public class DistributionTestBase extends 
DistributionTestSupport {
         return response.isSuccessful();
     }
 
-    private List<String> queueNames() {
-        List<String> queueNames = new ArrayList<>();
-        agent.getQueueNames().forEach(queueNames::add);
-        return queueNames;
-    }
-
-    protected int getQueueItems(String queueName) {
-        return agent.getQueue(queueName).getStatus().getItemsCount();
-    }
-
     @SuppressWarnings({ "deprecation" })
     private ResourceResolver createResolver() {
         try {
@@ -215,67 +192,4 @@ public class DistributionTestBase extends 
DistributionTestSupport {
             .until(() -> tryGetPath(httpPort, path), equalTo(200));
     }
 
-    public Iterable<String> waitSubQueues(String... queues) {
-        Matcher<String>[] matchers = containsNames(queues);
-        Iterable<String> queueNames = await().atMost(Duration.ONE_MINUTE)
-            .pollInterval(Duration.FIVE_SECONDS)
-            .until(this::queueNames, containsInAnyOrder(matchers));
-        LOG.info("Subscriber Queues: " + String.join(", ", queueNames));
-        return queueNames;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Matcher<String>[] containsNames(String... queues) {
-        return Stream.of(queues)
-                .map(name -> Matchers.containsString(name))
-                .toArray(Matcher[]::new);
-    }
-
-    protected void waitEmptySubQueues() {
-        List<String> names = queueNames();
-        for (String name : names) {
-            await("Queue " + name + "empty")
-                .atMost(60, TimeUnit.SECONDS)
-                .until(() -> getQueueItems(name), equalTo(0));
-        }
-    }
-
-
-    static protected void waitQueueItems(int httpPort, String agentName, int 
count) {
-        await("Waiting for number of items in queue.")
-                .atMost(Duration.ONE_MINUTE)
-                .pollInterval(Duration.FIVE_SECONDS)
-                .until(() -> tryGetQueueItems(httpPort, agentName), 
equalTo(count));
-        LOG.info("Items count {} for agent {}", count, agentName + "-" + 
httpPort);
-
-    }
-
-    static private int tryGetQueueItems(int httpPort, String agentName) {
-        String url = 
String.format("http://localhost:%s/libs/sling/distribution/services/agents/%s/queues.2.json";,
 httpPort, agentName);
-        HttpGet httpGet = new HttpGet(url);
-        Header authHeader = null;
-        try (CloseableHttpClient client = HttpClients.createDefault()) {
-            authHeader = new BasicScheme().authenticate(new 
UsernamePasswordCredentials("admin", "admin"), httpGet, null);
-            httpGet.addHeader(authHeader);
-
-
-            CloseableHttpResponse response = client.execute(httpGet);
-            String text = IOUtils.toString(response.getEntity().getContent(), 
Charset.defaultCharset());
-            if (text == null) {
-                return -1;
-            }
-
-            String itemsCount = StringUtils.substringBetween(text, 
"itemsCount\":", ",");
-            if (itemsCount == null) {
-                return -1;
-            }
-
-            return Integer.parseInt(itemsCount.trim());
-        } catch (Throwable e) {
-            LOG.error("cannot get items count {}", url, e);
-        }
-        return  -1;
-    }
-
-
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
 
b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
index f26f54f..545ddcc 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
@@ -118,6 +118,7 @@ public class DistributionTestSupport extends TestSupport {
                 paxTinybundles(),
                 SlingOptions.logback(),
                 
mavenBundle().groupId("org.slf4j").artifactId("log4j-over-slf4j").version(SlingOptions.versionResolver),
+                mvn("com.google.code.gson", "gson"),
                 
                 // The base sling Quickstart
                 slingQuickstart(baseDirectory),
@@ -298,13 +299,10 @@ public class DistributionTestSupport extends TestSupport {
      * OSGI configuration targeted to the publish instances only
      */
     public static Option publishOsgiConfigs(String agentName) {
-        return publishOsgiConfigs(agentName, true, null);
-
+        return publishOsgiConfigs(agentName, true, false);
     }
 
-    protected static Option publishOsgiConfigs(String agentName, boolean 
editable, String stage) {
-
-
+    protected static Option publishOsgiConfigs(String agentName, boolean 
editable, boolean stagingPrecondition) {
         return composite(
                 
factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
                         .put("kind", "agent")
@@ -313,9 +311,9 @@ public class DistributionTestSupport extends TestSupport {
 
                 
factoryConfiguration("org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
                         .put("name", agentName)
-                        .put("agentNames", new String[]{"agent1"})
+                        .put("agentNames", new String[]{ "agent1"})
                         .put("packageBuilder.target", "(name=journal)")
-                        .put("precondition.target",  stage != null ? 
"(name=staging)" : "(name=default)")
+                        .put("precondition.target",  stagingPrecondition ? 
"(name=staging)" : "(name=default)")
                         .put("editable", editable)
                         .put("announceDelay", "500")
                         .asOption());
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
index e1930c9..9d4e2ed 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
@@ -41,7 +41,6 @@ import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.SimpleDistributionRequest;
-import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.it.DistributionTestSupport;
@@ -81,15 +80,7 @@ public class PublisherReceiveTest extends 
DistributionTestSupport {
     private DistributionPackageBuilder packageBuilder;
     
     @Inject
-    @Filter("(name=subscriber-agent1)")
-    DistributionAgent subscriber;
-    
-    @Inject
     MessagingProvider provider;
-    /*
-    @Inject
-    ServiceUserMapper serviceUserMapper;
-    */
     
     @Configuration
     public Option[] configuration() {
@@ -109,8 +100,8 @@ public class PublisherReceiveTest extends 
DistributionTestSupport {
     
     @Test
     public void testReceive() throws Exception {
-       Arrays.asList(bundleContext.getBundles()).stream()
-       .forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + 
bundle.getVersion()));
+        Arrays.asList(bundleContext.getBundles()).stream()
+            .forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + 
bundle.getVersion()));
         DistributionPackage pkg = createDistPackage(RESOURCE_PATH);
         PackageMessage pkgMsg = toPackageMessage(pkg, "agent1");
         provider.createSender(TOPIC_PACKAGE).send(pkgMsg);
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
index 600eb0c..28756df 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.sling.distribution.journal.it.tests;
 
+import java.io.IOException;
+
+import org.apache.sling.distribution.journal.it.Client;
 import org.apache.sling.distribution.journal.it.DistributionTestBase;
 import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
 import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
@@ -29,38 +32,38 @@ import org.ops4j.pax.exam.TestContainer;
 import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
 import org.ops4j.pax.exam.spi.reactors.PerClass;
 
-import java.io.IOException;
-
 
 @RunWith(ExtPaxExam.class)
 @ExamReactorStrategy(PerClass.class)
 public class StagedDistributionFailureTest extends DistributionTestBase {
 
-    private static final String SUB1_AGENT = "subscriber";
-    private static final String SUB2_AGENT = "subscriber";
-
+    private static final String SUB1_AGENT = PUB1_AGENT + "Subscriber";
+    private static final String SUB2_AGENT = PUB1_AGENT + "Subscriber";
 
     private static volatile TestContainer publish;
     private static volatile TestContainer golden_publish;
 
-
     private static final String TEST_PATH = "/content/mytest";
 
 
     @BeforeOsgi
     public static void beforeOsgi() throws Exception {
         beforeOsgiBase();
-        publish = startPublishInstance(8182,  SUB1_AGENT, false,  SUB2_AGENT);
+        publish = startPublishInstance(8182,  SUB1_AGENT, false,  true);
         new 
Thread(StagedDistributionFailureTest::delayedStartGoldenSubscriber).start();
     }
-
+    
     /**
      * Wait for  at least one item in publish queue before starting golden 
publish
      */
     private static void delayedStartGoldenSubscriber() {
-        waitQueueItems(8182, SUB1_AGENT, 1);
-        LOG.info("Starting golden publish");
-        golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
+        try {
+            Client.waitSumQueueSizes(1);
+            LOG.info("Starting golden publish");
+            golden_publish = startPublishInstance(8183, SUB2_AGENT, true, 
false);
+        } catch (Exception e) {
+            LOG.error("Start of golden subscriber failed with: " + 
e.getMessage(), e);
+        }
     }
 
     @AfterOsgi
@@ -78,8 +81,7 @@ public class StagedDistributionFailureTest extends 
DistributionTestBase {
     @Before
     public void before() {
         createPath(TEST_PATH);
-
-        waitSubQueues(SUB1_AGENT);
+        Client.waitNumQueues(1);
     }
 
     /**
@@ -90,13 +92,14 @@ public class StagedDistributionFailureTest extends 
DistributionTestBase {
      */
     @Test
     public void testDistribute() {
-
         distribute(TEST_PATH);
 
-        waitSubQueues(SUB1_AGENT, SUB2_AGENT);
-        waitEmptySubQueues();
+        Client.waitNumQueues(2);
+        Client.waitSumQueueSizes(0);
 
         waitPath(8182, TEST_PATH);
         waitPath(8183, TEST_PATH);
     }
+
+
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
index bbd103f..f47f027 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
@@ -18,28 +18,28 @@
  */
 package org.apache.sling.distribution.journal.it.tests;
 
+import java.io.IOException;
+
+import org.apache.sling.distribution.journal.it.Client;
 import org.apache.sling.distribution.journal.it.DistributionTestBase;
 import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
 import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
 import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.TestContainer;
 import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
 import org.ops4j.pax.exam.spi.reactors.PerClass;
 
-import java.io.IOException;
-
-@Ignore(value = "Switched off as this test does not seem to work on jenkins. 
Locally it works.")
+//@Ignore(value = "Switched off as this test does not seem to work on jenkins. 
Locally it works.")
 @RunWith(ExtPaxExam.class)
 @ExamReactorStrategy(PerClass.class)
 public class StagedDistributionTest extends DistributionTestBase {
 
-    private static final String SUB1_AGENT = "subscriber-regular";
-    private static final String SUB2_AGENT = "subscriber-golden";
-
+    private static final String SUB1_AGENT = PUB1_AGENT + "Subscriber";
+    private static final String SUB2_AGENT = PUB1_AGENT + "Subscriber";
+    
     private static TestContainer golden_publish;
     private static TestContainer publish;
 
@@ -50,8 +50,8 @@ public class StagedDistributionTest extends 
DistributionTestBase {
     @BeforeOsgi
     public static void beforeOsgi() throws Exception {
         beforeOsgiBase();
-        publish = startPublishInstance(8182,  SUB1_AGENT, false,  SUB2_AGENT);
-        golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
+        publish = startPublishInstance(8182,  SUB1_AGENT, false,  true);
+        golden_publish = startPublishInstance(8183, SUB2_AGENT, true, false);
 
     }
 
@@ -69,8 +69,7 @@ public class StagedDistributionTest extends 
DistributionTestBase {
     @Before
     public void before() {
         createPath(TEST_PATH);
-
-        waitSubQueues(SUB1_AGENT, SUB2_AGENT);
+        Client.waitNumQueues(2);
     }
 
     @Test
@@ -78,7 +77,7 @@ public class StagedDistributionTest extends 
DistributionTestBase {
 
         distribute(TEST_PATH);
 
-        waitEmptySubQueues();
+        Client.waitSumQueueSizes(0);
 
         waitPath(8182, TEST_PATH);
         waitPath(8183, TEST_PATH);
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
index 5f21dd4..57f012c 100644
--- a/src/test/resources/logback.xml
+++ b/src/test/resources/logback.xml
@@ -28,7 +28,9 @@
     <appender-ref ref="console"/>
   </root>
 
+  
   <logger name="kafka" level="WARN"/>
+  <logger 
name="org.apache.sling.distribution.journal.kafka.KafkaJsonMessageSender" 
level="WARN"/>
   <logger name="org.apache.zookeeper" level="WARN"/>
   <logger name="org.apache.kafka.clients" level="WARN"/>
   <logger name="org.apache.sling.jcr.repoinit.impl" level="WARN"/>

Reply via email to