Hi All! This is a very interesting discussion.
I think many users find it hard to decide which deployment mode to choose when planning a new production application on Kubernetes. With all the options of native, standalone and different operators this can get tricky :)

I really like the idea that Thomas brought up to have at least a minimal operator implementation in Flink itself to cover the most common production job lifecycle management scenarios. I think the Flink community has very strong experience in this area and could create a successful implementation that would benefit most production users on Kubernetes.

Cheers,
Gyula

On Mon, Jan 10, 2022 at 4:29 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Thanks all for this fruitful discussion.
>
> I think Xintong has given a strong point on why we introduced the native K8s integration, which is active resource management. I have a concrete example of this in production. When a K8s node goes down, the standalone K8s deployment takes longer to recover, bounded by the K8s eviction time (IIRC, the default is 5 minutes). With the native K8s integration, Flink's RM notices the lost TM heartbeat and allocates a new one promptly.
>
> Also, when introducing the native K8s integration, another aim was to make it easy for users to migrate from the YARN deployment. They already have a production-ready job life-cycle management system, which uses the Flink CLI to submit Flink jobs. So we provide a consistent command "bin/flink run-application -t kubernetes-application/yarn-application" to start a Flink application and "bin/flink cancel/stop ..." to terminate one.
>
> Compared with a K8s operator, I know that this is not a K8s-native mechanism. Hence, I also agree that we still need a powerful K8s operator which could work with both standalone and native K8s modes. The major difference between them is how the JM and TM pods are started. For standalone, they are managed by K8s jobs/deployments. For native, maybe we could simply create a submission carrying the "flink run-application" arguments, derived from the Flink application CR.
>
> Making Flink's active resource manager talk to the K8s operator is an interesting option, which could support both standalone and native. Then the Flink RM just needs to declare the resource requirements (e.g. 2 * <2G, 1CPU>, 2 * <4G, 1CPU>) and defer the resource allocation/de-allocation to the K8s operator. It feels like an intermediate form between native and standalone mode :)
>
> Best,
> Yang
>
> On Fri, Jan 7, 2022 at 12:02, Xintong Song <tonysong...@gmail.com> wrote:
>
>> Hi folks,
>>
>> Thanks for the discussion. I'd like to share my two cents on this topic.
>>
>> Firstly, I'd like to clarify my understanding of the concepts "native k8s integration" and "active resource management".
>> - Native k8s integration means Flink's master interacts with the k8s API server directly. It acts like embedding an operator inside Flink's master, which manages the resources (pod, deployment, configmap, etc.) and watches / reacts to related events.
>> - Active resource management means Flink can actively start / terminate workers as needed. Its key characteristic is that the resources a Flink deployment uses are decided by the job's execution plan, unlike the opposite reactive mode (the resources available to the deployment decide the execution plan) or the standalone mode (both the execution plan and the deployment resources are predefined).
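(To make the distinction above concrete, here is a minimal sketch of how the two ends of this spectrum are typically selected in configuration today. The option names reflect my understanding of the current Flink docs and the cluster id is only a placeholder, so treat this as illustrative rather than authoritative.)

    # Active / native K8s integration: the JobManager requests TM pods from the
    # K8s API server itself. Usually reached via
    #   bin/flink run-application -t kubernetes-application ...
    # which corresponds roughly to:
    execution.target: kubernetes-application
    kubernetes.cluster-id: my-flink-app        # placeholder cluster id

    # Reactive mode on a standalone cluster: the TM Deployment is managed outside
    # Flink and the execution plan adapts to whatever TaskManagers are available:
    scheduler-mode: reactive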
>> Currently, we have the yarn and native k8s deployments (and the recently removed mesos deployment) in active mode, due to their ability to request / release worker resources from the underlying cluster. And all the existing operators, AFAIK, work with a Flink standalone deployment, where Flink cannot request / release resources by itself.
>>
>> From this perspective, I think a large part of the native k8s integration's advantages come from the active mode: being able to better understand the job's resource requirements and adjust the deployment resources accordingly. Both fine-grained resource management (customizing TM resources for different tasks / operators) and the adaptive batch scheduler (rescaling the deployment w.r.t. different stages) fall into this category.
>>
>> I'm wondering if we can have an operator that also works with the active mode. Instead of talking to the API server directly for adding / deleting resources, Flink's active resource manager could talk to the operator (via the CR) about the resources the deployment needs, and let the operator actually add / remove the resources. The operator should be able to work with (active) or without (standalone) the information about the deployment's resource requirements. In this way, users are free to choose between active and reactive (e.g., HPA) rescaling, while always benefiting from the beyond-deployment lifecycle (upgrades, savepoint management, etc.) and alignment with the K8s ecosystem (Flink-client free, operating via kubectl, etc.).
>>
>> Thank you~
>>
>> Xintong Song
>>
>> On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise <t...@apache.org> wrote:
>>
>>> Hi David,
>>>
>>> Thank you for the reply and context!
>>>
>>> As for workload types and where native integration might fit: I think that any k8s-native solution that satisfies category 3) can also take care of 1) and 2), while the native integration by itself can't achieve that. The existence of [1] might serve as further indication.
>>>
>>> The k8s operator pattern would be an essential building block for a k8s-native solution that is interoperable with k8s ecosystem tooling like kubectl, which is why [2] and subsequent derived art were created. Specifically, the CRD allows us to directly express the concept of a Flink application consisting of job manager and task manager pods, along with the associated create/update/delete operations.
>>>
>>> Would it make sense to gauge interest in having such an operator as part of Flink? It appears so from discussions like [3]. I think such an addition would significantly lower the barrier to adoption, since, like you mentioned, one cannot really run mission-critical streaming workloads with the Apache Flink release binaries alone. While it is great to have multiple k8s operators to choose from that are managed outside Flink, it is unfortunately also evident that today's hot operator turns into tomorrow's tech debt. I think such a fate would be less likely within the project, where multiple parties can join forces and benefit from each other's contributions. There were similar considerations and discussions around Docker images in the past.
>>>
>>> Out of the features that you listed, it is particularly the application upgrade that needs to be solved through an external process like an operator.
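(To make the CR idea above more tangible, here is a purely illustrative sketch of what such a custom resource could look like, including an upgrade policy of the kind discussed in this thread and a slot for the resource declarations Yang and Xintong mention. Every field name is invented; existing operators such as [2] define their own, different schemas.)

    # Hypothetical FlinkApplication custom resource -- all names are invented for illustration.
    apiVersion: flink.example.org/v1alpha1
    kind: FlinkApplication
    metadata:
      name: my-streaming-job
    spec:
      image: registry.example.com/my-job:1.2.0      # placeholder image
      jarURI: local:///opt/flink/usrlib/my-job.jar
      parallelism: 4
      upgradeMode: savepoint                        # e.g. savepoint | last-state | stateless
      jobManager:
        resources: { memory: 2Gi, cpu: 1 }
      taskManager:
        replicas: 2
        resources: { memory: 4Gi, cpu: 1 }
    # With the "active operator" idea, Flink's resource manager could publish its
    # declared requirements (e.g. 2 * <2G, 1CPU> plus 2 * <4G, 1CPU>) into the
    # resource, e.g. under status, and the operator would reconcile pods to match:
    status:
      requestedResources:
        - { count: 2, memory: 2Gi, cpu: 1 }
        - { count: 2, memory: 4Gi, cpu: 1 }

An operator reconciling such a resource would own the create / update / delete of the JM and TM pods, which is exactly the lifecycle gap described above.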
>>> The good thing is that many folks have already thought hard about this, and in existing implementations we see different strategies that have their merit and production mileage (certainly applies to [2]). We could combine the best of these ideas into a unified implementation as part of Flink itself, as a starting point.
>>>
>>> Cheers,
>>> Thomas
>>>
>>> [1] https://github.com/wangyang0918/flink-native-k8s-operator
>>> [2] https://github.com/lyft/flinkk8soperator
>>> [3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080
>>>
>>> On Tue, Jan 4, 2022 at 4:04 AM David Morávek <d...@apache.org> wrote:
>>> >
>>> > Hi Thomas,
>>> >
>>> > AFAIK there are no specific plans in this direction with the native integration, but I'd like to share some thoughts on the topic.
>>> >
>>> > In my understanding there are three major groups of workloads in Flink:
>>> >
>>> > 1) Batch workloads
>>> > 2) Interactive workloads (both Batch and Streaming; e.g. SQL Gateway / Zeppelin Notebooks / VVP ...)
>>> > 3) "Mission Critical" Streaming workloads
>>> >
>>> > I think the native integration fits really well in the first two categories. Let's talk about these first:
>>> >
>>> > 1) Batch workloads
>>> >
>>> > You don't really need to address the upgrade story here. The interesting topic is how to "dynamically" adjust parallelism as the workload can change between stages. This is where the Adaptive Batch Scheduler [1] comes into play. To leverage the scheduler to the full extent, it needs to be deployed with the remote shuffle service in place [2], so that Flink's Resource Manager can free TaskManagers that are no longer needed.
>>> >
>>> > This can IMO work really well with the native integration, as there is a really clear approach to how the Resource Manager should decide what resources to allocate.
>>> >
>>> > 2) Interactive workloads
>>> >
>>> > Again, the upgrade story is not really interesting in this scenario. For batch workloads, it's basically the same as the above. For the streaming ones this gets tricky. The main initiative that we currently have in terms of auto-scaling / re-scaling of streaming workloads is the reactive mode (adaptive scheduler) [3].
>>> >
>>> > I can totally see how the reactive mode could be integrated with the native integration, but only with the application mode, which is not really suitable for interactive workloads. For integration with a session cluster, we'd first need to address the "scheduling" problem of how to distribute newly available resources between multiple jobs.
>>> >
>>> > What's pretty neat here is that the interpreters (Zeppelin, SQL gateway, ...) have a really convenient way of spinning up a new cluster inside k8s.
>>> >
>>> > 3) "Mission Critical" Streaming workloads
>>> >
>>> > This one is IMO the primary reason why one would consider building a new operator these days, as it needs careful lifecycle management of the pipeline. I assume this is also the use case that you're investigating, am I correct?
>>> >
>>> > I'd second the requirements that you've already stated:
>>> > a) Resource efficiency - being able to re-scale based on the workload, in order to keep up with the input / not waste resources
>>> > b) Fast recovery
>>> > c) Application upgrades
>>> >
>>> > I personally don't think that the native integration is really suitable here.
>>> > The direction we're headed in is the standalone deployment on Kubernetes + the reactive mode (adaptive scheduler).
>>> >
>>> > In theory, if we want to build a really cloud (Kubernetes) native stream processor, deploying the pipeline should be as simple as deploying any other application. It should also be simple to integrate with CI/CD environments and the fast / frequent deploy philosophy.
>>> >
>>> > Let's see where we stand and where we can expand from there:
>>> >
>>> > a) Resource efficiency
>>> >
>>> > We already have the reactive mode in place. It allows you to add / remove task managers by adjusting the TM deployment (`kubectl scale ...`) and Flink will automatically react to the available resources. This is currently only supported with the Application Mode, which is limited to a single job (that should be enough for this kind of workload).
>>> >
>>> > The re-scaling logic is left completely up to the user and can be as simple as setting up an HPA (Horizontal Pod Autoscaler). I tend to think that we might want to provide a custom k8s metrics server that allows the HPA to query metrics from the JM, to make this more flexible and easier to use.
>>> >
>>> > While this looks really great in theory, there are still some shortcomings that we're actively working on addressing. For this feature to be really widely adopted, we need to make the re-scaling experience as fast as possible, so that we can re-scale often to react to the input rate. This is currently a problem with large RocksDB state, as re-scaling involves a full re-distribution of the state (splitting / merging RocksDB instances). The k8s operator approach has the same or even worse limitation, as it involves taking a savepoint and re-building the state from it.
>>> >
>>> > b) Fast recovery
>>> >
>>> > This is IMO not that different from the native mode (although I'd have to check whether RM failover can reuse task managers). It involves frequent and fast checkpointing, local recovery (which is still not supported in reactive mode, but will hopefully be addressed soon) and the working directory efforts [4].
>>> >
>>> > c) Application upgrades
>>> >
>>> > This is the topic I'm still struggling with a little. Historically this involves external lifecycle management (savepoint + submitting a new job). I think at the end of the day, with application mode on standalone k8s, it could be as simple as updating the docker image of the JM deployment.
>>> >
>>> > If I think about the simplest upgrade scenario, a simple in-place restore from the latest checkpoint, it may be fairly simple to implement. What I'm struggling with are the more complex upgrade scenarios such as dual or blue / green deployments.
>>> >
>>> > To sum this up, I'd really love it if Flink could provide a great out-of-the-box experience with standalone mode on k8s that makes the experience as close to running / operating any other application as possible.
>>> >
>>> > I'd really appreciate hearing your thoughts on this topic.
>>> >
>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
>>> > [2] https://github.com/flink-extended/flink-remote-shuffle
>>> > [3] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
>>> > [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-198%3A+Working+directory+for+Flink+processes
>>> >
>>> > Best,
>>> > D.
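(As a concrete illustration of the HPA approach David describes under (a), here is a minimal sketch that scales a standalone TaskManager Deployment on plain CPU utilization. The Deployment name is a placeholder, autoscaling/v2 assumes a reasonably recent cluster (use v2beta2 otherwise), and the CPU metric merely stands in for the JM-backed custom metrics a dedicated metrics server would enable.)

    # Minimal sketch: autoscale the TM Deployment of a reactive-mode standalone cluster.
    # Roughly equivalent to running `kubectl scale deployment flink-taskmanager --replicas=N` by hand.
    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: flink-taskmanager
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: flink-taskmanager        # placeholder, must match the TM Deployment
      minReplicas: 2
      maxReplicas: 10
      metrics:
        - type: Resource
          resource:
            name: cpu
            target:
              type: Utilization
              averageUtilization: 70
      # With a custom metrics server exposing JobManager metrics, the Resource metric
      # above could be replaced by a Pods or External metric (e.g. busyness or lag).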
>>> > On Tue, Jan 4, 2022 at 12:44 AM Thomas Weise <t...@apache.org> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> I was recently looking at the Flink native Kubernetes integration [1] to get an idea of how it relates to existing operator-based solutions [2], [3].
>>> >>
>>> >> Part of the native integration's motivation was simplicity (no extra component to install), but arguably that is also a shortcoming. The k8s operator model can offer support for application lifecycle operations like upgrade and rescaling, as well as job submission without a Flink client.
>>> >>
>>> >> When using the Flink native integration it would still be necessary to provide that controller functionality. Is the idea to use the native integration for task manager resource allocation in tandem with an operator that provides the external controller functionality? If anyone using the Flink native integration can share their experience, I would be curious to learn more about the specific setup and whether there are plans to expand the k8s native integration capabilities.
>>> >>
>>> >> For example:
>>> >>
>>> >> * Application upgrade with features such as [4]. Since the job manager is part of the deployment, it cannot orchestrate the deployment itself. That needs to be the responsibility of an external process. Has anyone contemplated adding such a component to Flink itself?
>>> >>
>>> >> * Rescaling: Theoretically a parallelism change could be performed w/o a restart of the job manager pod. Hence, building blocks to trigger and apply rescaling could be part of Flink itself. Has this been explored further?
>>> >>
>>> >> Yang kindly pointed me to [5]. Is the recommendation/conclusion that when a k8s operator is already used, it should be in charge of task manager resource allocation? If so, what scenario was the native k8s integration originally intended for?
>>> >>
>>> >> Thanks,
>>> >> Thomas
>>> >>
>>> >> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#deployment-modes
>>> >> [2] https://github.com/lyft/flinkk8soperator
>>> >> [3] https://github.com/spotify/flink-on-k8s-operator
>>> >> [4] https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md
>>> >> [5] https://lists.apache.org/thread/8cn99f6n8nhr07n5vqfo880tpm624s5d