This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 33ab0ab10e655c26bb2e38c71cb941b3db4fe5e1
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Thu Jul 5 13:20:43 2018 +0200

    Introduced protocol layer to split requestItems to several requests.
---
 .../base/messages/item/CorrelatedRequestItem.java  |  81 +++++++++
 .../base/messages/item/CorrelatedResponseItem.java |  70 ++++++++
 .../SingleItemToSingleRequestProtocol.java         | 193 +++++++++++++++++++++
 3 files changed, 344 insertions(+)

diff --git 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
new file mode 100644
index 0000000..5212618
--- /dev/null
+++ 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
@@ -0,0 +1,81 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.messages.item;
+
+import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.PlcResponse;
+import org.apache.plc4x.java.api.messages.items.RequestItem;
+
+import java.util.Objects;
+
+public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> {
+
+    private final int correlationId;
+
+    private final REQUEST_ITEM requestItem;
+
+    private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> 
plcRequestContainer;
+
+    public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, 
PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) {
+        this.correlationId = correlationId;
+        this.requestItem = requestItem;
+        this.plcRequestContainer = plcRequestContainer;
+    }
+
+    public int getCorrelationId() {
+        return correlationId;
+    }
+
+    public REQUEST_ITEM getRequestItem() {
+        return requestItem;
+    }
+
+    public PlcRequestContainer<?, PlcResponse<?, ?, ?>> 
getPlcRequestContainer() {
+        return plcRequestContainer;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof CorrelatedRequestItem)) {
+            return false;
+        }
+        CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o;
+        return correlationId == that.correlationId &&
+            Objects.equals(requestItem, that.requestItem) &&
+            Objects.equals(plcRequestContainer, that.plcRequestContainer);
+    }
+
+    @Override
+    public int hashCode() {
+
+        return Objects.hash(correlationId, requestItem, plcRequestContainer);
+    }
+
+    @Override
+    public String toString() {
+        return "CorrelatedRequestItem{" +
+            "correlationId=" + correlationId +
+            ", requestItem=" + requestItem +
+            ", plcRequestContainer=" + plcRequestContainer +
+            '}';
+    }
+}
diff --git 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
new file mode 100644
index 0000000..38a9032
--- /dev/null
+++ 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
@@ -0,0 +1,70 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+ 
+   http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.messages.item;
+
+import org.apache.plc4x.java.api.messages.items.ResponseItem;
+
+import java.util.Objects;
+
+public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> {
+
+    private final int correlationId;
+
+    private final RESPONSE_ITEM responseItem;
+
+    public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM 
responseItem) {
+        this.correlationId = correlationId;
+        this.responseItem = responseItem;
+    }
+
+    public int getCorrelationId() {
+        return correlationId;
+    }
+
+    public RESPONSE_ITEM getResponseItem() {
+        return responseItem;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof CorrelatedResponseItem)) {
+            return false;
+        }
+        CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o;
+        return correlationId == that.correlationId &&
+            Objects.equals(responseItem, that.responseItem);
+    }
+
+    @Override
+    public int hashCode() {
+
+        return Objects.hash(correlationId, responseItem);
+    }
+
+    @Override
+    public String toString() {
+        return "CorrelatedResponseItem{" +
+            "correlationId=" + correlationId +
+            ", responseItem=" + responseItem +
+            '}';
+    }
+}
diff --git 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
new file mode 100644
index 0000000..8c0e272
--- /dev/null
+++ 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -0,0 +1,193 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.protocol;
+
+import io.netty.channel.*;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.PromiseCombiner;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.RequestItem;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
+import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem;
+import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
+
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
+
+    private PendingWriteQueue queue;
+
+    private ConcurrentMap<Integer, CorrelatedRequestItem<?>> 
sentButUnacknowledgedRequestItems;
+
+    private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> 
containerCorrelationIdMap;
+
+    private ConcurrentMap<PlcRequestContainer<?, ?>, 
List<CorrelatedResponseItem<?>>> responsesToBeDevliered;
+
+    private AtomicInteger correlationId;
+
+    private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = 
new MessageToMessageDecoder<CorrelatedResponseItem>() {
+
+        @Override
+        protected void decode(ChannelHandlerContext ctx, 
CorrelatedResponseItem msg, List<Object> out) throws Exception {
+            SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out);
+        }
+    };
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        this.queue = new PendingWriteQueue(ctx);
+        this.sentButUnacknowledgedRequestItems = new ConcurrentHashMap<>();
+        this.containerCorrelationIdMap = new ConcurrentHashMap<>();
+        this.correlationId = new AtomicInteger();
+        super.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws 
Exception {
+        this.queue.removeAndWriteAll();
+        super.channelUnregistered(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // Send everything so we get a proper failure for those pending writes
+        this.queue.removeAndWriteAll();
+        super.channelInactive(ctx);
+    }
+
+    
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Decoding
+    
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+        decoder.channelRead(ctx, msg);
+        super.read(ctx);
+    }
+
+    private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> 
msg, List<Object> out) throws PlcProtocolException {
+        int correlationId = msg.getCorrelationId();
+        CorrelatedRequestItem<?> correlatedRequestItem = 
sentButUnacknowledgedRequestItems.remove(correlationId);
+        if (correlatedRequestItem == null) {
+            throw new PlcProtocolException("Unrelated package received " + 
msg);
+        }
+        PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = 
correlatedRequestItem.getPlcRequestContainer();
+        List<CorrelatedResponseItem<?>> correlatedResponseItems = 
responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new 
LinkedList<>());
+        correlatedResponseItems.add(msg);
+        Set<Integer> integers = 
containerCorrelationIdMap.get(plcRequestContainer);
+        integers.remove(correlationId);
+        if (integers.isEmpty()) {
+            PlcResponse<?, ?, ?> plcResponse;
+            if (plcRequestContainer.getRequest() instanceof 
TypeSafePlcReadRequest) {
+                TypeSafePlcReadRequest typeSafePlcReadRequest = 
(TypeSafePlcReadRequest) plcRequestContainer.getRequest();
+                plcResponse = new 
TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) 
plcRequestContainer.getRequest(), 
correlatedResponseItems.stream().map(correlatedResponseItem -> 
correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            } else if (plcRequestContainer.getRequest() instanceof 
TypeSafePlcWriteRequest) {
+                plcResponse = new 
TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) 
plcRequestContainer.getRequest(), 
correlatedResponseItems.stream().map(correlatedResponseItem -> 
correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            } else if (plcRequestContainer.getRequest() instanceof 
PlcReadRequest) {
+                plcResponse = new PlcReadResponse((PlcReadRequest) 
plcRequestContainer.getRequest(), (List) 
correlatedResponseItems.stream().map(correlatedResponseItem -> 
correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            } else if (plcRequestContainer.getRequest() instanceof 
PlcWriteRequest) {
+                plcResponse = new PlcWriteResponse((PlcWriteRequest) 
plcRequestContainer.getRequest(), (List) 
correlatedResponseItems.stream().map(correlatedResponseItem -> 
correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            } else {
+                throw new PlcProtocolException("Unknown type detected " + 
plcRequestContainer.getRequest());
+            }
+            plcRequestContainer.getResponseFuture().complete(plcResponse);
+            responsesToBeDevliered.remove(plcRequestContainer);
+        }
+    }
+
+    
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Encoding
+    
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
+        if (msg instanceof PlcRequestContainer) {
+            PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = 
(PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg;
+            Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, 
plcRequestContainer -> new HashSet<>());
+
+            // Create a promise that has to be called multiple times.
+            PromiseCombiner promiseCombiner = new PromiseCombiner();
+            PlcRequest<?> request = in.getRequest();
+            for (RequestItem<?> item : request.getRequestItems()) {
+                ChannelPromise subPromise = new 
DefaultChannelPromise(promise.channel());
+
+                int tdpu = correlationId.getAndIncrement();
+                queue.add(new CorrelatedRequestItem<>(tdpu, item, in), 
subPromise);
+                if (!tdpus.add(tdpu)) {
+                    throw new IllegalStateException("AtomicInteger should not 
create duplicated ids: " + tdpu);
+                }
+                promiseCombiner.add((Future) subPromise);
+            }
+            promiseCombiner.finish(promise);
+
+            // Start sending the queue content.
+            trySendingMessages(ctx);
+        } else {
+            super.write(ctx, msg, promise);
+        }
+    }
+
+    
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Helpers
+    
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
+        while (queue.size() > 0) {
+            // Get the RequestItem that is up next in the queue.
+            CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) 
queue.current();
+
+            if (currentItem == null) {
+                break;
+            }
+            // Send the TPDU.
+            try {
+                ChannelFuture channelFuture = queue.removeAndWrite();
+                ctx.flush();
+                if (channelFuture == null) {
+                    break;
+                }
+            } catch (Exception e) {
+                LOGGER.error("Error sending more queues messages", e);
+                ctx.fireExceptionCaught(e);
+            }
+
+            // Add it to the list of sentButUnacknowledgedRequestItems.
+            
sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), 
currentItem);
+
+            LOGGER.debug("Item Message with id {} sent", 
currentItem.getCorrelationId());
+        }
+        ctx.flush();
+    }
+}

Reply via email to