Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-16 Thread via GitHub


exceptionfactory merged PR #10964:
URL: https://github.com/apache/nifi/pull/10964


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-16 Thread via GitHub


markap14 commented on PR #10964:
URL: https://github.com/apache/nifi/pull/10964#issuecomment-4069723822

   Great catch, @exceptionfactory. It looks like it was coming in through this path:
   ```
   nifi-aws-kinesis-nar
   └─ nifi-aws-kinesis (compile)
      └─ amazon-kinesis-client 3.4.1 (compile)
         └─ kinesis SDK 2.42.8 (compile, version managed from 2.41.21)
            └─ apache-client 2.42.8 (runtime)
   ```
   
   Should be good now.
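
   For readers who want to reproduce this kind of trace, Maven's `dependency:tree` goal reports such paths. The idea behind it can be sketched as a depth-first search over a dependency graph. The graph below is hand-written from the tree quoted above (artifact names only); the class and method names are hypothetical and nothing here reads a real POM.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

final class DependencyPathFinder {

    // Hand-written graph mirroring the tree quoted above; artifact names only, no versions or scopes.
    static final Map<String, List<String>> GRAPH = Map.of(
            "nifi-aws-kinesis-nar", List.of("nifi-aws-kinesis"),
            "nifi-aws-kinesis", List.of("amazon-kinesis-client"),
            "amazon-kinesis-client", List.of("kinesis"),
            "kinesis", List.of("apache-client"));

    /** Returns the first root-to-target path found by depth-first search, or an empty list if none exists. */
    static List<String> findPath(final Map<String, List<String>> graph, final String root, final String target) {
        final Deque<String> path = new ArrayDeque<>();
        return search(graph, root, target, path) ? new ArrayList<>(path) : List.of();
    }

    private static boolean search(final Map<String, List<String>> graph, final String node,
                                  final String target, final Deque<String> path) {
        path.addLast(node);
        if (node.equals(target)) {
            return true;
        }
        for (final String child : graph.getOrDefault(node, List.of())) {
            // Skip nodes already on the current path so a cyclic graph cannot recurse forever.
            if (!path.contains(child) && search(graph, child, target, path)) {
                return true;
            }
        }
        path.removeLast();
        return false;
    }

    public static void main(final String[] args) {
        System.out.println(String.join(" -> ", findPath(GRAPH, "nifi-aws-kinesis-nar", "apache-client")));
    }
}
```

   An empty result for a given artifact means the sketch found no path to it in the supplied graph, which is the outcome one would hope for after removing the offending dependency.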





Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-13 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2933387807


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis-nar/pom.xml:
##
@@ -98,7 +98,7 @@
 
 
 <groupId>software.amazon.awssdk</groupId>
-<artifactId>apache-client</artifactId>
+<artifactId>apache5-client</artifactId>

Review Comment:
   D'oh! I added it to the API NAR, but it looks like I didn't include that in
the commit 🤦 Will have that up shortly.






Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-13 Thread via GitHub


awelless commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2931090837


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis-nar/pom.xml:
##
@@ -98,7 +98,7 @@
 
 
 <groupId>software.amazon.awssdk</groupId>
-<artifactId>apache-client</artifactId>
+<artifactId>apache5-client</artifactId>

Review Comment:
   Currently `nifi-aws-service-api-nar` doesn't bring in `apache5-client` as a
dependency. We should either add it to that NAR or remove it from this list.
   
   Before adding it to `nifi-aws-service-api`, I was getting a
`ClassNotFoundException` for Apache HTTP Client 5.






Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-11 Thread via GitHub


markap14 commented on PR #10964:
URL: https://github.com/apache/nifi/pull/10964#issuecomment-4041599923

   Thanks, @exceptionfactory. I think all of your feedback makes sense. I pushed
a new commit that incorporates all of it and switches to Apache HTTP Client 5
instead of version 4.





Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-11 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2920461755


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/pom.xml:
##
@@ -50,11 +46,50 @@
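 <groupId>org.apache.nifi</groupId>
 <artifactId>nifi-record-serialization-service-api</artifactId>
 
+
+<groupId>org.apache.nifi</groupId>
+<artifactId>nifi-proxy-configuration-api</artifactId>
+
 
+
+<groupId>software.amazon.awssdk</groupId>
+<artifactId>apache-client</artifactId>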

Review Comment:
   It looks like the `url-connection-client` does not support proxies directly.
It also does not support connection pooling with a maximum size, which we depend
on here. Fortunately, we can upgrade to Apache HTTP Client 5 instead, which I
think makes a lot of sense.
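
   To make the "pooling with a max" requirement concrete, here is a minimal JDK-only sketch of what a pool maximum means for callers: once every slot is leased, further lease attempts fail until a slot is released. This is not the AWS SDK or Apache HTTP Client API; `BoundedConnectionPool` and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.Semaphore;

final class BoundedConnectionPool {

    // One permit per connection slot; the permit count is the pool maximum.
    private final Semaphore permits;

    BoundedConnectionPool(final int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** Tries to lease a slot without waiting; returns false when the pool is exhausted. */
    boolean tryLease() {
        return permits.tryAcquire();
    }

    /** Returns a previously leased slot to the pool. */
    void release() {
        permits.release();
    }

    /** Number of slots that can still be leased right now. */
    int available() {
        return permits.availablePermits();
    }

    public static void main(final String[] args) {
        final BoundedConnectionPool pool = new BoundedConnectionPool(2);
        System.out.println(pool.tryLease());
        System.out.println(pool.tryLease());
        System.out.println(pool.tryLease());
    }
}
```

   A client without such a bound opens connections as needed, so code that relies on the cap to limit concurrency would lose that guarantee.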






Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-10 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2913816681


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##
@@ -347,545 +335,1198 @@ Specifies the string (interpreted as UTF-8) to use for demarcating multiple Kine
     private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS);
     private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-    private volatile DynamoDbAsyncClient dynamoDbClient;
-    private volatile CloudWatchAsyncClient cloudWatchClient;
-    private volatile KinesisAsyncClient kinesisClient;
-    private volatile Scheduler kinesisScheduler;
-
+    private volatile SdkHttpClient kinesisHttpClient;
+    private volatile SdkHttpClient dynamoHttpClient;
+    private volatile KinesisClient kinesisClient;
+    private volatile DynamoDbClient dynamoDbClient;
+    private volatile SdkAsyncHttpClient asyncHttpClient;
+    private volatile KinesisShardManager shardManager;
+    private volatile KinesisConsumerClient consumerClient;
     private volatile String streamName;
-    private volatile RecordBuffer.ForProcessor recordBuffer;
-
-    private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-    private volatile @Nullable byte[] demarcatorValue;
+    private volatile int maxRecordsPerRequest;
+    private volatile String initialStreamPosition;
+    private volatile long maxBatchNanos;
+    private volatile long maxBatchBytes;
 
-    private volatile Future initializationResultFuture;
-    private final AtomicBoolean initialized = new AtomicBoolean();
-
-    // An instance filed, so that it can be read in getRelationships.
-    private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
-            PROCESSING_STRATEGY.getDefaultValue());
+    private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
+    private volatile String efoConsumerArn;
+    private final AtomicLong shardRoundRobinCounter = new AtomicLong();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTY_DESCRIPTORS;
     }
 
-    @Override
-    public void migrateProperties(final PropertyConfiguration config) {
-        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-    }
-
     @Override
     public Set<Relationship> getRelationships() {
         return switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+            case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
             case RECORD -> RECORD_FILE_RELATIONSHIPS;
         };
     }
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         if (descriptor.equals(PROCESSING_STRATEGY)) {
-            processingStrategy = ProcessingStrategy.from(newValue);
+            processingStrategy = ProcessingStrategy.valueOf(newValue);
         }
     }
 
-    @OnScheduled
-    public void setup(final ProcessContext context) {
-        readerRecordProcessor = switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> null;
-            case RECORD -> createReaderRecordProcessor(context);
-        };
-        demarcatorValue = switch (processingStrategy) {
-            case FLOW_FILE, RECORD -> null;
-            case DEMARCATOR -> {
-                final String demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
-                yield demarcatorValue != null ? demarcatorValue.getBytes(UTF_8) : new byte[0];
-            }
-        };
+    @Override
+    public void migrateProperties(final PropertyConfiguration config) {
+        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
+        config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+        config.removeProperty("Checkpoint Interval");
+        config.removeProperty("Metrics Publishing");
+    }
 
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
         final Region region = RegionUtil.getRegion(context);
         final AwsCredentialsProvider credentialsProvider = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
                 .asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+        final String endpointOverride = context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-        kinesisClient = KinesisAsyncClient.builder()
-                .region(region)
-                .credentialsProvider(credentialsProvider)
-                .endpointOverride(getKinesisEndpointOverride())
-                .httpClient(createKinesisHttpClient(context))
+        final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
+                .apiCallTimeout(API
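
One consequence of the switch from `ProcessingStrategy.from(...)` to `ProcessingStrategy.valueOf(...)` in the hunk above: `Enum.valueOf` accepts only the exact constant name and throws `IllegalArgumentException` for anything else. What the removed `from` method accepted is not shown in the diff. The sketch below uses the constant names from the diff; the class name and the string relationship labels are placeholders, not NiFi types.

```java
import java.util.Set;

final class ProcessingStrategyDemo {

    // Constant names are taken from the diff; everything else in this class is illustrative.
    enum ProcessingStrategy { FLOW_FILE, LINE_DELIMITED, DEMARCATOR, RECORD }

    // Placeholder labels standing in for the Relationship constants named in the diff.
    static final Set<String> RAW_FILE_RELATIONSHIPS = Set.of("REL_SUCCESS");
    static final Set<String> RECORD_FILE_RELATIONSHIPS = Set.of("REL_SUCCESS", "REL_PARSE_FAILURE");

    /** Resolves the configured value by exact constant name, then maps it to a relationship set. */
    static Set<String> relationshipsFor(final String configuredValue) {
        // valueOf throws IllegalArgumentException when the value is not an exact constant name.
        final ProcessingStrategy strategy = ProcessingStrategy.valueOf(configuredValue);
        return switch (strategy) {
            case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
            case RECORD -> RECORD_FILE_RELATIONSHIPS;
        };
    }

    public static void main(final String[] args) {
        System.out.println(relationshipsFor("LINE_DELIMITED"));
    }
}
```

Because the switch expression covers every constant, the compiler rejects it if a new strategy is added without a matching case.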

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-10 Thread via GitHub


awelless commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2910557024


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on PR #10964:
URL: https://github.com/apache/nifi/pull/10964#issuecomment-4028079198

   Thanks for the thorough review, @awelless! I refactored this PR several times
before pushing it, and it looks like I did a pretty poor job of cleaning up a
couple of the approaches that I'd taken. It should be in much better shape now.
You also caught a few interesting points that I'd not considered. I pushed a new
commit that I think addresses everything and adds some additional tests. I also
pushed 30,085,000 records to a Kinesis stream and then consumed all of them in
both EFO and Shared Throughput mode, to ensure that all data was consumed in
exactly the correct order without any duplicates and that performance was as
expected. All looks good!
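
   How the check above was actually run is not shown in this thread. As a rough sketch of one way to verify it: Kinesis orders records within a shard by sequence number, so requiring strictly increasing sequence numbers per shard catches both reordering and duplicates. The `Consumed` record and the class name are hypothetical, and sequence numbers are assumed to be decimal strings.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ConsumptionOrderCheck {

    /** Hypothetical view of a consumed record: only shard id and sequence number matter here. */
    record Consumed(String shardId, String sequenceNumber) { }

    /** Returns true when sequence numbers are strictly increasing within every shard. */
    static boolean isStrictlyOrderedPerShard(final List<Consumed> consumed) {
        final Map<String, BigInteger> lastSeen = new HashMap<>();
        for (final Consumed record : consumed) {
            final BigInteger current = new BigInteger(record.sequenceNumber());
            final BigInteger previous = lastSeen.put(record.shardId(), current);
            // An equal value is a duplicate; a smaller value is out of order.
            if (previous != null && current.compareTo(previous) <= 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final List<Consumed> sample = List.of(
                new Consumed("shard-0", "100"),
                new Consumed("shard-1", "50"),
                new Consumed("shard-0", "101"));
        System.out.println(isStrictlyOrderedPerShard(sample));
    }
}
```

   Records from different shards may interleave freely, which is why the check keeps one high-water mark per shard rather than a single global one.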





Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907933877


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/EfoKinesisClient.java:
##
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
+import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
+import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
+import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * Enhanced Fan-Out Kinesis consumer that uses SubscribeToShard with dedicated throughput
+ * per shard via HTTP/2. Uses Reactive Streams demand-driven backpressure to control the
+ * rate of event delivery.
+ */
+final class EfoKinesisClient extends KinesisConsumerClient {
+
+    private static final long SUBSCRIBE_BACKOFF_NANOS = TimeUnit.SECONDS.toNanos(5);
+    private static final long CONSUMER_REGISTRATION_POLL_MILLIS = 1_000;
+    private static final int CONSUMER_REGISTRATION_MAX_ATTEMPTS = 60;
+    private static final int MAX_SUBSCRIPTIONS_PER_TRIGGER = 10;
+    private static final int MAX_QUEUED_RESULTS = 200;
+
+    private final Map shardConsumers = new ConcurrentHashMap<>();
+    private volatile KinesisAsyncClient kinesisAsyncClient;
+    private volatile String consumerArn;
+    private volatile Instant timestampForInitialPosition;
+
+    EfoKinesisClient(final KinesisClient kinesisClient, final ComponentLog logger) {
+        super(kinesisClient, logger);
+    }
+
+    void setTimestampForInitialPosition(final Instant timestamp) {
+        this.timestampForInitialPosition = timestamp;
+    }
+
+    @Override
+    void initialize(final KinesisAsyncClient asyncClient, final String streamName, final String consumerName) {
+        this.kinesisAsyncClient = asyncClient;
+        registerEfoConsumer(streamName, consumerN
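
The class Javadoc above mentions Reactive Streams demand-driven backpressure. A minimal sketch of that idea follows, using the JDK's `java.util.concurrent.Flow` interfaces (which mirror `org.reactivestreams`) instead of the AWS SDK types, and a deliberately simplified, single-threaded publisher that is not a fully compliant Reactive Streams implementation. All class names are hypothetical and this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDrivenDemo {

    /** Simplified synchronous publisher: emits items only in response to request(n). */
    static final class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(final List<String> items) {
            this.items = items;
        }

        @Override
        public void subscribe(final Flow.Subscriber<? super String> subscriber) {
            final Iterator<String> iterator = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(final long n) {
                    // Deliver at most n items, then complete once the source is exhausted.
                    for (long i = 0; i < n && !done && iterator.hasNext(); i++) {
                        subscriber.onNext(iterator.next());
                    }
                    if (!done && !iterator.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscriber that signals demand for one item at a time, when the caller is ready. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(final Flow.Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public void onNext(final String item) {
            received.add(item);
        }

        @Override
        public void onError(final Throwable throwable) {
            throw new IllegalStateException(throwable);
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        /** Asks the publisher for exactly one more item. */
        void requestNext() {
            subscription.request(1);
        }
    }

    public static void main(final String[] args) {
        final OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        new ListPublisher(List.of("a", "b", "c")).subscribe(subscriber);
        subscriber.requestNext();
        System.out.println(subscriber.received);
    }
}
```

Nothing is delivered until the subscriber requests it, so a consumer that is still busy with earlier events can simply hold off on the next request.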

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907880293


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/EfoKinesisClient.java:
##

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907783241


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/EfoKinesisClient.java:
##

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907173203


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
 private static final Set RAW_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS);
 private static final Set RECORD_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-private volatile DynamoDbAsyncClient dynamoDbClient;
-private volatile CloudWatchAsyncClient cloudWatchClient;
-private volatile KinesisAsyncClient kinesisClient;
-private volatile Scheduler kinesisScheduler;
-
+private volatile SdkHttpClient kinesisHttpClient;
+private volatile SdkHttpClient dynamoHttpClient;
+private volatile KinesisClient kinesisClient;
+private volatile DynamoDbClient dynamoDbClient;
+private volatile SdkAsyncHttpClient asyncHttpClient;
+private volatile KinesisShardManager shardManager;
+private volatile KinesisConsumerClient consumerClient;
 private volatile String streamName;
-private volatile RecordBuffer.ForProcessor recordBuffer;
-
-private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-private volatile @Nullable byte[] demarcatorValue;
+private volatile int maxRecordsPerRequest;
+private volatile String initialStreamPosition;
+private volatile long maxBatchNanos;
+private volatile long maxBatchBytes;
 
-private volatile Future initializationResultFuture;
-private final AtomicBoolean initialized = new AtomicBoolean();
-
-// An instance filed, so that it can be read in getRelationships.
-private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
-PROCESSING_STRATEGY.getDefaultValue());
+private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
 
 @Override
 protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 return PROPERTY_DESCRIPTORS;
 }
 
-@Override
-public void migrateProperties(final PropertyConfiguration config) {
-ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-}
-
 @Override
 public Set<Relationship> getRelationships() {
 return switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
 case RECORD -> RECORD_FILE_RELATIONSHIPS;
 };
 }
 
 @Override
 public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
 if (descriptor.equals(PROCESSING_STRATEGY)) {
-processingStrategy = ProcessingStrategy.from(newValue);
+processingStrategy = ProcessingStrategy.valueOf(newValue);
 }
 }
 
-@OnScheduled
-public void setup(final ProcessContext context) {
-readerRecordProcessor = switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> null;
-case RECORD -> createReaderRecordProcessor(context);
-};
-demarcatorValue = switch (processingStrategy) {
-case FLOW_FILE, RECORD -> null;
-case DEMARCATOR -> {
-final String demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
-yield demarcatorValue != null ? demarcatorValue.getBytes(UTF_8) : new byte[0];
-}
-};
+@Override
+public void migrateProperties(final PropertyConfiguration config) {
+config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+config.removeProperty("Checkpoint Interval");
+config.removeProperty("Metrics Publishing");
+}
+
+@Override
+public void migrateRelationships(final RelationshipConfiguration config) {
+config.renameRelationship("parse failure", "parse.failure");
+}
 
+@OnScheduled
+public void onScheduled(final ProcessContext context) {
 final Region region = RegionUtil.getRegion(context);
 final AwsCredentialsProvider credentialsProvider = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
 .asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+final String endpointOverride = context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-kinesisClient = KinesisAsyncClient.builder()
-.region(region)
-.credentialsProvider(credentialsProvider)
-.endpointOverride(getKinesisEndpointOverride())
-.httpClient(createKinesisHttpClient(context))
+final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
+.apiCallTimeout(API_CALL_TIMEOUT)
+

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907057270


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
 private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS);
 private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-private volatile DynamoDbAsyncClient dynamoDbClient;
-private volatile CloudWatchAsyncClient cloudWatchClient;
-private volatile KinesisAsyncClient kinesisClient;
-private volatile Scheduler kinesisScheduler;
-
+private volatile SdkHttpClient kinesisHttpClient;
+private volatile SdkHttpClient dynamoHttpClient;
+private volatile KinesisClient kinesisClient;
+private volatile DynamoDbClient dynamoDbClient;
+private volatile SdkAsyncHttpClient asyncHttpClient;
+private volatile KinesisShardManager shardManager;
+private volatile KinesisConsumerClient consumerClient;
 private volatile String streamName;
-private volatile RecordBuffer.ForProcessor recordBuffer;
-
-private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-private volatile @Nullable byte[] demarcatorValue;
+private volatile int maxRecordsPerRequest;
+private volatile String initialStreamPosition;
+private volatile long maxBatchNanos;
+private volatile long maxBatchBytes;
 
-private volatile Future initializationResultFuture;
-private final AtomicBoolean initialized = new AtomicBoolean();
-
-// An instance filed, so that it can be read in getRelationships.
-private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
-PROCESSING_STRATEGY.getDefaultValue());
+private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
 
 @Override
 protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 return PROPERTY_DESCRIPTORS;
 }
 
-@Override
-public void migrateProperties(final PropertyConfiguration config) {
-ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-}
-
 @Override
 public Set<Relationship> getRelationships() {
 return switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
 case RECORD -> RECORD_FILE_RELATIONSHIPS;
 };
 }
 
 @Override
 public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
 if (descriptor.equals(PROCESSING_STRATEGY)) {
-processingStrategy = ProcessingStrategy.from(newValue);
+processingStrategy = ProcessingStrategy.valueOf(newValue);
 }
 }
 
-@OnScheduled
-public void setup(final ProcessContext context) {
-readerRecordProcessor = switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> null;
-case RECORD -> createReaderRecordProcessor(context);
-};
-demarcatorValue = switch (processingStrategy) {
-case FLOW_FILE, RECORD -> null;
-case DEMARCATOR -> {
-final String demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
-yield demarcatorValue != null ? demarcatorValue.getBytes(UTF_8) : new byte[0];
-}
-};
+@Override
+public void migrateProperties(final PropertyConfiguration config) {
+config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+config.removeProperty("Checkpoint Interval");
+config.removeProperty("Metrics Publishing");
+}
+
+@Override
+public void migrateRelationships(final RelationshipConfiguration config) {
+config.renameRelationship("parse failure", "parse.failure");
+}
 
+@OnScheduled
+public void onScheduled(final ProcessContext context) {
 final Region region = RegionUtil.getRegion(context);
 final AwsCredentialsProvider credentialsProvider = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
 .asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+final String endpointOverride = context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-kinesisClient = KinesisAsyncClient.builder()
-.region(region)
-.credentialsProvider(credentialsProvider)
-.endpointOverride(getKinesisEndpointOverride())
-.httpClient(createKinesisHttpClient(context))
+final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
+.apiCallTimeout(API_CALL_TIMEOUT)
+

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907039969


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
 private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS);
 private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-private volatile DynamoDbAsyncClient dynamoDbClient;
-private volatile CloudWatchAsyncClient cloudWatchClient;
-private volatile KinesisAsyncClient kinesisClient;
-private volatile Scheduler kinesisScheduler;
-
+private volatile SdkHttpClient kinesisHttpClient;
+private volatile SdkHttpClient dynamoHttpClient;
+private volatile KinesisClient kinesisClient;
+private volatile DynamoDbClient dynamoDbClient;
+private volatile SdkAsyncHttpClient asyncHttpClient;
+private volatile KinesisShardManager shardManager;
+private volatile KinesisConsumerClient consumerClient;
 private volatile String streamName;
-private volatile RecordBuffer.ForProcessor recordBuffer;
-
-private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-private volatile @Nullable byte[] demarcatorValue;
+private volatile int maxRecordsPerRequest;
+private volatile String initialStreamPosition;
+private volatile long maxBatchNanos;
+private volatile long maxBatchBytes;
 
-private volatile Future initializationResultFuture;
-private final AtomicBoolean initialized = new AtomicBoolean();
-
-// An instance filed, so that it can be read in getRelationships.
-private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
-PROCESSING_STRATEGY.getDefaultValue());
+private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
 
 @Override
 protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 return PROPERTY_DESCRIPTORS;
 }
 
-@Override
-public void migrateProperties(final PropertyConfiguration config) {
-ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-}
-
 @Override
 public Set<Relationship> getRelationships() {
 return switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
 case RECORD -> RECORD_FILE_RELATIONSHIPS;
 };
 }
 
 @Override
 public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
 if (descriptor.equals(PROCESSING_STRATEGY)) {
-processingStrategy = ProcessingStrategy.from(newValue);
+processingStrategy = ProcessingStrategy.valueOf(newValue);
 }
 }
 
-@OnScheduled
-public void setup(final ProcessContext context) {
-readerRecordProcessor = switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> null;
-case RECORD -> createReaderRecordProcessor(context);
-};
-demarcatorValue = switch (processingStrategy) {
-case FLOW_FILE, RECORD -> null;
-case DEMARCATOR -> {
-final String demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
-yield demarcatorValue != null ? demarcatorValue.getBytes(UTF_8) : new byte[0];
-}
-};
+@Override
+public void migrateProperties(final PropertyConfiguration config) {
+config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+config.removeProperty("Checkpoint Interval");
+config.removeProperty("Metrics Publishing");
+}
+
+@Override
+public void migrateRelationships(final RelationshipConfiguration config) {
+config.renameRelationship("parse failure", "parse.failure");
+}
 
+@OnScheduled
+public void onScheduled(final ProcessContext context) {
 final Region region = RegionUtil.getRegion(context);
 final AwsCredentialsProvider credentialsProvider = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
 .asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+final String endpointOverride = context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-kinesisClient = KinesisAsyncClient.builder()
-.region(region)
-.credentialsProvider(credentialsProvider)
-.endpointOverride(getKinesisEndpointOverride())
-.httpClient(createKinesisHttpClient(context))
+final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
+.apiCallTimeout(API_CALL_TIMEOUT)
+

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907005098


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##
@@ -272,47 +238,49 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
 
 static final PropertyDescriptor STREAM_POSITION_TIMESTAMP = new PropertyDescriptor.Builder()
 .name("Stream Position Timestamp")
-.description("Timestamp position in stream from which to start reading Kinesis Records. The timestamp must be in ISO 8601 format.")
+.description("Timestamp position in stream from which to start reading Kinesis Records. Must be in ISO 8601 format.")
 .required(true)
 .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
 .dependsOn(INITIAL_STREAM_POSITION, InitialPosition.AT_TIMESTAMP)
 .build();
 
-static final PropertyDescriptor MAX_BYTES_TO_BUFFER = new PropertyDescriptor.Builder()
-.name("Max Bytes to Buffer")
-.description("""
-The maximum size of Kinesis Records that can be buffered in memory before being processed by NiFi.
-If the buffer size exceeds the limit, the processor will stop consuming new records until free space is available.
-
-Using a larger value may increase the throughput, but will do so at the expense of using more memory.
-""")
+static final PropertyDescriptor MAX_RECORDS_PER_REQUEST = new PropertyDescriptor.Builder()
+.name("Max Records Per Request")
+.description("The maximum number of records to retrieve per GetRecords call. Maximum is 10,000.")
 .required(true)
-.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-.defaultValue("100 MB")
+.defaultValue("1000")
+.addValidator(StandardValidators.createLongValidator(1, 1, true))
 .build();
 
-static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder()
-.name("Checkpoint Interval")
+static final PropertyDescriptor MAX_BATCH_DURATION = new PropertyDescriptor.Builder()
+.name("Max Batch Duration")
 .description("""
-Interval between checkpointing consumed Kinesis records. To checkpoint records each time the Processor is run, set this value to 0 seconds.
-
-More frequent checkpoint may reduce performance and increase DynamoDB costs,
-but less frequent checkpointing may result in duplicates when a Shard lease is lost or NiFi is restarted.
-""")
+The maximum amount of time to spend consuming records in a single invocation before \
+committing the session and checkpointing.""")
 .required(true)
-.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
 .defaultValue("5 sec")
+.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
 .build();
 
-static final PropertyDescriptor METRICS_PUBLISHING = new PropertyDescriptor.Builder()
-.name("Metrics Publishing")
-.description("Specifies where Kinesis usage metrics are published to.")
+static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
+.name("Max Batch Size")
+.description("""
+The maximum amount of data to consume in a single invocation before committing the \
+session and checkpointing.""")
 .required(true)
-.allowableValues(MetricsPublishing.class)
-.defaultValue(MetricsPublishing.DISABLED)
+.defaultValue("10 MB")
+.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
 .build();
 
-static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP, ProxySpec.HTTP_AUTH);
+static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()

Review Comment:
   No, we want to expose Endpoint Override. Many users use Endpoint Overrides 
for a lot of AWS services.
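
For illustration only, a minimal sketch of how an optional Endpoint Override value might be turned into a URI before it is handed to an AWS SDK client builder (the SDK builders accept a `java.net.URI` through `endpointOverride(URI)`). The class `EndpointOverrideSketch` and the helper `parseEndpointOverride` are hypothetical names and are not taken from the PR; the blank-means-unset behavior is an assumption.

```java
import java.net.URI;
import java.util.Optional;

// Hypothetical helper, not code from the PR.
final class EndpointOverrideSketch {

    private EndpointOverrideSketch() { }

    // Assumption: an unset or blank property means "use the default regional endpoint".
    static Optional<URI> parseEndpointOverride(final String propertyValue) {
        if (propertyValue == null || propertyValue.isBlank()) {
            return Optional.empty();
        }
        final URI uri = URI.create(propertyValue.trim());
        // Require an absolute URI such as http://localhost:4566 so the client gets a scheme and host.
        if (uri.getScheme() == null || uri.getHost() == null) {
            throw new IllegalArgumentException("Endpoint Override must be an absolute URI: " + propertyValue);
        }
        return Optional.of(uri);
    }

    public static void main(final String[] args) {
        // With a real client this would be roughly: parseEndpointOverride(value).ifPresent(builder::endpointOverride)
        parseEndpointOverride("http://localhost:4566")
                .ifPresent(uri -> System.out.println(uri.getHost() + ":" + uri.getPort()));
    }
}
```

Returning an `Optional` keeps the "no override configured" case explicit, so the caller only touches the builder when a value is actually present.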






Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907002018


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
 private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS);
 private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-private volatile DynamoDbAsyncClient dynamoDbClient;
-private volatile CloudWatchAsyncClient cloudWatchClient;
-private volatile KinesisAsyncClient kinesisClient;
-private volatile Scheduler kinesisScheduler;
-
+private volatile SdkHttpClient kinesisHttpClient;
+private volatile SdkHttpClient dynamoHttpClient;
+private volatile KinesisClient kinesisClient;
+private volatile DynamoDbClient dynamoDbClient;
+private volatile SdkAsyncHttpClient asyncHttpClient;
+private volatile KinesisShardManager shardManager;
+private volatile KinesisConsumerClient consumerClient;
 private volatile String streamName;
-private volatile RecordBuffer.ForProcessor recordBuffer;
-
-private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-private volatile @Nullable byte[] demarcatorValue;
+private volatile int maxRecordsPerRequest;
+private volatile String initialStreamPosition;
+private volatile long maxBatchNanos;
+private volatile long maxBatchBytes;
 
-private volatile Future initializationResultFuture;
-private final AtomicBoolean initialized = new AtomicBoolean();
-
-// An instance filed, so that it can be read in getRelationships.
-private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
-PROCESSING_STRATEGY.getDefaultValue());
+private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
 
 @Override
 protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 return PROPERTY_DESCRIPTORS;
 }
 
-@Override
-public void migrateProperties(final PropertyConfiguration config) {
-ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-}
-
 @Override
 public Set<Relationship> getRelationships() {
 return switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
 case RECORD -> RECORD_FILE_RELATIONSHIPS;
 };
 }
 
 @Override
 public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
 if (descriptor.equals(PROCESSING_STRATEGY)) {
-processingStrategy = ProcessingStrategy.from(newValue);
+processingStrategy = ProcessingStrategy.valueOf(newValue);
 }
 }
 
-@OnScheduled
-public void setup(final ProcessContext context) {
-readerRecordProcessor = switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> null;
-case RECORD -> createReaderRecordProcessor(context);
-};
-demarcatorValue = switch (processingStrategy) {
-case FLOW_FILE, RECORD -> null;
-case DEMARCATOR -> {
-final String demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
-yield demarcatorValue != null ? demarcatorValue.getBytes(UTF_8) : new byte[0];
-}
-};
+@Override
+public void migrateProperties(final PropertyConfiguration config) {
+config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+config.removeProperty("Checkpoint Interval");
+config.removeProperty("Metrics Publishing");
+}
+
+@Override
+public void migrateRelationships(final RelationshipConfiguration config) {
+config.renameRelationship("parse failure", "parse.failure");

Review Comment:
   Yeah, that's an artifact of some refactoring that I did. Good catch. Will 
remove.
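
For context on the `migrateProperties` hook quoted in the hunk above, here is a rough sketch of what one rename and two removals do to a saved configuration. It uses a plain `Map` as a hypothetical stand-in for NiFi's `PropertyConfiguration`; `PropertyMigrationSketch` and `migrate` are illustrative names only, and the property names are the ones that appear in the quoted diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the migration hook; not NiFi's actual implementation.
final class PropertyMigrationSketch {

    private PropertyMigrationSketch() { }

    static Map<String, String> migrate(final Map<String, String> saved) {
        final Map<String, String> migrated = new LinkedHashMap<>(saved);

        // Rename: an explicitly configured old value carries over under the new name.
        if (migrated.containsKey("Max Bytes to Buffer")) {
            migrated.put("Max Batch Size", migrated.remove("Max Bytes to Buffer"));
        }

        // Removals: properties that no longer exist are dropped from the saved configuration.
        migrated.remove("Checkpoint Interval");
        migrated.remove("Metrics Publishing");
        return migrated;
    }

    public static void main(final String[] args) {
        final Map<String, String> saved = new LinkedHashMap<>();
        saved.put("Max Bytes to Buffer", "100 MB");
        saved.put("Checkpoint Interval", "5 sec");
        System.out.println(migrate(saved));
    }
}
```

One consequence worth keeping in mind with a rename like this: a value that was tuned for the old buffer semantics ends up as the new batch size, so it may deserve a second look after upgrading.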






Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2906908197


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##
@@ -30,154 +29,132 @@
 import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.migration.PropertyConfiguration;
-import org.apache.nifi.migration.ProxyServiceMigration;
+import org.apache.nifi.migration.RelationshipConfiguration;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService;
-import org.apache.nifi.processors.aws.kinesis.MemoryBoundRecordBuffer.Lease;
-import org.apache.nifi.processors.aws.kinesis.ReaderRecordProcessor.ProcessingResult;
-import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferId;
-import org.apache.nifi.processors.aws.kinesis.converter.InjectMetadataRecordConverter;
-import org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverter;
-import org.apache.nifi.processors.aws.kinesis.converter.ValueRecordConverter;
-import org.apache.nifi.processors.aws.kinesis.converter.WrapperRecordConverter;
 import org.apache.nifi.processors.aws.region.RegionUtil;
 import org.apache.nifi.proxy.ProxyConfiguration;
-import org.apache.nifi.proxy.ProxyConfigurationService;
 import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.http.nio.netty.Http2Configuration;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
-import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
-import software.amazon.kinesis.common.ConfigsBuilder;
-import software.amazon.kinesis.common.InitialPositionInStream;
-import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-import software.amazon.kinesis.coordinator.Scheduler;
-import software.amazon.kinesis.coordinator.WorkerStateChangeListener;
-import software.amazon.kinesis.lifecycle.events.InitializationInput;
-import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
-import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
-import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
-import software.amazon.kinesis.metrics.LogMetricsFactory;
-import software.amazon.kinesis.metrics.MetricsFactory;
-import software.amazon.kinesis.metrics.NullMetricsFactory;
-import software.amazon.kinesis.processor.ShardRecordProcessor;
-import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
-import software.amazon.kinesis.processor.SingleStreamTracker;
-import software.amazon.kinesis.retrieval.KinesisClientRecord;
-import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
-import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
-import software.amazon.kinesis.retrieval.polling.PollingConfig;
-
+import software.amazon.aw

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2906871680


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/CheckpointTableUtils.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.KeyType;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Shared DynamoDB table lifecycle operations for checkpoint tables. Used by both
+ * {@link KinesisShardManager} for runtime table management and
+ * {@link LegacyCheckpointMigrator} for migration and rename operations.
+ */
+final class CheckpointTableUtils {
+
+static final String NODE_HEARTBEAT_PREFIX = "__node__#";
+static final String MIGRATION_MARKER_SHARD_ID = "__migration__";
+
+private static final long TABLE_POLL_MILLIS = 1_000;
+private static final int TABLE_POLL_MAX_ATTEMPTS = 60;
+
+private CheckpointTableUtils() { }
+
+enum TableSchema {
+NEW,
+LEGACY,
+UNKNOWN,
+NOT_FOUND
+}
+
+static TableSchema getTableSchema(final DynamoDbClient client, final String tableName) {
+try {
+final DescribeTableResponse describe = client.describeTable(
+DescribeTableRequest.builder().tableName(tableName).build());
+final List<KeySchemaElement> keySchema = describe.table().keySchema();
+if (keySchema.size() == 2
+&& hasKey(keySchema, "streamName", KeyType.HASH)
+&& hasKey(keySchema, "shardId", KeyType.RANGE)) {
+return TableSchema.NEW;
+}
+if (keySchema.size() == 1 && hasKey(keySchema, "leaseKey", KeyType.HASH)) {
+return TableSchema.LEGACY;
+}
+return TableSchema.UNKNOWN;
+} catch (final ResourceNotFoundException notFound) {
+return TableSchema.NOT_FOUND;
+}
+}
+
+static void createNewSchemaTable(final DynamoDbClient client, final ComponentLog logger, final String tableName) {
+final TableSchema tableSchema = getTableSchema(client, tableName);
+if (tableSchema == TableSchema.NEW) {
+logger.info("DynamoDB checkpoint table [{}] already exists", tableName);
+return;
+}
+if (tableSchema == TableSchema.LEGACY || tableSchema == TableSchema.UNKNOWN) {
+throw new ProcessException(
+"Checkpoint table [%s] exists but does not match expected schema".formatted(tableName));
+}
+
+logger.info("Creating DynamoDB checkpoint table [{}]", tableName);
+try {
+final CreateTableRequest request = CreateTableRequest.builder()
+.tableName(tableName)
+.keySchema(
+

Re: [PR] NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… [nifi]

2026-03-09 Thread via GitHub


awelless commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2906706849


##
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
 private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS);
 private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-private volatile DynamoDbAsyncClient dynamoDbClient;
-private volatile CloudWatchAsyncClient cloudWatchClient;
-private volatile KinesisAsyncClient kinesisClient;
-private volatile Scheduler kinesisScheduler;
-
+private volatile SdkHttpClient kinesisHttpClient;
+private volatile SdkHttpClient dynamoHttpClient;
+private volatile KinesisClient kinesisClient;
+private volatile DynamoDbClient dynamoDbClient;
+private volatile SdkAsyncHttpClient asyncHttpClient;
+private volatile KinesisShardManager shardManager;
+private volatile KinesisConsumerClient consumerClient;
 private volatile String streamName;
-private volatile RecordBuffer.ForProcessor recordBuffer;
-
-private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-private volatile @Nullable byte[] demarcatorValue;
+private volatile int maxRecordsPerRequest;
+private volatile String initialStreamPosition;
+private volatile long maxBatchNanos;
+private volatile long maxBatchBytes;
 
-private volatile Future initializationResultFuture;
-private final AtomicBoolean initialized = new AtomicBoolean();
-
-// An instance filed, so that it can be read in getRelationships.
-private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
-PROCESSING_STRATEGY.getDefaultValue());
+private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
 
 @Override
 protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 return PROPERTY_DESCRIPTORS;
 }
 
-@Override
-public void migrateProperties(final PropertyConfiguration config) {
-ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-}
-
 @Override
 public Set<Relationship> getRelationships() {
 return switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
 case RECORD -> RECORD_FILE_RELATIONSHIPS;
 };
 }
 
 @Override
 public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
 if (descriptor.equals(PROCESSING_STRATEGY)) {
-processingStrategy = ProcessingStrategy.from(newValue);
+processingStrategy = ProcessingStrategy.valueOf(newValue);
 }
 }
 
-@OnScheduled
-public void setup(final ProcessContext context) {
-readerRecordProcessor = switch (processingStrategy) {
-case FLOW_FILE, DEMARCATOR -> null;
-case RECORD -> createReaderRecordProcessor(context);
-};
-demarcatorValue = switch (processingStrategy) {
-case FLOW_FILE, RECORD -> null;
-case DEMARCATOR -> {
-final String demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
-yield demarcatorValue != null ? demarcatorValue.getBytes(UTF_8) : new byte[0];
-}
-};
+@Override
+public void migrateProperties(final PropertyConfiguration config) {
+config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+config.removeProperty("Checkpoint Interval");
+config.removeProperty("Metrics Publishing");
+}
+
+@Override
+public void migrateRelationships(final RelationshipConfiguration config) {
+config.renameRelationship("parse failure", "parse.failure");
+}
 
+@OnScheduled
+public void onScheduled(final ProcessContext context) {
 final Region region = RegionUtil.getRegion(context);
 final AwsCredentialsProvider credentialsProvider = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
 .asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+final String endpointOverride = context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-kinesisClient = KinesisAsyncClient.builder()
-.region(region)
-.credentialsProvider(credentialsProvider)
-.endpointOverride(getKinesisEndpointOverride())
-.httpClient(createKinesisHttpClient(context))
+final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
+.apiCallTimeout(API_CALL_TIMEOUT)
+