[GitHub] [nifi] heritamas commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…
heritamas commented on a change in pull request #4730: URL: https://github.com/apache/nifi/pull/4730#discussion_r553931809 ## File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java ## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.kafka.connect; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +public class StatelessNiFiSourceTask extends SourceTask { +private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class); + +private StatelessDataflow dataflow; +private String outputPortName; +private String topicName; +private String topicNameAttribute; +private TriggerResult triggerResult; +private String keyAttributeName; +private Pattern headerAttributeNamePattern; +private long timeoutMillis; +private String dataflowName; + +private final AtomicLong unacknowledgedRecords = new AtomicLong(0L); + +@Override +public String version() { +return StatelessKafkaConnectorUtil.getVersion(); +} + +@Override +public void start(final Map properties) { +logger.info("Starting Source Task with properties {}", properties); + +final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT); +timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS); + +topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME); +topicNameAttribute = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE); +keyAttributeName = properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE); + +if (topicName == null && topicNameAttribute == null) { +throw new ConfigException("Either the topic.name or topic.name.attribute configuration must be specified"); +} + +final String headerRegex = properties.get(StatelessNiFiSourceConnector.HEADER_REGEX); +headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex); + +dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); + +// Determine the name of the Output Port to retrieve data from +dataflowName = properties.get("name"); +outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME); +if (outputPortName == null) { +final Set outputPorts = dataflow.getOutputPortNames(); +if (outputPorts.isEmpty()) { +throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Output Port at the root level. Dataflows used for a Kafka Connect Source Task " ++ "must have at least one Output Port at the root level."); +} + +if (outputPorts.size() > 1) { +throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Output Ports at the root level (" + outputPorts.toString() ++ "). The " + StatelessNiFiSourceConnector.OUTPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should
[GitHub] [nifi] heritamas commented on a change in pull request #4730: NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.…
heritamas commented on a change in pull request #4730: URL: https://github.com/apache/nifi/pull/4730#discussion_r553931809 ## File path: nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java ## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.kafka.connect; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +public class StatelessNiFiSourceTask extends SourceTask { +private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class); + +private StatelessDataflow dataflow; +private String outputPortName; +private String topicName; +private String topicNameAttribute; +private TriggerResult triggerResult; +private String keyAttributeName; +private Pattern headerAttributeNamePattern; +private long timeoutMillis; +private String dataflowName; + +private final AtomicLong unacknowledgedRecords = new AtomicLong(0L); + +@Override +public String version() { +return StatelessKafkaConnectorUtil.getVersion(); +} + +@Override +public void start(final Map properties) { +logger.info("Starting Source Task with properties {}", properties); + +final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT); +timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS); + +topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME); +topicNameAttribute = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE); +keyAttributeName = properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE); + +if (topicName == null && topicNameAttribute == null) { +throw new ConfigException("Either the topic.name or topic.name.attribute configuration must be specified"); +} + +final String headerRegex = properties.get(StatelessNiFiSourceConnector.HEADER_REGEX); +headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex); + +dataflow = StatelessKafkaConnectorUtil.createDataflow(properties); + +// Determine the name of the Output Port to retrieve data from +dataflowName = properties.get("name"); +outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME); +if (outputPortName == null) { +final Set outputPorts = dataflow.getOutputPortNames(); +if (outputPorts.isEmpty()) { +throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Output Port at the root level. Dataflows used for a Kafka Connect Source Task " ++ "must have at least one Output Port at the root level."); +} + +if (outputPorts.size() > 1) { +throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Output Ports at the root level (" + outputPorts.toString() ++ "). The " + StatelessNiFiSourceConnector.OUTPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should