[GitHub] [nifi] heritamas commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-09 Thread GitBox


heritamas commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r553931809



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+private StatelessDataflow dataflow; // created in start() via StatelessKafkaConnectorUtil.createDataflow(properties)
+private String outputPortName; // read in start() from the OUTPUT_PORT_NAME property; null there triggers inspection of the dataflow's output ports
+private String topicName; // read in start() from the TOPIC_NAME property; may be null if TOPIC_NAME_ATTRIBUTE is set instead
+private String topicNameAttribute; // read in start() from the TOPIC_NAME_ATTRIBUTE property; start() rejects config where this and topicName are both null
+private TriggerResult triggerResult; // not assigned in the visible portion of this class
+private String keyAttributeName; // read in start() from the KEY_ATTRIBUTE property
+private Pattern headerAttributeNamePattern; // compiled in start() from the HEADER_REGEX property; null when that property is absent
+private long timeoutMillis; // dataflow timeout from DATAFLOW_TIMEOUT (or its default), converted to milliseconds in start()
+private String dataflowName; // value of the "name" property; used in start()'s configuration error messages
+
+private final AtomicLong unacknowledgedRecords = new AtomicLong(0L); // starts at zero; updates happen outside the visible portion of this class
+
+@Override
+public String version() { // returns the version string supplied by StatelessKafkaConnectorUtil
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map properties) {
+logger.info("Starting Source Task with properties {}", properties);
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
+topicNameAttribute = 
properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
+keyAttributeName = 
properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
+
+if (topicName == null && topicNameAttribute == null) {
+throw new ConfigException("Either the topic.name or 
topic.name.attribute configuration must be specified");
+}
+
+final String headerRegex = 
properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
+headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+
+// Determine the name of the Output Port to retrieve data from
+dataflowName = properties.get("name");
+outputPortName = 
properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
+if (outputPortName == null) {
+final Set outputPorts = dataflow.getOutputPortNames();
+if (outputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have an Output Port at the root level. Dataflows 
used for a Kafka Connect Source Task "
++ "must have at least one Output Port at the root level.");
+}
+
+if (outputPorts.size() > 1) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> has multiple Output Ports at the root level (" + 
outputPorts.toString()
++ "). The " + 
StatelessNiFiSourceConnector.OUTPUT_PORT_NAME + " property must be set to 
indicate which of these Ports Kafka records should 

[GitHub] [nifi] heritamas commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…

2021-01-08 Thread GitBox


heritamas commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r553931809



##
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+private StatelessDataflow dataflow; // created in start() via StatelessKafkaConnectorUtil.createDataflow(properties)
+private String outputPortName; // read in start() from the OUTPUT_PORT_NAME property; null there triggers inspection of the dataflow's output ports
+private String topicName; // read in start() from the TOPIC_NAME property; may be null if TOPIC_NAME_ATTRIBUTE is set instead
+private String topicNameAttribute; // read in start() from the TOPIC_NAME_ATTRIBUTE property; start() rejects config where this and topicName are both null
+private TriggerResult triggerResult; // not assigned in the visible portion of this class
+private String keyAttributeName; // read in start() from the KEY_ATTRIBUTE property
+private Pattern headerAttributeNamePattern; // compiled in start() from the HEADER_REGEX property; null when that property is absent
+private long timeoutMillis; // dataflow timeout from DATAFLOW_TIMEOUT (or its default), converted to milliseconds in start()
+private String dataflowName; // value of the "name" property; used in start()'s configuration error messages
+
+private final AtomicLong unacknowledgedRecords = new AtomicLong(0L); // starts at zero; updates happen outside the visible portion of this class
+
+@Override
+public String version() { // returns the version string supplied by StatelessKafkaConnectorUtil
+return StatelessKafkaConnectorUtil.getVersion();
+}
+
+@Override
+public void start(final Map properties) {
+logger.info("Starting Source Task with properties {}", properties);
+
+final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
+topicNameAttribute = 
properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
+keyAttributeName = 
properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
+
+if (topicName == null && topicNameAttribute == null) {
+throw new ConfigException("Either the topic.name or 
topic.name.attribute configuration must be specified");
+}
+
+final String headerRegex = 
properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
+headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
+
+dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+
+// Determine the name of the Output Port to retrieve data from
+dataflowName = properties.get("name");
+outputPortName = 
properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
+if (outputPortName == null) {
+final Set outputPorts = dataflow.getOutputPortNames();
+if (outputPorts.isEmpty()) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have an Output Port at the root level. Dataflows 
used for a Kafka Connect Source Task "
++ "must have at least one Output Port at the root level.");
+}
+
+if (outputPorts.size() > 1) {
+throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> has multiple Output Ports at the root level (" + 
outputPorts.toString()
++ "). The " + 
StatelessNiFiSourceConnector.OUTPUT_PORT_NAME + " property must be set to 
indicate which of these Ports Kafka records should