Hi All!

This is a very interesting discussion.

I think many users find it confusing to decide which deployment mode to
choose when considering a new production application on Kubernetes. With
all the options of native, standalone and the different operators, this
can get tricky :)

I really like Thomas's idea of having at least a minimal operator
implementation in Flink itself to cover the most common production job
lifecycle management scenarios. I think the Flink community has very
strong experience in this area and could create a successful
implementation that would benefit most production users on Kubernetes.

Cheers,
Gyula

On Mon, Jan 10, 2022 at 4:29 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Thanks all for this fruitful discussion.
>
> I think Xintong has made a strong point about why we introduced the
> native K8s integration: active resource management.
> I have a concrete example of this from production. When a K8s node goes
> down, the standalone K8s deployment takes longer
> to recover because it is bound by the K8s pod eviction time (IIRC, the
> default is 5 minutes).
> With the native K8s integration, the Flink RM is aware of the
> lost TM heartbeat and can allocate a new TaskManager in a timely manner.
>
> Also, when introducing the native K8s integration, another aim was to
> make it easy for users to migrate from YARN deployments.
> They often already have a production-ready job lifecycle management
> system that uses the Flink CLI to submit Flink jobs.
> So we provide consistent commands: "bin/flink run-application -t
> kubernetes-application/yarn-application" to start a Flink application and
> "bin/flink cancel/stop ..." to terminate a Flink application.
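>
> For example, a native K8s application submission and cancellation could
> look like this (cluster id, image and jar path are just placeholders):
>
>   bin/flink run-application -t kubernetes-application \
>     -Dkubernetes.cluster-id=my-flink-app \
>     -Dkubernetes.container.image=my-repo/my-flink-app:latest \
>     local:///opt/flink/usrlib/my-job.jar
>
>   bin/flink cancel -t kubernetes-application \
>     -Dkubernetes.cluster-id=my-flink-app <jobId>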
>
>
> Compared with a K8s operator, I know that this is not a K8s-native
> mechanism. Hence, I also agree that we still need a powerful K8s
> operator which
> could work with both the standalone and native K8s modes. The major
> difference between them is how the JM and TM pods are started. For
> standalone,
> they are managed by a K8s job/deployment. For native, maybe we could
> simply create a submission carrying the "flink run-application" arguments
> derived from the Flink application CR.
>
> Making Flink's active resource manager able to talk to the K8s operator
> is an interesting option, which could support both standalone and native
> modes.
> Then the Flink RM just needs to declare its resource requirements (e.g.
> 2 * <2G, 1CPU>, 2 * <4G, 1CPU>) and defer resource
> allocation/de-allocation
> to the K8s operator. It feels like an intermediate form between the
> native and standalone modes :)
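>
> As a purely hypothetical sketch (no such API exists today), the declared
> requirements could be surfaced on an application CR for the operator to
> reconcile against:
>
>   spec:
>     resourceRequirements:
>       - count: 2
>         profile: { memory: "2g", cpu: 1 }
>       - count: 2
>         profile: { memory: "4g", cpu: 1 }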
>
>
>
> Best,
> Yang
>
>
>
> Xintong Song <tonysong...@gmail.com> wrote on Fri, Jan 7, 2022 at 12:02:
>
>> Hi folks,
>>
>> Thanks for the discussion. I'd like to share my two cents on this topic.
>>
>> Firstly, I'd like to clarify my understanding of the concepts "native k8s
>> integration" and "active resource management".
>> - Native k8s integration means Flink's master interacts with the k8s
>> API server directly. It acts like embedding an operator inside Flink's
>> master, which manages the resources (pods, deployments, configmaps,
>> etc.) and watches / reacts to related events.
>> - Active resource management means Flink can actively start / terminate
>> workers as needed. Its key characteristic is that the resources a Flink
>> deployment uses are decided by the job's execution plan, unlike the
>> opposite reactive mode (where the resources available to the deployment
>> decide the execution plan) or the standalone mode (where both the
>> execution plan and the deployment resources are predefined).
>>
>> Currently, we have the yarn and native k8s deployments (and the recently
>> removed mesos deployment) in active mode, due to their ability to request /
>> release worker resources from the underlying cluster. And all the existing
>> operators, AFAIK, work with a Flink standalone deployment, where Flink
>> cannot request / release resources by itself.
>>
>> From this perspective, I think a large part of the native k8s
>> integration's advantages comes from the active mode: being able to
>> better understand the job's resource requirements and adjust the
>> deployment resources accordingly. Both fine-grained resource management
>> (customizing TM resources for different tasks / operators) and the
>> adaptive batch scheduler (rescaling the deployment w.r.t. different
>> stages) fall into this category.
>>
>> I'm wondering if we can have an operator that also works with the active
>> mode. Instead of talking to the api server directly for adding / deleting
>> resources, Flink's active resource manager can talk to the operator (via
>> CR) about the resources the deployment needs, and let the operator
>> actually add / remove the resources. The operator should be able to work
>> with (active) or without (standalone) the information of deployment's
>> resource requirements. In this way, users are free to choose between active
>> and reactive (e.g., HPA) rescaling, while always benefiting from the
>> beyond-deployment lifecycle (upgrades, savepoint management, etc.) and
>> alignment with the K8s ecosystem (Flink client free, operating via kubectl,
>> etc.).
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise <t...@apache.org> wrote:
>>
>>> Hi David,
>>>
>>> Thank you for the reply and context!
>>>
>>> As for workload types and where native integration might fit: I think
>>> that any k8s-native solution that satisfies category 3) can also take
>>> care of 1) and 2), while the native integration by itself can't achieve
>>> that. The existence of [1] might serve as further indication.
>>>
>>> The k8s operator pattern would be an essential building block for a
>>> k8s native solution that is interoperable with k8s ecosystem tooling
>>> like kubectl, which is why [2] and subsequent derived art were
>>> created. Specifically the CRD allows us to directly express the
>>> concept of a Flink application consisting of job manager and task
>>> manager pods along with associated create/update/delete operations.
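>>>
>>> As a rough sketch with illustrative field names (loosely inspired by
>>> the CRD in [2]), such a CR might look like:
>>>
>>>   apiVersion: flink.k8s.io/v1beta1
>>>   kind: FlinkApplication
>>>   metadata:
>>>     name: my-app
>>>   spec:
>>>     image: my-repo/my-flink-app:latest
>>>     jarName: my-job.jar
>>>     parallelism: 4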
>>>
>>> Would it make sense to gauge interest in having such an operator as
>>> part of Flink? It appears so from discussions like [3]. I think such an
>>> addition would significantly lower the barrier to adoption, since, as
>>> you mentioned, one cannot really run mission-critical streaming
>>> workloads with the Apache Flink release binaries alone. While it
>>> is great to have multiple k8s operators to choose from that are
>>> managed outside Flink, it is unfortunately also evident that today's
>>> hot operator turns into tomorrow's tech debt. I think such a fate would
>>> be less likely within the project, where multiple parties can join
>>> forces and benefit from each other's contributions. There were similar
>>> considerations and discussions around Docker images in the past.
>>>
>>> Out of the features that you listed, it is particularly application
>>> upgrades that need to be solved through an external process like an
>>> operator. The good thing is that many folks have already thought hard
>>> about this, and in existing implementations we see different strategies
>>> that have their merits and production mileage (this certainly applies
>>> to [2]). We could combine the best of these ideas into a unified
>>> implementation as part of Flink itself as a starting point.
>>>
>>> Cheers,
>>> Thomas
>>>
>>>
>>> [1] https://github.com/wangyang0918/flink-native-k8s-operator
>>> [2] https://github.com/lyft/flinkk8soperator
>>> [3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080
>>>
>>>
>>> On Tue, Jan 4, 2022 at 4:04 AM David Morávek <d...@apache.org> wrote:
>>> >
>>> > Hi Thomas,
>>> >
>>> > AFAIK there are no specific plans in this direction for the native
>>> integration, but I'd like to share some thoughts on the topic.
>>> >
>>> > In my understanding there are three major groups of workloads in Flink:
>>> >
>>> > 1) Batch workloads
>>> > 2) Interactive workloads (both Batch and Streaming; e.g. SQL Gateway
>>> / Zeppelin Notebooks / VVP ...)
>>> > 3) "Mission Critical" Streaming workloads
>>> >
>>> > I think the native integration fits really well in the first two
>>> categories. Let's talk about these first:
>>> >
>>> > 1) Batch workloads
>>> >
>>> > You don't really need to address the upgrade story here. The
>>> interesting topic is how to "dynamically" adjust parallelism as the
>>> workload can change between stages. This is where the Adaptive Batch
>>> Scheduler [1] comes into play. To leverage the scheduler to its full
>>> extent, it needs to be deployed with the remote shuffle service in
>>> place [2], so that Flink's Resource Manager can free TaskManagers that
>>> are no longer needed.
>>> >
>>> > This can IMO work really well with the native integration, as there
>>> is a really clear approach to how the Resource Manager should decide
>>> which resources to allocate.
>>> >
>>> > 2) Interactive workloads
>>> >
>>> > Again, the upgrade story is not really interesting in this scenario.
>>> For batch workloads, it's basically the same as the above. For
>>> streaming ones it gets tricky. The main initiative that we currently
>>> have in terms of auto-scaling / re-scaling of streaming workloads is
>>> the reactive mode (adaptive scheduler) [3].
>>> >
>>> > I can totally see how the reactive mode could be integrated into the
>>> native integration, but only with the application mode, which is not
>>> really suitable for interactive workloads. For integration with a
>>> session cluster, we'd first need to address the "scheduling" problem of
>>> how to distribute newly available resources among multiple jobs.
>>> >
>>> > What's pretty neat here is that the interpreters (Zeppelin, SQL
>>> gateway, ...) have a really convenient way of spinning up a new cluster
>>> inside k8s.
>>> >
>>> > 3) "Mission Critical" Streaming workloads
>>> >
>>> > This one is IMO the primary reason why one would consider building a
>>> new operator these days, as it needs careful lifecycle management of
>>> the pipeline. I assume this is also the use case that you're
>>> investigating, am I correct?
>>> >
>>> > I'd second the requirements that you've already stated:
>>> > a) Resource efficiency - being able to re-scale based on the workload,
>>> in order to keep up with the input / not waste resources
>>> > b) Fast recovery
>>> > c) Application upgrades
>>> >
>>> > I personally don't think that the native integration is really
>>> suitable here. The direction we're heading in is the standalone
>>> deployment on Kubernetes + the reactive mode (adaptive scheduler).
>>> >
>>> > In theory, if we want to build a really cloud-native (Kubernetes)
>>> stream processor, deploying the pipeline should be as simple as
>>> deploying any other application. It should also be simple to integrate
>>> with CI/CD environments and a fast / frequent deploy philosophy.
>>> >
>>> > Let's see where we stand and where we can expand from there:
>>> >
>>> > a) Resource efficiency
>>> >
>>> > We already have the reactive mode in place. This allows you to add /
>>> remove task managers by adjusting the TM deployment (`kubectl scale
>>> ...`), and Flink will automatically react to the available resources.
>>> This is currently only supported with the Application Mode, which is
>>> limited to a single job (that should be enough for this kind of
>>> workload).
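>>> >
>>> > Concretely, assuming the TMs run as a plain Deployment named
>>> > "flink-taskmanager" and "scheduler-mode: reactive" is set in
>>> > flink-conf.yaml, re-scaling is just:
>>> >
>>> >   kubectl scale deployment/flink-taskmanager --replicas=4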
>>> >
>>> > The re-scaling logic is left completely up to the user and can be as
>>> simple as setting up an HPA (Horizontal Pod Autoscaler). I tend to
>>> think that we might want to provide a custom k8s metrics server that
>>> allows the HPA to query metrics from the JM, to make this more flexible
>>> and easier to use.
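>>> >
>>> > A minimal CPU-based variant of this (TM deployment name assumed)
>>> > could be as simple as:
>>> >
>>> >   kubectl autoscale deployment flink-taskmanager \
>>> >     --min=2 --max=10 --cpu-percent=80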
>>> >
>>> > While this looks really great in theory, there are still some
>>> shortcomings that we're actively working on addressing. For this
>>> feature to be widely adopted, we need to make the re-scaling experience
>>> as fast as possible, so we can re-scale often to react to the input
>>> rate. This is currently a problem with large RocksDB state, as
>>> re-scaling involves a full re-distribution of the state (splitting /
>>> merging RocksDB instances). The k8s operator approach has the same or
>>> an even worse limitation, as it involves taking a savepoint and
>>> re-building the state from it.
>>> >
>>> > b) Fast recovery
>>> >
>>> > This is IMO not that different from the native mode (although I'd
>>> have to check whether RM failover can reuse task managers). It involves
>>> frequent and fast checkpointing, local recovery (which is still not
>>> supported in reactive mode, but will hopefully be addressed soon) and
>>> the working directory efforts [4].
>>> >
>>> > c) Application upgrades
>>> >
>>> > This is the topic I'm still struggling with a little. Historically,
>>> this has involved external lifecycle management (savepoint + submitting
>>> a new job). I think at the end of the day, with application mode on
>>> standalone k8s, it could be as simple as updating the Docker image of
>>> the JM deployment.
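>>> >
>>> > I.e. something along these lines (deployment and container names
>>> > assumed):
>>> >
>>> >   kubectl set image deployment/flink-jobmanager \
>>> >     flink-main-container=my-repo/my-flink-app:v2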
>>> >
>>> > If I think about the simplest upgrade scenario, a plain in-place
>>> restore from the latest checkpoint, it may be fairly easy to implement.
>>> What I'm struggling with are the more complex upgrade scenarios such as
>>> dual or blue / green deployments.
>>> >
>>> >
>>> > To sum this up, I'd really love it if Flink could provide a great
>>> out-of-the-box experience with the standalone mode on k8s, making
>>> running / operating a Flink application as close to any other
>>> application as possible.
>>> >
>>> > I'd really appreciate hearing your thoughts on this topic.
>>> >
>>> > [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
>>> > [2] https://github.com/flink-extended/flink-remote-shuffle
>>> > [3]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
>>> > [4]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-198%3A+Working+directory+for+Flink+processes
>>> >
>>> > Best,
>>> > D.
>>> >
>>> > On Tue, Jan 4, 2022 at 12:44 AM Thomas Weise <t...@apache.org> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> I was recently looking at the Flink native Kubernetes integration [1]
>>> >> to get an idea how it relates to existing operator based solutions
>>> >> [2], [3].
>>> >>
>>> >> Part of the native integration's motivation was simplicity (no extra
>>> >> component to install), but arguably that is also a shortcoming. The
>>> >> k8s operator model can offer support for application lifecycle
>>> >> features like upgrades and rescaling, as well as job submission
>>> >> without a Flink client.
>>> >>
>>> >> When using the Flink native integration, it would still be necessary
>>> >> to provide that controller functionality. Is the idea to use the
>>> >> native integration for task manager resource allocation in tandem
>>> >> with an operator that provides the external controller functionality?
>>> >> If anyone using the Flink native integration can share their
>>> >> experience, I would be curious to learn more about the specific setup
>>> >> and whether there are plans to expand the k8s native integration
>>> >> capabilities.
>>> >>
>>> >> For example:
>>> >>
>>> >> * Application upgrade with features such as [4]. Since the job manager
>>> >> is part of the deployment, it cannot orchestrate the deployment
>>> >> itself; that needs to be the responsibility of an external process.
>>> >> Has anyone contemplated adding such a component to Flink itself?
>>> >>
>>> >> * Rescaling: Theoretically a parallelism change could be performed w/o
>>> >> restart of the job manager pod. Hence, building blocks to trigger and
>>> >> apply rescaling could be part of Flink itself. Has this been explored
>>> >> further?
>>> >>
>>> >> Yang kindly pointed me to [5]. Is the recommendation/conclusion that
>>> >> when a k8s operator is already used, it should be in charge of the
>>> >> task manager resource allocation? If so, what scenario was the native
>>> >> k8s integration originally intended for?
>>> >>
>>> >> Thanks,
>>> >> Thomas
>>> >>
>>> >> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#deployment-modes
>>> >> [2] https://github.com/lyft/flinkk8soperator
>>> >> [3] https://github.com/spotify/flink-on-k8s-operator
>>> >> [4]
>>> https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md
>>> >> [5] https://lists.apache.org/thread/8cn99f6n8nhr07n5vqfo880tpm624s5d
>>>
>>
