Hi Evan,

1) You are absolutely correct that we would urge users to implement new sources against FLIP-27 and new sinks against FLIP-143. I can provide guidance in both cases.

For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource [2] as a starting point. In particular, basing the reader implementation on SingleThreadMultiplexSourceReaderBase will give you a performance boost over naive implementations.

The Source interface is probably overwhelming at first, but there is a lot of thought behind it. We plan to provide better documentation and more examples in the coming months to ease the ramp-up, but it's also a bit of a chicken-and-egg problem.
I can also provide guidance outside of the mailing list if that's easier.

2) You are right, currentParallelism is static with respect to the creation of the SourceReaders. Any change to the parallelism also causes the readers to be recreated.

Splits are usually checkpointed alongside the readers. On recovery, the readers are restored with their old splits. Only when splits cannot be recovered in the context of a reader (for example, when downscaling) are they added back to the enumerator.

Rebalancing can happen in SplitEnumerator#addReader or #handleSplitRequest. The Kafka and File sources even use different approaches, with eager and lazy initialization respectively. Further, you can send arbitrary events between the enumerator and the readers to work out the rebalancing. In theory, you can also rebalance splits dynamically; however, at the moment you lose the ordering guarantees of the messages (if you have records r1, r2 in this order in split s and you reassign s, then you may end up with r2, r1 in the sink).

[1] https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75-L75
[2] https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L99-L99

On Mon, May 3, 2021 at 1:40 AM Evan Palmer <palm...@google.com> wrote:

> Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite
> which is a partition based Pub/Sub product, and I have a few questions.
>
> 1.
>
> I saw that there are two sets of interfaces used in existing sources: The
> RichSourceFunction, and the set of interfaces from FLIP-27. It seems like
> the Source interfaces are preferred for new sources, but I wanted to be
> sure.
>
> 2.
>
> I’m having a little bit of trouble working out when the
> currentParallelism returned by the SplitEnumeratorContext [1] can change,
> and how a source should react to that.
>
> For context, I’m currently thinking about single partitions as “splits”,
> so a source would have an approximately constant number of splits which
> each has a potentially unbounded amount of work (at least in continuous
> mode). Each split will be assigned to some SourceReader by the split
> enumerator. If the value of currentParallelism changes, it seems like I’ll
> need to find a way to redistribute my partitions over SourceReaders, or
> else I'll end up with an unbalanced distribution of partitions to
> SourceReaders.
>
> I looked at the docs on elastic scaling [2], and it seems like when the
> parallelism of the source changes, the source will be checkpointed and
> restored. I think this would mean all the SourceReaders get restarted, and
> their splits are returned to the SplitEnumerator for reassignment. Is this
> approximately correct?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--
>
> [2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/
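
To make the eager approach from point 2 of the reply a bit more concrete, here is a minimal sketch in plain Java. It deliberately does not implement Flink's SplitEnumerator interface. The class EagerPartitionAssigner and the modulo rule are assumptions made for illustration, and the method names only mirror the addReader and addSplitsBack callbacks. A real enumerator would hand splits to readers through the SplitEnumeratorContext instead of returning them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for an enumerator that treats each partition as one split.
class EagerPartitionAssigner {
    private final int parallelism;
    // Partitions that are not currently owned by any reader.
    private final TreeSet<Integer> pending = new TreeSet<>();

    EagerPartitionAssigner(int numPartitions, int parallelism) {
        this.parallelism = parallelism;
        for (int p = 0; p < numPartitions; p++) {
            pending.add(p);
        }
    }

    // Mirrors addReader: when a reader registers, it eagerly receives every
    // pending partition that maps to its subtask index (assumed modulo rule).
    List<Integer> addReader(int subtaskId) {
        List<Integer> assigned = new ArrayList<>();
        for (int p : pending) {
            if (p % parallelism == subtaskId) {
                assigned.add(p);
            }
        }
        pending.removeAll(assigned);
        return assigned;
    }

    // Mirrors addSplitsBack: partitions that could not be restored inside a
    // reader go back to the pending pool and wait for the next registration.
    void addSplitsBack(List<Integer> partitions) {
        pending.addAll(partitions);
    }

    public static void main(String[] args) {
        EagerPartitionAssigner assigner = new EagerPartitionAssigner(5, 2);
        System.out.println("reader 0 -> " + assigner.addReader(0)); // reader 0 -> [0, 2, 4]
        System.out.println("reader 1 -> " + assigner.addReader(1)); // reader 1 -> [1, 3]
    }
}
```

Because the parallelism is fixed for the lifetime of the readers, a static rule like this stays balanced until the job is rescaled. A lazy variant would instead hand out one pending partition per request, in the spirit of #handleSplitRequest.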