My two cents here:
- A Flink job already has back pressure, so in some use cases rate limiting
can be achieved by setting the parallelism to an appropriate value. There is
an open issue with checkpointing reliability under back pressure; the
community seems to be working on it.
- Rate limiting can easily be abused and cause a lot of confusion. Think about
a use case where two streams do a simple interval join. Unless you are able to
rate limit both with proper values dynamically, you might see the timestamp
and watermark gaps between them keep increasing, causing checkpointing
failures.
So the question might be: instead of looking at the rate limit of one source,
how do you slow down all sources without ever increasing the time and
watermark gaps? It sounds complicated already.
With that being said, if you really want to have rate limiting on your own,
you can try the following code :) It works well for us.
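
To make the interval join concern a bit more concrete, here is a toy model in plain Java (no Flink involved; the class name, method name, and the "one record advances event time by 1 ms" rule are all made up for illustration). It only shows the arithmetic: if two sources are throttled to different read rates, the event-time gap between them grows linearly with wall-clock time, and the join has to keep the faster side buffered until the slower side catches up.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of two throttled sources; not Flink code. */
public class WatermarkGapDemo {

    /**
     * Returns the event-time gap (in ms) between source A and source B after
     * each wall-clock second, assuming every record read advances that
     * source's event time by exactly 1 ms.
     */
    static List<Long> gaps(long recordsPerSecA, long recordsPerSecB, int seconds) {
        List<Long> result = new ArrayList<>();
        long eventTimeA = 0;
        long eventTimeB = 0;
        for (int s = 1; s <= seconds; s++) {
            eventTimeA += recordsPerSecA; // A progressed this far in event time
            eventTimeB += recordsPerSecB; // B progressed this far in event time
            result.add(Math.abs(eventTimeA - eventTimeB));
        }
        return result;
    }

    public static void main(String[] args) {
        // Only source B is limited: the gap grows every second.
        System.out.println(gaps(1000, 800, 5)); // [200, 400, 600, 800, 1000]
        // Both sources limited to the same rate: the gap stays at zero.
        System.out.println(gaps(800, 800, 5));  // [0, 0, 0, 0, 0]
    }
}
```

In a real job the rates are not constant, which is why the limits would have to be adjusted dynamically for both sources.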
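
// Imports assume the universal Kafka connector, Flink's Preconditions and
// Guava's RateLimiter; adjust them to your build.
import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;

public class SynchronousKafkaConsumer<T> extends FlinkKafkaConsumer<T> {

    protected static final Logger LOG =
            LoggerFactory.getLogger(SynchronousKafkaConsumer.class);

    // Limit for the whole topic in records per second, shared by all subtasks.
    private final double topicRateLimit;
    // Created in open() because RateLimiter is not serializable.
    private transient RateLimiter subtaskRateLimiter;

    // The constructor was not part of the snippet as posted; this is an
    // assumed minimal form that just stores the limit.
    public SynchronousKafkaConsumer(
            String topic,
            KafkaDeserializationSchema<T> deserializer,
            Properties props,
            double topicRateLimit) {
        super(topic, deserializer, props);
        this.topicRateLimit = topicRateLimit;
    }

    @Override
    public void open(Configuration configuration) throws Exception {
        // Each subtask gets an equal share of the topic-wide limit.
        double subtaskRate =
                topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks();
        Preconditions.checkArgument(
                subtaskRate > 0.1,
                "subtask ratelimit should be greater than 0.1 QPS");
        subtaskRateLimiter = RateLimiter.create(subtaskRate);
        super.open(configuration);
    }

    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> partitionsWithOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {

        return new KafkaFetcher<T>(
                sourceContext,
                partitionsWithOffsets,
                watermarksPeriodic,
                watermarksPunctuated,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.getTaskNameWithSubtasks(),
                deserializer,
                properties,
                pollTimeout,
                runtimeContext.getMetricGroup(),
                consumerMetricGroup,
                useMetrics) {

            @Override
            protected void emitRecord(
                    T record,
                    KafkaTopicPartitionState<TopicPartition> partitionState,
                    long offset) throws Exception {
                // Block until this subtask is allowed to emit one more record.
                subtaskRateLimiter.acquire();
                if (record == null) {
                    consumerMetricGroup.counter("invalidRecord").inc();
                }
                super.emitRecord(record, partitionState, offset);
            }

            @Override
            protected void emitRecordWithTimestamp(
                    T record,
                    KafkaTopicPartitionState<TopicPartition> partitionState,
                    long offset,
                    long timestamp) throws Exception {
                subtaskRateLimiter.acquire();
                if (record == null) {
                    consumerMetricGroup.counter("invalidRecord").inc();
                }
                super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
            }
        };
    }
}
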
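
For anyone who wants to see the arithmetic behind the per-subtask limit without pulling in Flink or Guava, below is a rough standalone sketch. `SubtaskRateSketch`, `perSubtaskRate` and `PacedLimiter` are hypothetical names for illustration only, and `PacedLimiter` is a much simplified stand-in that spaces permits evenly; it is not how Guava's `RateLimiter` is actually implemented (for example, it has no burst handling).

```java
import java.util.concurrent.TimeUnit;

/** Illustration only: simplified per-subtask pacing, not Guava's algorithm. */
public class SubtaskRateSketch {

    /** Splits a topic-wide limit evenly across the parallel subtasks. */
    static double perSubtaskRate(double topicRateLimit, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        double rate = topicRateLimit / parallelism;
        if (rate <= 0.1) {
            throw new IllegalArgumentException(
                    "subtask ratelimit should be greater than 0.1 QPS");
        }
        return rate;
    }

    /** Hands out permits spaced 1/rate seconds apart. */
    static final class PacedLimiter {
        private final long intervalNanos;
        private long nextFreeNanos;

        PacedLimiter(double permitsPerSecond, long nowNanos) {
            this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
            this.nextFreeNanos = nowNanos;
        }

        /** Reserves the next permit and returns how long the caller must wait, in ns. */
        long reserve(long nowNanos) {
            long start = Math.max(nowNanos, nextFreeNanos);
            nextFreeNanos = start + intervalNanos;
            return start - nowNanos;
        }

        /** Blocking variant, comparable in spirit to calling acquire() before each emit. */
        void acquire() throws InterruptedException {
            long waitNanos = reserve(System.nanoTime());
            if (waitNanos > 0) {
                TimeUnit.NANOSECONDS.sleep(waitNanos);
            }
        }
    }

    public static void main(String[] args) {
        double rate = perSubtaskRate(1000.0, 4);
        PacedLimiter limiter = new PacedLimiter(rate, 0L);
        System.out.println(rate);                // 250.0
        System.out.println(limiter.reserve(0L)); // 0 (first permit is free)
        System.out.println(limiter.reserve(0L)); // 4000000 (wait 4 ms for the next one)
    }
}
```

One consequence of splitting the limit evenly: a subtask that has no partitions assigned, or only idle ones, does not use its share, so the effective topic-wide rate can end up below the configured limit.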
Thanks,
Chen
Pinterest Data
> On Jul 6, 2020, at 7:43 AM, David Magalhães <[email protected]> wrote:
>
> I've noticed that FLINK-11501 was implemented in
> flink-connector-kafka-0.10 [1], but not in the current version of
> flink-connector-kafka [2]. Is there any reason for this, and what would be
> the best way to implement rate limiting in the current Kafka
> consumer?
>
> Thanks,
> David
>
> [1]
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
>
> [2]
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java