Hi Evan,

> A few replies / questions inline. Somewhat relatedly, I'm also wondering
> where this connector should live. I saw that there's already a pubsub
> connector in
> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
> so if flink is willing to host it, perhaps it could live near there?
> Alternatively, it could live alongside our client library in
> https://github.com/googleapis/java-pubsublite.
>

For a long time, the community has been thinking of moving (most)
connectors out of the main repository. Especially now with the new
source/sink interface, the need to decouple the Flink release cycle from
the connector release cycle is bigger than ever, as we do not backport
features to our bugfix branches. Thus, Pub/Sub Lite would only be available
from Flink 1.14 onward, and many users would need to wait up to a year to
effectively use the source (adoption of new Flink versions is usually slow).
Therefore, I'd definitely encourage you to keep the connector alongside your
client library, where the release cycles probably also align much better.
I will soon present an idea on how to list all available connectors on
Flink's connector page such that, from a user's perspective, it wouldn't
matter whether a connector is internal or external. If it turns out that the
community would rather keep all connectors in the main repo, we can look at
contributing it at a later point in time.

> I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
> It seems like these base implementations are mostly designed to help in
> cases where the client library uses a synchronous pull based approach. Our
> client library is async - we use a bidirectional stream to pull
> messages from our brokers and we have some flow control settings to limit
> the number of bytes and messages outstanding to the client. I'm wondering
> if because of this, we should just implement the SourceReader interface. In
> particular, we have a per partition subscriber class which buffers messages
> up to the flow control limit and exposes an API almost identical to
> SourceReader's pollNext and isAvailable. What do you think?
>

Good catch. Yes, the base implementation is more or less simulating the async
fetching that your library apparently already offers. So feel free to skip
it. Of course, if it turns out that you still need certain building blocks,
such as record handover, we can also discuss extracting a common base class
shared by async sources and SingleThreadMultiplexSourceReaderBase.
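
To make the idea concrete, here is a minimal sketch of a per-partition buffer
with a pollNext/isAvailable-shaped API. It is only an illustration under
assumptions: the class name BufferedPartitionSubscriber, the onMessage
callback, and the message-count limit are hypothetical and are not taken from
Flink or from the Pub/Sub Lite client. Flink's real SourceReader#pollNext
takes a ReaderOutput and returns an InputStatus; the sketch simplifies this to
an Optional so that it compiles without Flink on the classpath.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical per-partition buffer; not part of Flink or the Pub/Sub Lite client. */
final class BufferedPartitionSubscriber {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final int maxOutstanding; // assumed flow control limit, in messages
    private CompletableFuture<Void> available = new CompletableFuture<>();

    BufferedPartitionSubscriber(int maxOutstanding) {
        this.maxOutstanding = maxOutstanding;
    }

    /** Called from the async client callback; returns false if the limit is reached. */
    synchronized boolean onMessage(String message) {
        if (buffer.size() >= maxOutstanding) {
            return false;
        }
        buffer.add(message);
        available.complete(null); // no-op if the future is already complete
        return true;
    }

    /** Non-blocking: returns the next buffered message, or empty if there is none. */
    synchronized Optional<String> pollNext() {
        String next = buffer.poll();
        if (buffer.isEmpty() && available.isDone()) {
            // Nothing left to read: hand out a fresh, incomplete future.
            available = new CompletableFuture<>();
        }
        return Optional.ofNullable(next);
    }

    /** Completes once at least one message is buffered, like SourceReader#isAvailable. */
    synchronized CompletableFuture<Void> isAvailable() {
        return available;
    }
}
```

A reader that implements SourceReader directly could then delegate to one such
object per assigned partition and combine their futures, for example with
CompletableFuture.anyOf.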

> Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would
> break ordering guarantees, so when I read through the Kafka source, I was
> really confused by the lack of rebalancing.


We have some ideas on how to make it more dynamic, but they are very far
down the road, and we can hopefully implement them in a way that is
transparent to the sources.
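
Until then, a static assignment is the simplest option. As a rough sketch,
assuming partitions are numbered from 0 and each partition is one split, the
owner of a partition can be derived from the partition number and the current
parallelism. The class StaticSplitAssignment and its methods are hypothetical
names for illustration only and do not exist in Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper; shows one possible static partition-to-reader mapping. */
final class StaticSplitAssignment {

    /** Each partition has exactly one owning reader for a given parallelism. */
    static int ownerOf(int partition, int parallelism) {
        return Math.floorMod(partition, parallelism);
    }

    /** Groups partitions 0..numPartitions-1 by the index of the owning reader. */
    static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> byReader = new TreeMap<>();
        for (int reader = 0; reader < parallelism; reader++) {
            byReader.put(reader, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            byReader.get(ownerOf(partition, parallelism)).add(partition);
        }
        return byReader;
    }
}
```

With 5 partitions and a parallelism of 2, reader 0 would own partitions 0, 2
and 4, and reader 1 would own partitions 1 and 3. Because a partition never
moves between readers while the job is running, records within a partition
keep their order.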

On Fri, May 7, 2021 at 11:23 PM Evan Palmer <palm...@google.com> wrote:

> Hi Arvid, thank you so much for the detailed reply!
>
> A few replies / questions inline. Somewhat relatedly, I'm also wondering
> where this connector should live. I saw that there's already a pubsub
> connector in
> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
> so if flink is willing to host it, perhaps it could live near there?
> Alternatively, it could live alongside our client library in
> https://github.com/googleapis/java-pubsublite.
>
> On Mon, May 3, 2021 at 1:54 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Evan,
>>
>> 1) You are absolutely correct that we would urge users to add new sources
>> as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
>> For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource
>> [2] as a starting point. Especially basing the reader implementation on
>> SingleThreadMultiplexSourceReaderBase will give you some performance boost
>> over naive implementations.
>> It is probably overwhelming at first, but there is a lot of thought behind
>> the Source interface. We plan on having better documentation and more
>> examples in the coming months to ease the ramp-up, but it's also kind of a
>> chicken-and-egg problem.
>>
>
> Okay, great, the Source interface seems much easier to work with. I
> haven't gotten around to thinking about our Sink yet, but I'm sure I'll
> have some questions when I do :)
>
> I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
> It seems like these base implementations are mostly designed to help in
> cases where the client library uses a synchronous pull based approach. Our
> client library is async - we use a bidirectional stream to pull
> messages from our brokers and we have some flow control settings to limit
> the number of bytes and messages outstanding to the client. I'm wondering
> if because of this, we should just implement the SourceReader interface. In
> particular, we have a per partition subscriber class which buffers messages
> up to the flow control limit and exposes an API almost identical to
> SourceReader's pollNext and isAvailable. What do you think?
>
>>
>> I can also provide guidance outside of the ML if it's easier.
>>
>> 2) You are right, the currentParallelism is static with respect to the
>> creation of the SourceReaders. Any change to the parallelism would also
>> cause a recreation of the readers.
>> Splits are usually checkpointed alongside the readers. On recovery, the
>> readers are restored with their old splits. Only when splits cannot be
>> recovered in the context of a reader (for example downscaling), the splits
>> would be re-added to the enumerator.
>>
>
>> Rebalancing can happen in SplitEnumerator#addReader or
>> #handleSplitRequest. The Kafka and File sources even use different
>> approaches, with eager and lazy initialization respectively. Further, you
>> can send arbitrary events between the enumerator and readers to work out
>> the rebalancing. In theory, you can also dynamically rebalance splits,
>> however, you lose ordering guarantees of the messages at the moment (if you
>> have records r1, r2 in this order in split s and you reassign s, then you
>> may end up with r2, r1 in the sink).
>>
>
> Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would
> break ordering guarantees, so when I read through the Kafka source, I was
> really confused by the lack of rebalancing.
>
>>
>> [1]
>> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75-L75
>> [2]
>> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L99-L99
>>
>> On Mon, May 3, 2021 at 1:40 AM Evan Palmer <palm...@google.com> wrote:
>>
>>> Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite
>>> which is a partition based Pub/Sub product, and I have a few questions.
>>>
>>> 1.
>>>
>>> I saw that there are two sets of interfaces used in existing sources:
>>> The RichSourceFunction, and the set of interfaces from FLIP-27. It seems
>>> like the Source interfaces are preferred for new sources, but I wanted to
>>> be sure.
>>>
>>> 2.
>>>
>>> I’m having a little bit of trouble working out when the
>>> currentParallelism returned by the SplitEnumeratorContext [1] can change,
>>> and how a source should react to that.
>>>
>>> For context, I’m currently thinking about single partitions as “splits”,
>>> so a source would have an approximately constant number of splits which
>>> each has a potentially unbounded amount of work (at least in continuous
>>> mode). Each split will be assigned to some SourceReader by the split
>>> enumerator. If the value of currentParallelism changes, it seems like I’ll
>>> need to find a way to redistribute my partitions over SourceReaders, or
>>> else I'll end up with an unbalanced distribution of partitions to
>>> SourceReaders.
>>>
>>> I looked at the docs on elastic scaling [2], and it seems like when the
>>> parallelism of the source changes, the source will be checkpointed and
>>> restored. I think this would mean all the SourceReaders get restarted, and
>>> their splits are returned to the SplitEnumerator for reassignment. Is this
>>> approximately correct?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/
>>>
>>>
