This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch feature/TopLevelItemSpliting in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit a53cbbf8e022409ef4c3aaa79c93b029e39375bf Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Wed Sep 26 18:01:46 2018 +0200 [General] some progress on the SingleItemToSingleRequestProtocol --- .../base/messages/item/CorrelatedRequestItem.java | 81 -------- .../base/messages/item/CorrelatedResponseItem.java | 70 ------- .../SingleItemToSingleRequestProtocol.java | 217 +++++++++++++++------ 3 files changed, 153 insertions(+), 215 deletions(-) diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java deleted file mode 100644 index 5212618..0000000 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - */ -package org.apache.plc4x.java.base.messages.item; - -import org.apache.plc4x.java.api.messages.PlcRequestContainer; -import org.apache.plc4x.java.api.messages.PlcResponse; -import org.apache.plc4x.java.api.messages.items.RequestItem; - -import java.util.Objects; - -public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> { - - private final int correlationId; - - private final REQUEST_ITEM requestItem; - - private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer; - - public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) { - this.correlationId = correlationId; - this.requestItem = requestItem; - this.plcRequestContainer = plcRequestContainer; - } - - public int getCorrelationId() { - return correlationId; - } - - public REQUEST_ITEM getRequestItem() { - return requestItem; - } - - public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() { - return plcRequestContainer; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof CorrelatedRequestItem)) { - return false; - } - CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o; - return correlationId == that.correlationId && - Objects.equals(requestItem, that.requestItem) && - Objects.equals(plcRequestContainer, that.plcRequestContainer); - } - - @Override - public int hashCode() { - - return Objects.hash(correlationId, requestItem, plcRequestContainer); - } - - @Override - public String toString() { - return "CorrelatedRequestItem{" + - "correlationId=" + correlationId + - ", requestItem=" + requestItem + - ", plcRequestContainer=" + plcRequestContainer + - '}'; - } -} diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java deleted file mode 100644 index 38a9032..0000000 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - */ -package org.apache.plc4x.java.base.messages.item; - -import org.apache.plc4x.java.api.messages.items.ResponseItem; - -import java.util.Objects; - -public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> { - - private final int correlationId; - - private final RESPONSE_ITEM responseItem; - - public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) { - this.correlationId = correlationId; - this.responseItem = responseItem; - } - - public int getCorrelationId() { - return correlationId; - } - - public RESPONSE_ITEM getResponseItem() { - return responseItem; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof CorrelatedResponseItem)) { - return false; - } - CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o; - return correlationId == that.correlationId && - Objects.equals(responseItem, that.responseItem); - } - - @Override - public int hashCode() { - - return Objects.hash(correlationId, responseItem); - } - - @Override - public String toString() { - return "CorrelatedResponseItem{" + - "correlationId=" + correlationId + - ", responseItem=" + responseItem + - '}'; - } -} diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java index 8c0e272..267f9a8 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java @@ -19,52 +19,39 @@ package org.apache.plc4x.java.base.protocol; import io.netty.channel.*; -import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.PromiseCombiner; -import org.apache.plc4x.java.api.exceptions.PlcProtocolException; -import org.apache.plc4x.java.api.messages.*; -import org.apache.plc4x.java.api.messages.items.RequestItem; -import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest; -import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse; -import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest; -import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse; -import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem; -import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; +import org.apache.plc4x.java.api.model.PlcField; +import org.apache.plc4x.java.api.types.PlcResponseCode; +import org.apache.plc4x.java.base.messages.*; +import org.apache.plc4x.java.base.messages.items.FieldItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; +import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; +// TODO: write test public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class); private PendingWriteQueue queue; - private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems; + private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems; private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap; - private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDevliered; + private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDevliered; private AtomicInteger correlationId; - private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() { - - @Override - protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception { - SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out); - } - }; - @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { this.queue = new PendingWriteQueue(ctx); @@ -91,42 +78,49 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { // Decoding //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - decoder.channelRead(ctx, msg); - super.read(ctx); - } - - private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException { - int correlationId = msg.getCorrelationId(); - CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId); - if (correlatedRequestItem == null) { - throw new PlcProtocolException("Unrelated package received " + msg); + private void tryFinish(int correlationId, InternalPlcResponse msg) { + PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId); + if (plcRequestContainer == null) { + throw new PlcRuntimeException("Unrelated package received " + msg); } - PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer(); - List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>()); + List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>()); correlatedResponseItems.add(msg); Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer); integers.remove(correlationId); if (integers.isEmpty()) { - PlcResponse<?, ?, ?> plcResponse; - if (plcRequestContainer.getRequest() instanceof TypeSafePlcReadRequest) { - TypeSafePlcReadRequest typeSafePlcReadRequest = (TypeSafePlcReadRequest) plcRequestContainer.getRequest(); - plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList())); - } else if (plcRequestContainer.getRequest() instanceof TypeSafePlcWriteRequest) { - plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList())); - } else if (plcRequestContainer.getRequest() instanceof PlcReadRequest) { - plcResponse = new PlcReadResponse((PlcReadRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList())); - } else if (plcRequestContainer.getRequest() instanceof PlcWriteRequest) { - plcResponse = new PlcWriteResponse((PlcWriteRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList())); + InternalPlcResponse<?> plcResponse; + if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) { + InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest(); + HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>(); + + correlatedResponseItems.stream() + .map(InternalPlcReadResponse.class::cast) + .map(InternalPlcReadResponse::getValues) + .forEach(stringPairMap -> stringPairMap.forEach(fields::put)); + + plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields); + } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) { + InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest(); + HashMap<String, PlcResponseCode> values = new HashMap<>(); + + correlatedResponseItems.stream() + .map(InternalPlcWriteResponse.class::cast) + .map(InternalPlcWriteResponse::getValues) + .forEach(stringPairMap -> stringPairMap.forEach(values::put)); + + plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values); } else { - throw new PlcProtocolException("Unknown type detected " + plcRequestContainer.getRequest()); + throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest()); } plcRequestContainer.getResponseFuture().complete(plcResponse); responsesToBeDevliered.remove(plcRequestContainer); } } + private void errored(int correlationId, Throwable throwable) { + + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Encoding //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -134,22 +128,68 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof PlcRequestContainer) { - PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg; + @SuppressWarnings("unchecked") + PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg; Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>()); // Create a promise that has to be called multiple times. PromiseCombiner promiseCombiner = new PromiseCombiner(); - PlcRequest<?> request = in.getRequest(); - for (RequestItem<?> item : request.getRequestItems()) { - ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); + InternalPlcRequest request = in.getRequest(); + if (request instanceof InternalPlcFieldRequest) { + InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request; + + if (internalPlcFieldRequest instanceof InternalPlcReadRequest) { + InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest; + // TODO: repackage + internalPlcReadRequest.getNamedFields().forEach(field -> { + ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); + + int tdpu = correlationId.getAndIncrement(); + CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>() + .thenApply(InternalPlcResponse.class::cast) + .whenComplete((internalPlcResponse, throwable) -> { + if (throwable != null) { + errored(tdpu, throwable); + } else { + tryFinish(tdpu, internalPlcResponse); + } + }); + queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise); + if (!tdpus.add(tdpu)) { + throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu); + } + promiseCombiner.add((Future) subPromise); + }); + } + if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) { + InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest; + // TODO: repackage + internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> { + ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); - int tdpu = correlationId.getAndIncrement(); - queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise); - if (!tdpus.add(tdpu)) { - throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu); + int tdpu = correlationId.getAndIncrement(); + CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>() + .thenApply(InternalPlcResponse.class::cast) + .whenComplete((internalPlcResponse, throwable) -> { + if (throwable != null) { + errored(tdpu, throwable); + } else { + tryFinish(tdpu, internalPlcResponse); + } + }); + queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise); + if (!tdpus.add(tdpu)) { + throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu); + } + promiseCombiner.add((Future) subPromise); + }); } + } else { + ChannelPromise subPromise = new DefaultChannelPromise(promise.channel()); + queue.add(msg, subPromise); promiseCombiner.add((Future) subPromise); } + promiseCombiner.finish(promise); // Start sending the queue content. @@ -166,11 +206,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { protected synchronized void trySendingMessages(ChannelHandlerContext ctx) { while (queue.size() > 0) { // Get the RequestItem that is up next in the queue. - CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current(); + PlcRequestContainer currentItem = (PlcRequestContainer) queue.current(); + InternalPlcRequest request = currentItem.getRequest(); - if (currentItem == null) { - break; - } // Send the TPDU. try { ChannelFuture channelFuture = queue.removeAndWrite(); @@ -183,11 +221,62 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler { ctx.fireExceptionCaught(e); } - // Add it to the list of sentButUnacknowledgedRequestItems. - sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem); + if (request instanceof CorrelatedPlcRequest) { + CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request; + + // Add it to the list of sentButUnacknowledgedRequestItems. + sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem); - LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId()); + LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu()); + } } ctx.flush(); } + + interface CorrelatedPlcRequest extends InternalPlcRequest { + + int getTdpu(); + } + + private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest { + + private final int tdpu; + + public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) { + super(fields); + this.tdpu = tdpu; + } + + public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) { + LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>(); + fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue()); + return new CorrelatedPlcReadRequest(fields, tdpu); + } + + @Override + public int getTdpu() { + return tdpu; + } + } + + private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest { + + private final int tdpu; + + public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) { + super(fields); + this.tdpu = tdpu; + } + + public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) { + LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>(); + fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight())); + return new CorrelatedPlcWriteRequest(fields, tdpu); + } + + @Override + public int getTdpu() { + return tdpu; + } + } }