Hi Arvid, thank you so much for the detailed reply!

A few replies / questions inline. Somewhat relatedly, I'm also wondering
where this connector should live. I saw that there's already a pubsub
connector in
https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
so if flink is willing to host it, perhaps it could live near there?
Alternatively, it could live alongside our client library in
https://github.com/googleapis/java-pubsublite.

On Mon, May 3, 2021 at 1:54 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Evan,
>
> 1) You are absolutely correct that we would urge users to add new sources
> as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
> For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource
> [2] as a starting point. Especially basing the reader implementation on
> SingleThreadMultiplexSourceReaderBase will give you some performance boost
> over naive implementations.
> It is probably initially overwhelming but there is lots of thought behind
> the Source interface. We plan on having better documentation and more
> examples in the next months to ease the ramp up but it's also kind of a
> hen-egg problem.
>

Okay, great, the Source interface seems much easier to work with. I haven't
gotten around to thinking about our Sink yet, but I'm sure I'll have some
questions when I do :)

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
It seems like these base implementations are mostly designed to help in
cases where the client library uses a synchronous pull based approach. Our
client library is async - we use a bidirectional stream to pull
messages from our brokers and we have some flow control settings to limit
the number of bytes and messages outstanding to the client. I'm wondering
if because of this, we should just implement the SourceReader interface. In
particular, we have a per partition subscriber class which buffers messages
up to the flow control limit and exposes an API almost identical to
SourceReader's pollNext and IsAvailable. What do you think?

>
> I can also provide guidance outside of the ML if it's easier.
>
> 2) You are right, the currentParallelism is static in respect to the
> creation of the SourceReaders. Any change to the parallelism would also
> cause a recreation of the readers.
> Splits are usually checkpointed alongside the readers. On recovery, the
> readers are restored with their old splits. Only when splits cannot be
> recovered in the context of a reader (for example downscaling), the splits
> would be re-added to the enumerator.
>

> Rebalancing can happen in SplitEnumerator#addReader or
> #handleSplitRequest. The Kafka and File source use even different
> approaches with eager and lazy initialization respectively. Further, you
> can send arbitrary events between the enumerator and readers to work out
> the rebalancing. In theory, you can also dynamically rebalance splits,
> however, you lose ordering guarantees of the messages at the moment (if you
> have records r1, r2 in this order in split s and you reassign s, then you
> may end up with r2, r1 in the sink).
>

Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would
break ordering guarantees, so when I read through the Kafka source, I was
really confused by the lack of rebalancing.

>
> [1]
> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75-L75
> [2]
> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L99-L99
>
> On Mon, May 3, 2021 at 1:40 AM Evan Palmer <palm...@google.com> wrote:
>
>> Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite
>> which is a partition based Pub/Sub product, and I have a few questions.
>>
>> 1.
>>
>> I saw that there are two sets of interfaces used in existing sources: The
>> RichSourceFunction, and the set of interfaces from FLIP-27. It seems like
>> the Source interfaces are preferred for new sources, but I wanted to be
>> sure.
>>
>> 2.
>>
>> I’m having a little bit of trouble working out how when the
>> currentParallelism returned by the SplitEnumeratorContext [1] can change,
>> and how a source should react to that.
>>
>> For context, I’m currently thinking about single partitions as “splits”,
>> so a source would have an approximately constant number of splits which
>> each has an potentially unbounded amount of work (at least in continuous
>> mode). Each split will be assigned to some SourceReader by the split
>> enumerator. If the value of currentParallelism changes, it seems like I’ll
>> need to find a way to redistribute my partitions over SourceReaders, or
>> else I'll end up with an unbalanced distribution of partitions to
>> SourceReaders.
>>
>> I looked at the docs on elastic scaling [2], and it seems like when the
>> parallelism of the source changes, the source will be checkpointed and
>> restored. I think this would mean all the SourceReaders get restarted, and
>> their splits are returned to the SplitEnumerator for reassignment. Is this
>> approximately correct?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/
>>
>>

Reply via email to