This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/master by this push:
     new d04b40d  Introduced builder for PlcSubscriptionRequest and 
PlcUnsubscriptionRequest + generified items for subscription + reordered 
SubscriptionRequestCyclicItem so that consumer is the last parameter. + 
adjusted manual test
d04b40d is described below

commit d04b40dd50d401f06ccc7284b1b388044bfa7214
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Thu Aug 16 10:35:17 2018 +0200

    Introduced builder for PlcSubscriptionRequest and PlcUnsubscriptionRequest
    + generified items for subscription
    + reordered SubscriptionRequestCyclicItem so that consumer is the last 
parameter.
    + adjusted manual test
---
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java |  9 +++-
 .../java/api/messages/PlcSubscriptionRequest.java  | 51 ++++++++++++++++++-
 .../api/messages/PlcUnsubscriptionRequest.java     | 57 ++++++++++++++++++++++
 .../SubscriptionRequestChangeOfStateItem.java      |  4 +-
 .../items/SubscriptionRequestCyclicItem.java       |  6 +--
 .../items/SubscriptionRequestEventItem.java        |  6 +--
 .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java  | 36 ++++++++------
 7 files changed, 143 insertions(+), 26 deletions(-)

diff --git 
a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
 
b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 16fc626..e04e016 100644
--- 
a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ 
b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -29,7 +29,10 @@ import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
-import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestCyclicItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
 import org.apache.plc4x.java.api.model.Address;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,7 +86,9 @@ public class Plc4XConsumer extends ServiceSupport implements 
Consumer, java.util
     @Override
     protected void doStart() throws InterruptedException, ExecutionException, 
TimeoutException {
         PlcSubscriptionRequest request = new PlcSubscriptionRequest();
-        request.addItem(new SubscriptionRequestCyclicItem(dataType, address, 
this, TimeUnit.SECONDS, 3));
+        @SuppressWarnings("unchecked")
+        SubscriptionRequestCyclicItem subscriptionRequestCyclicItem = new 
SubscriptionRequestCyclicItem(dataType, address, TimeUnit.SECONDS, 3, this);
+        request.addItem(subscriptionRequestCyclicItem);
         CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = 
getSubscriber().subscribe(request);
         subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
     }
diff --git 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index 5fa8ac4..67c4894 100644
--- 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
+++ 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -18,10 +18,59 @@ specific language governing permissions and limitations
 under the License.
 */
 
-import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.model.Address;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 public class PlcSubscriptionRequest extends 
PlcRequest<SubscriptionRequestItem<?>> {
 
+    public static PlcSubscriptionRequest.Builder builder() {
+        return new PlcSubscriptionRequest.Builder();
+    }
+
+    public static class Builder extends 
PlcRequest.Builder<SubscriptionRequestItem> {
+
+        public final <T> Builder addChangeOfStateItem(Class<T> dataType, 
Address address, Consumer<SubscriptionEventItem<T>> consumer) {
+            // As we don't get a list as response rather we have individual 
consumers we don't need type checking here.
+            //checkType(dataType);
+            requests.add(new SubscriptionRequestChangeOfStateItem<>(dataType, 
address, consumer));
+            return this;
+        }
+
+        public final <T> Builder addCyclicItem(Class<T> dataType, Address 
address, Consumer<SubscriptionEventItem<T>> consumer, TimeUnit timeUnit, int 
period) {
+            // As we don't get a list as response rather we have individual 
consumers we don't need type checking here.
+            //checkType(dataType);
+            requests.add(new SubscriptionRequestCyclicItem<>(dataType, 
address, timeUnit, period, consumer));
+            return this;
+        }
+
+        public final <T> Builder addEventItem(Class<T> dataType, Address 
address, Consumer<SubscriptionEventItem<T>> consumer) {
+            // As we don't get a list as response rather we have individual 
consumers we don't need type checking here.
+            //checkType(dataType);
+            requests.add(new SubscriptionRequestEventItem<>(dataType, address, 
consumer));
+            return this;
+        }
+
+        public final Builder addItem(SubscriptionRequestItem 
subscriptionRequestItem) {
+            requests.add(subscriptionRequestItem);
+            return this;
+        }
+
+        public final PlcSubscriptionRequest build() {
+            if (requests.isEmpty()) {
+                throw new IllegalStateException("No requests added");
+            }
+            PlcSubscriptionRequest plcSubscriptionRequest = new 
PlcSubscriptionRequest();
+            for (SubscriptionRequestItem request : requests) {
+                plcSubscriptionRequest.addItem(request);
+            }
+            return plcSubscriptionRequest;
+        }
+
+    }
+
     @Override
     public String toString() {
         return "PlcSubscriptionRequest{} " + super.toString();
diff --git 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
index ed12177..bc46db7 100644
--- 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
+++ 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -18,12 +18,15 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages;
 
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
 import org.apache.plc4x.java.api.model.SubscriptionHandle;
 
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class PlcUnsubscriptionRequest implements PlcMessage {
 
@@ -51,6 +54,60 @@ public class PlcUnsubscriptionRequest implements PlcMessage {
         return getRequestItems().size();
     }
 
+    public static PlcUnsubscriptionRequest.Builder builder() {
+        return new PlcUnsubscriptionRequest.Builder();
+    }
+
+    public static class Builder extends 
PlcRequest.Builder<UnsubscriptionRequestItem> {
+
+        public final Builder addHandle(SubscriptionHandle subscriptionHandle) {
+            requests.add(new UnsubscriptionRequestItem(subscriptionHandle));
+            return this;
+        }
+
+        public final Builder addHandle(SubscriptionHandle... 
subscriptionHandles) {
+            
requests.addAll(Arrays.stream(subscriptionHandles).map(UnsubscriptionRequestItem::new).collect(Collectors.toList()));
+            return this;
+        }
+
+        public final Builder addHandle(List<SubscriptionHandle> 
subscriptionHandles) {
+            
requests.addAll(subscriptionHandles.stream().map(UnsubscriptionRequestItem::new).collect(Collectors.toList()));
+            return this;
+        }
+
+        public final Builder addHandle(SubscriptionResponseItem 
subscriptionResponseItem) {
+            requests.add(new 
UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
+            return this;
+        }
+
+        public final Builder addItem(UnsubscriptionRequestItem 
unsubscriptionRequestItem) {
+            requests.add(unsubscriptionRequestItem);
+            return this;
+        }
+
+        public final Builder addItem(UnsubscriptionRequestItem... 
unsubscriptionRequestItems) {
+            requests.addAll(Arrays.asList(unsubscriptionRequestItems));
+            return this;
+        }
+
+        public final Builder addItem(List<UnsubscriptionRequestItem> 
unsubscriptionRequestItems) {
+            requests.addAll(unsubscriptionRequestItems);
+            return this;
+        }
+
+        public final PlcUnsubscriptionRequest build() {
+            if (requests.isEmpty()) {
+                throw new IllegalStateException("No requests added");
+            }
+            PlcUnsubscriptionRequest plcUnsubscriptionRequest = new 
PlcUnsubscriptionRequest();
+            for (UnsubscriptionRequestItem request : requests) {
+                plcUnsubscriptionRequest.addItem(request);
+            }
+            return plcUnsubscriptionRequest;
+        }
+
+    }
+
     @Override
     public String toString() {
         return "PlcUnsubscriptionRequest{" +
diff --git 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
index 8e4b0eb..aa46b3a 100644
--- 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
+++ 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
@@ -23,9 +23,9 @@ import org.apache.plc4x.java.api.model.SubscriptionType;
 
 import java.util.function.Consumer;
 
-public class SubscriptionRequestChangeOfStateItem extends 
SubscriptionRequestItem {
+public class SubscriptionRequestChangeOfStateItem<T> extends 
SubscriptionRequestItem<T> {
 
-    public SubscriptionRequestChangeOfStateItem(Class datatype, Address 
address, Consumer consumer) {
+    public SubscriptionRequestChangeOfStateItem(Class<T> datatype, Address 
address, Consumer<SubscriptionEventItem<T>> consumer) {
         super(datatype, address, SubscriptionType.CHANGE_OF_STATE, consumer);
     }
 
diff --git 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
index 22793b2..06f3a77 100644
--- 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
+++ 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
@@ -25,13 +25,13 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-public class SubscriptionRequestCyclicItem extends SubscriptionRequestItem {
+public class SubscriptionRequestCyclicItem<T> extends 
SubscriptionRequestItem<T> {
 
     private TimeUnit timeUnit;
     private int period;
 
-    public SubscriptionRequestCyclicItem(Class datatype, Address address, 
Consumer consumer, TimeUnit timeUnit, int period) {
-        super(datatype, address, SubscriptionType.CYCLIC, consumer);
+    public SubscriptionRequestCyclicItem(Class<T> dataType, Address address, 
TimeUnit timeUnit, int period, Consumer<SubscriptionEventItem<T>> consumer) {
+        super(dataType, address, SubscriptionType.CYCLIC, consumer);
         this.timeUnit = timeUnit;
         this.period = period;
     }
diff --git 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
index e842d88..1abd1b4 100644
--- 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
+++ 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
@@ -23,10 +23,10 @@ import org.apache.plc4x.java.api.model.SubscriptionType;
 
 import java.util.function.Consumer;
 
-public class SubscriptionRequestEventItem extends SubscriptionRequestItem {
+public class SubscriptionRequestEventItem<T> extends 
SubscriptionRequestItem<T> {
 
-    public SubscriptionRequestEventItem(Class datatype, Address address, 
Consumer consumer) {
-        super(datatype, address, SubscriptionType.EVENT, consumer);
+    public SubscriptionRequestEventItem(Class<T> dataType, Address address, 
Consumer<SubscriptionEventItem<T>> consumer) {
+        super(dataType, address, SubscriptionType.EVENT, consumer);
     }
 
     @Override
diff --git 
a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
 
b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 3511e32..a0e28c8 100644
--- 
a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ 
b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -22,15 +22,17 @@ import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.messages.items.*;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
 import org.apache.plc4x.java.api.model.Address;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 public class ManualPlc4XAdsTest {
 
@@ -57,20 +59,24 @@ public class ManualPlc4XAdsTest {
             System.out.println("ResponseItem " + responseItem);
             responseItem.getValues().stream().map(integer -> "Value: " + 
integer).forEach(System.out::println);
 
-            Consumer<SubscriptionEventItem<Integer>> notificationConsumer = 
plcNotification -> System.out.println("Received notification " + 
plcNotification);
             PlcSubscriber plcSubscriber = 
plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe 
not available"));
-            PlcSubscriptionRequest subscriptionRequest = new 
PlcSubscriptionRequest();
-            subscriptionRequest.addItem(new 
SubscriptionRequestChangeOfStateItem(Integer.class, address, 
notificationConsumer));
-            CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = 
plcSubscriber.subscribe(subscriptionRequest);
-            PlcSubscriptionResponse subscriptionResponse = 
subscriptionFuture.get(5, TimeUnit.SECONDS);
-            SubscriptionResponseItem subscriptionResponseItem = 
subscriptionResponse.getResponseItem().get();
 
-            PlcUnsubscriptionRequest unsubscriptionRequest = new 
PlcUnsubscriptionRequest();
-            unsubscriptionRequest.addItem(
-                new 
UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
-            CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture =
-                plcSubscriber.unsubscribe(unsubscriptionRequest);
-            PlcUnsubscriptionResponse unsubscriptionResponse = 
unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+            PlcSubscriptionRequest subscriptionRequest = 
PlcSubscriptionRequest.builder()
+                .addChangeOfStateItem(Integer.class, address, plcNotification 
-> System.out.println("Received notification " + plcNotification))
+                .build();
+
+            SubscriptionResponseItem subscriptionResponseItem = 
plcSubscriber.subscribe(subscriptionRequest)
+                .get(5, TimeUnit.SECONDS)
+                .getResponseItem().orElseThrow(() -> new 
RuntimeException("response not available"));
+
+            TimeUnit.SECONDS.sleep(5);
+
+            PlcUnsubscriptionRequest unsubscriptionRequest = 
PlcUnsubscriptionRequest.builder()
+                .addHandle(subscriptionResponseItem)
+                .build();
+
+            PlcUnsubscriptionResponse unsubscriptionResponse = 
plcSubscriber.unsubscribe(unsubscriptionRequest)
+                .get(5, TimeUnit.SECONDS);
             System.out.println(unsubscriptionResponse);
         }
         System.exit(0);

Reply via email to