[ https://issues.apache.org/jira/browse/FLINK-15769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-15769: ----------------------------------- Labels: pull-request-available (was: ) > Allow configuring offset startup positions for Stateful Functions Kafka > Ingress > ------------------------------------------------------------------------------- > > Key: FLINK-15769 > URL: https://issues.apache.org/jira/browse/FLINK-15769 > Project: Flink > Issue Type: New Feature > Components: Stateful Functions > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Priority: Major > Labels: pull-request-available > > It is quite typical that a user is capable of setting where to start > consuming a Kafka topic. > Since the Stateful Functions Kafka ingress sits on top of Flink's Kafka > consumer, there is already various options to start with: > * {{GROUP_OFFSETS}} (default): start with whatever offsets were committed to > Kafka for given {{group.id}} > * {{LATEST}}: start from latest record in topic > * {{EARLIEST}}: start from earliest record in topic > * {{SPECIFIC_OFFSETS}}: provide a map of topic partition -> offset. This is > particularly important for bootstrapped state scenarios, where the user would > want to start from a specific position consistent with the state bootstrapped > in their functions. > * {{TIMESTAMP}}: start from offsets written starting from the given timestamp. > The proposed API looks like so: > {code} > KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(...) > .withTopic(...) > .withDeserializer(...) > > .withDefaultStartPosition(KafkaIngressStartPosition.fromEarliest()/fromLatest()) > > .withSpecificStartOffsets(KafkaIngressStartOffsets.fromMap(Map)/fromTimestamp(Long)) > {code} > The {{withDefaultStartPosition}} method is straightforward. > The reason to separate this from another {{withSpecificStartOffsets}} method > is that there would be cases where some partition does not contain the > offsets specified by {{withSpecificStartOffsets}}. > In this case, the ingress would need to fallback to some default > configuration; this would be the {{withDefaultStartPosition}} configuration. -- This message was sent by Atlassian Jira (v8.3.4#803005)