asfgit closed pull request #17: Add support for multiple queries in kafka source connector
URL: https://github.com/apache/incubator-plc4x/pull/17
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 45ae926eb..189920886 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -35,7 +36,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     static final String QUERY_CONFIG = "query";
     private static final String QUERY_DOC = "Field query to be sent to the 
PLC";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
URL_DOC)
         .define(QUERY_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, QUERY_DOC);
 
@@ -59,8 +60,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     @Override
     public void start(Map<String, String> props) {
-        url = props.get(URL_CONFIG);
-        query = props.get(QUERY_CONFIG);
+        AbstractConfig config = new 
AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(URL_CONFIG);
+        query = config.getString(QUERY_CONFIG);
     }
 
     @Override
diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index b29418f18..a54d5b08b 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -33,8 +34,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.util.concurrent.ExecutionException;
 
 public class Plc4xSinkTask extends SinkTask {
-    private final static String FIELD_KEY = "key"; // TODO: is this really 
necessary?
-
     private String url;
     private String query;
 
@@ -48,8 +47,9 @@ public String version() {
 
     @Override
     public void start(Map<String, String> props) {
-        url = props.get(Plc4xSinkConnector.URL_CONFIG);
-        query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
+        AbstractConfig config = new 
AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(Plc4xSinkConnector.URL_CONFIG);
+        query = config.getString(Plc4xSinkConnector.QUERY_CONFIG);
 
         openConnection();
 
@@ -66,7 +66,7 @@ public void stop() {
     public void put(Collection<SinkRecord> records) {
         for (SinkRecord record: records) {
             String value = record.value().toString(); // TODO: implement other 
data types
-            PlcWriteRequest plcRequest = 
plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+            PlcWriteRequest plcRequest = 
plcWriter.writeRequestBuilder().addItem(query, query, value).build();
             doWrite(plcRequest);
         }
     }
diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d1d9d026..4d014a535 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,13 +18,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -35,22 +37,22 @@ Licensed to the Apache Software Foundation (ASF) under one
     static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to 
connect to the PLC";
 
-    static final String QUERY_CONFIG = "query";
-    private static final String QUERY_DOC = "Field query to be sent to the 
PLC";
+    static final String QUERIES_CONFIG = "queries";
+    private static final String QUERIES_DOC = "Field queries to be sent to the 
PLC";
 
     static final String RATE_CONFIG = "rate";
     private static final Integer RATE_DEFAULT = 1000;
     private static final String RATE_DOC = "Polling rate";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(TOPIC_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
URL_DOC)
-        .define(QUERY_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, QUERY_DOC)
+        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, 
ConfigDef.Importance.HIGH, QUERIES_DOC)
         .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, 
ConfigDef.Importance.MEDIUM, RATE_DOC);
 
     private String topic;
     private String url;
-    private String query;
+    private List<String> queries;
     private Integer rate;
 
     @Override
@@ -60,22 +62,26 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        Map<String, String> taskConfig = new HashMap<>();
-        taskConfig.put(TOPIC_CONFIG, topic);
-        taskConfig.put(URL_CONFIG, url);
-        taskConfig.put(QUERY_CONFIG, query);
-        taskConfig.put(RATE_CONFIG, rate.toString());
-
-        // Only one task will be created; ignoring maxTasks for now
-        return Collections.singletonList(taskConfig);
+        List<Map<String, String>> configs = new LinkedList<>();
+        List<List<String>> queryGroups = 
ConnectorUtils.groupPartitions(queries, maxTasks);
+        for (List<String> queryGroup: queryGroups) {
+            Map<String, String> taskConfig = new HashMap<>();
+            taskConfig.put(TOPIC_CONFIG, topic);
+            taskConfig.put(URL_CONFIG, url);
+            taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
+            taskConfig.put(RATE_CONFIG, rate.toString());
+            configs.add(taskConfig);
+        }
+        return configs;
     }
 
     @Override
     public void start(Map<String, String> props) {
-        topic = props.get(TOPIC_CONFIG);
-        url = props.get(URL_CONFIG);
-        query = props.get(QUERY_CONFIG);
-        rate = Integer.valueOf(props.get(RATE_CONFIG));
+        AbstractConfig config = new 
AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        topic = config.getString(TOPIC_CONFIG);
+        url = config.getString(URL_CONFIG);
+        queries = config.getList(QUERIES_CONFIG);
+        rate = config.getInt(RATE_CONFIG);
     }
 
     @Override
diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 798ae3113..c354a1ea2 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -32,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.kafka.util.VersionUtil;
 
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
@@ -45,11 +47,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 public class Plc4xSourceTask extends SourceTask {
     private final static long WAIT_LIMIT_MILLIS = 100;
     private final static long TIMEOUT_LIMIT_MILLIS = 5000;
-    private final static String FIELD_KEY = "key"; // TODO: is this really 
necessary?
 
     private String topic;
     private String url;
-    private String query;
+    private List<String> queries;
 
     private PlcConnection plcConnection;
     private PlcReader plcReader;
@@ -67,16 +68,22 @@ public String version() {
 
     @Override
     public void start(Map<String, String> props) {
-        topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
-        url = props.get(Plc4xSourceConnector.URL_CONFIG);
-        query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+        AbstractConfig config = new 
AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+        topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
+        url = config.getString(Plc4xSourceConnector.URL_CONFIG);
+        queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
 
         openConnection();
 
         plcReader = plcConnection.getReader()
             .orElseThrow(() -> new ConnectException("PlcReader not available 
for this type of connection"));
 
-        plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, 
query).build();
+
+        PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
+        for (String query : queries) {
+            builder.addItem(query, query);
+        }
+        plcRequest = builder.build();
 
         int rate = 
Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
         scheduler = Executors.newScheduledThreadPool(1);
@@ -152,30 +159,35 @@ private synchronized boolean awaitFetch(long 
milliseconds) throws InterruptedExc
     }
 
     private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
-        final PlcResponseCode rc = response.getResponseCode(FIELD_KEY);
-
-        if (!rc.equals(PlcResponseCode.OK))
-            return null; // TODO: should we really ignore this?
-
-        Object rawValue = response.getObject(FIELD_KEY);
-        Schema valueSchema = getSchema(rawValue.getClass());
-        Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? 
rawValue.toString() : rawValue;
-        Long timestamp = System.currentTimeMillis();
-        Map<String, String> sourcePartition = Collections.singletonMap("url", 
url);
-        Map<String, Long> sourceOffset = Collections.singletonMap("offset", 
timestamp);
-
-        SourceRecord record =
-            new SourceRecord(
-                sourcePartition,
-                sourceOffset,
-                topic,
-                Schema.STRING_SCHEMA,
-                query,
-                valueSchema,
-                value
-            );
-
-        return Collections.singletonList(record); // TODO: what if there are 
multiple values?
+        final List<SourceRecord> result = new LinkedList<>();
+        for (String query : queries) {
+            final PlcResponseCode rc = response.getResponseCode(query);
+            if (!rc.equals(PlcResponseCode.OK))  {
+                continue;
+            }
+
+            Object rawValue = response.getObject(query);
+            Schema valueSchema = getSchema(rawValue.getClass());
+            Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? 
rawValue.toString() : rawValue;
+            Long timestamp = System.currentTimeMillis();
+            Map<String, String> sourcePartition = 
Collections.singletonMap("url", url);
+            Map<String, Long> sourceOffset = 
Collections.singletonMap("offset", timestamp);
+
+            SourceRecord record =
+                new SourceRecord(
+                    sourcePartition,
+                    sourceOffset,
+                    topic,
+                    Schema.STRING_SCHEMA,
+                    query,
+                    valueSchema,
+                    value
+                );
+
+            result.add(record);
+        }
+
+        return result;
     }
 
     private Schema getSchema(Class<?> type) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to