jt2594838 commented on code in PR #16944:
URL: https://github.com/apache/iotdb/pull/16944#discussion_r2647374776
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java:
##########
@@ -20,68 +20,151 @@
package org.apache.iotdb.pipe.it.single;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT1.class})
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
@Test
- public void testOPCUASink() throws Exception {
+ public void testOPCUAServerSink() throws Exception {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values
(1, 1)", null);
- final Map<String, String> connectorAttributes = new HashMap<>();
- connectorAttributes.put("sink", "opc-ua-sink");
- connectorAttributes.put("opcua.model", "client-server");
+ final Map<String, String> sinkAttributes = new HashMap<>();
+
+ sinkAttributes.put("sink", "opc-ua-sink");
+ sinkAttributes.put("opcua.model", "client-server");
+ sinkAttributes.put("security-policy", "None");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(Collections.emptyMap())
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(Collections.singletonMap("user",
"root"))
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
+
+ final OpcUaClient opcUaClient =
+ getOpcUaClient("opc.tcp://127.0.0.1:12686/iotdb",
SecurityPolicy.None, "root", "root");
+ DataValue value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/d1/s1")).get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)),
value.getSourceTime());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq()
+ .setPipeName("testPipe")
+ .setIsReplaceAllConnectorAttributes(false)
+
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
+ .setProcessorAttributes(Collections.emptyMap())
+ .setExtractorAttributes(Collections.emptyMap()))
+ .getCode());
+
+ TestUtils.executeNonQuery(
+ env,
+ "insert into root.db.opc(time, value, quality, other) values (1, 1,
false, 1)",
+ null);
+ value = opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc")).get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)),
value.getSourceTime());
+
+ TestUtils.executeNonQuery(
+ env, "insert into root.db.opc(time, quality) values (2, true)",
null);
+ TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value)
values (2, 2)", null);
+
+ final long startTime = System.currentTimeMillis();
+ while (true) {
+ try {
+ value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc")).get();
+ Assert.assertEquals(new DateTime(timestampToUtc(2)),
value.getSourceTime());
+ Assert.assertEquals(new Variant(2.0), value.getValue());
+ Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
+ break;
+ } catch (final Throwable t) {
+ if (System.currentTimeMillis() - startTime > 10_000L) {
+ throw t;
+ }
+ }
+ }
+
+ opcUaClient.disconnect().get();
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("testPipe").getCode());
// Test reconstruction
- connectorAttributes.put("password123456", "test");
+ sinkAttributes.put("password", "test");
+ sinkAttributes.put("security-policy", "basic256sha256");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
+ new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
+ // Banned none, only allows basic256sha256
+ Assert.assertThrows(
+ PipeException.class,
+ () ->
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:12686/iotdb", SecurityPolicy.None,
"root", "root"));
+
// Test conflict
- connectorAttributes.put("password123456", "conflict");
- Assert.assertEquals(
- TSStatusCode.PIPE_ERROR.getStatusCode(),
- client
- .createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(Collections.emptyMap())
- .setProcessorAttributes(Collections.emptyMap()))
- .getCode());
+ sinkAttributes.put("password", "conflict");
+ try {
+ TestUtils.executeNonQuery(
+ env, "create pipe test1 ('sink'='opc-ua-sink',
'password'='conflict')", null);
+ Assert.fail();
+ } catch (final Exception e) {
+ Assert.assertEquals(
+ "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing
server with tcp port 12686 and https port 8443's password test conflicts to the
new password conflict, reject reusing.",
+ e.getMessage());
Review Comment:
Do not give specific password in any return message or log.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]