Re: Print on screen DataStream content
Please correct me if I am wrong. `DataStream#print()` only prints to the screen when running from the IDE, but does not work (print to the screen) when running on a cluster (even a local cluster). Thanks, Pankaj On Mon, Nov 23, 2020 at 5:31 PM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: > Hey Simone, > > I'd suggest trying out the `DataStream#print()` function to start, but > there are a few other easy-to-integrate sinks for testing that you can > check out in the docs here[1] > > Best, > Austin > > [1]: > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks > > On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin > wrote: > >> Hi All, >> >> In my code I have a DataStream that I would like to access. I need to >> understand what I'm getting after each transformation, to check whether the data >> I'm working on makes sense. How can I print to the console, or write to a >> file (csv, txt), the contents of the variables "stream", "enriched" and "result"? >> >> I have tried different ways but could not get at the data. >> >> Thanks!
>>
>> FlinkKafkaConsumer<Event> kafkaData =
>>     new FlinkKafkaConsumer<>("CorID_1", new EventDeserializationSchema(), p);
>> WatermarkStrategy<Event> wmStrategy =
>>     WatermarkStrategy
>>         .<Event>forMonotonousTimestamps()
>>         .withIdleness(Duration.ofMinutes(1))
>>         .withTimestampAssigner((event, timestamp) -> {
>>             return event.get_Time();
>>         });
>> DataStream<Event> stream = env.addSource(
>>     kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>
>> DataStream<Tuple2<Event, Long>> enriched = stream
>>     .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
>>     .map(new StatefulSessionCalculator());
>>
>> WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
>>     .keyBy(new MyKeySelector())
>>     .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
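One practical note: print() does work on a cluster as well, but the output goes to the TaskManagers' stdout (the *.out files under the log/ directory, also visible in the web UI), not to the client console. Below is a minimal sketch of how the three variables could be inspected. The Tuple2<Event, Long> element type above is a reconstruction (the archive stripped the original generic parameters), and the reduce function here is only a placeholder, since a WindowedStream cannot be printed until a window function turns it back into a DataStream:

    // Print each element; on a cluster, this output lands in the TaskManagers' *.out files.
    stream.print("stream");
    enriched.print("enriched");

    // A WindowedStream has no print(): apply a window function first, then print
    // (or write) the resulting DataStream.
    enriched
        .keyBy(new MyKeySelector())
        .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()))
        .reduce((a, b) -> b)   // placeholder aggregation, for illustration only
        .print("result");

    // Writing to a text file instead:
    // enriched.writeAsText("/tmp/enriched.txt");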
Re: Processing single events for minimum latency
Thank you for the quick and informative reply, Piotrek! On Thu, Oct 15, 2020 at 2:09 AM Piotr Nowojski wrote: > Hi Pankay, > > Yes, you can trigger a window per each element, take a look at the Window > Triggers [1]. > > Flink is always processing all records immediately. The only things that > can delay processing elements are: > - buffering elements on the operator's state (vide WindowOperator) > - buffer-timeout (but that's on the output, so it's not delaying > processing per se) > - back pressure > - exactly-once checkpointing (especially under the back pressure) > > > Also, is there any way I can change the execution.buffer-timeout or > setbuffertimeout(milliseconds) dynamically while the job is running? > > No, sorry it's not possible :( > > Best, > Piotrek > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers > > czw., 15 paź 2020 o 01:55 Pankaj Chand > napisał(a): > >> Hi Piotrek, >> >> Thank you for replying! I want to process each record as soon as it is >> ingested (or reaches an operator) without waiting for a window for records >> to arrive. However, by not using windows, I am not sure if each record gets >> emitted immediately upon processing. >> >> > You still can use windowing, but you might want to emit updated value >> of the window per every processed record. >> >> How do I do this? >> >> Also, is there any way I can change the execution.buffer-timeout or >> setbuffertimeout(milliseconds) dynamically while the job is running? >> >> Thank you, >> >> Pankaj >> >> On Wed, Oct 14, 2020 at 9:42 AM Piotr Nowojski >> wrote: >> >>> Hi Pankaj, >>> >>> I'm not entirely sure if I understand your question. >>> >>> If you want to minimize latency, you should avoid using windows or any >>> other operators, that are buffering data for long periods of time. You >>> still can use windowing, but you might want to emit updated value of the >>> window per every processed record. >>> >>> Other than that, you might also want to reduce >>> `execution.buffer-timeout` from the default value of 100ms down to 1ms, or >>> 0ms [1]. Is this what you are looking for? >>> >>> Piotrek >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html >>> >>> śr., 14 paź 2020 o 12:38 Pankaj Chand >>> napisał(a): >>> >>>> Hi all, >>>> >>>> What is the recommended way to make a Flink job that processes each >>>> event individually as soon as it comes and without waiting for a >>>> window, in order to minimize latency in the entire DAG of operators? >>>> >>>> For example, here is some sample WordCount code (without windws), >>>> followed by some known ways: >>>> >>>> DataStream wordCounts = text >>>> .flatMap(new FlatMapFunction()) >>>> .keyBy("word") >>>> .reduce(new ReduceFunction()); >>>> >>>> >>>>1. Don't include any TimeWindow/CountWindow function (does this >>>>actually achieve what I want?) >>>>2. Use a CountWindow with a count of 1 >>>>3. Make a Trigger that fires to process each event when it comes in >>>> >>>> I think the above methods only work at the processing level and latency >>>> with respect to a single operator, but does not affect the latency of an >>>> event in the entire Flink job's DAG since those ways do not affect the >>>> buffertimeout value. >>>> >>>> Thanks, >>>> >>>> Pankaj >>>> >>>
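For later readers, here is a minimal sketch of the per-element firing Piotrek describes ("emit updated value of the window per every processed record"), based on the Trigger API from [1]. The class name is made up, and the body mirrors what the built-in EventTimeTrigger does, plus an eager FIRE per element:

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    // Fires an (updated) result for every element, and purges the window
    // once the event-time window actually closes.
    public class FireOnEveryElement extends Trigger<Object, TimeWindow> {

        @Override
        public TriggerResult onElement(Object element, long timestamp,
                                       TimeWindow window, TriggerContext ctx) {
            // Keep the normal end-of-window timer so the window still closes...
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // ...but also emit a result for this element right away.
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp()
                    ? TriggerResult.FIRE_AND_PURGE
                    : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }

It would be attached to a windowed stream via .window(...).trigger(new FireOnEveryElement()).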
Re: Processing single events for minimum latency
Hi Piotrek, Thank you for replying! I want to process each record as soon as it is ingested (or reaches an operator) without waiting for a window for records to arrive. However, by not using windows, I am not sure if each record gets emitted immediately upon processing. > You still can use windowing, but you might want to emit updated value of the window per every processed record. How do I do this? Also, is there any way I can change the execution.buffer-timeout or setbuffertimeout(milliseconds) dynamically while the job is running? Thank you, Pankaj On Wed, Oct 14, 2020 at 9:42 AM Piotr Nowojski wrote: > Hi Pankaj, > > I'm not entirely sure if I understand your question. > > If you want to minimize latency, you should avoid using windows or any > other operators, that are buffering data for long periods of time. You > still can use windowing, but you might want to emit updated value of the > window per every processed record. > > Other than that, you might also want to reduce `execution.buffer-timeout` > from the default value of 100ms down to 1ms, or 0ms [1]. Is this what you > are looking for? > > Piotrek > > [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html > > śr., 14 paź 2020 o 12:38 Pankaj Chand > napisał(a): > >> Hi all, >> >> What is the recommended way to make a Flink job that processes each event >> individually as soon as it comes and without waiting for a window, in order >> to minimize latency in the entire DAG of operators? >> >> For example, here is some sample WordCount code (without windws), >> followed by some known ways: >> >> DataStream wordCounts = text >> .flatMap(new FlatMapFunction()) >> .keyBy("word") >> .reduce(new ReduceFunction()); >> >> >>1. Don't include any TimeWindow/CountWindow function (does this >>actually achieve what I want?) >>2. Use a CountWindow with a count of 1 >>3. Make a Trigger that fires to process each event when it comes in >> >> I think the above methods only work at the processing level and latency >> with respect to a single operator, but does not affect the latency of an >> event in the entire Flink job's DAG since those ways do not affect the >> buffertimeout value. >> >> Thanks, >> >> Pankaj >> >
Processing single events for minimum latency
Hi all, What is the recommended way to make a Flink job process each event individually as soon as it arrives, without waiting for a window, in order to minimize latency in the entire DAG of operators? For example, here is some sample WordCount code (without windows), followed by some known ways: DataStream wordCounts = text .flatMap(new FlatMapFunction()) .keyBy("word") .reduce(new ReduceFunction()); 1. Don't include any TimeWindow/CountWindow function (does this actually achieve what I want?) 2. Use a CountWindow with a count of 1 3. Make a Trigger that fires to process each event when it comes in I think the above methods only address processing latency with respect to a single operator, but do not affect the latency of an event in the entire Flink job's DAG, since they do not affect the buffer-timeout value. Thanks, Pankaj
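As a concrete illustration of option 1 combined with the buffer-timeout knob discussed in the replies above, here is a runnable sketch; the host/port and the Tuple2 layout are assumptions, not taken from the original code:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class LowLatencyWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Flush network buffers after every record instead of every 100 ms (the default).
            env.setBufferTimeout(0);

            env.socketTextStream("localhost", 9001)
               .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                   for (String word : line.split("\\s+")) {
                       out.collect(Tuple2.of(word, 1));
                   }
               })
               .returns(Types.TUPLE(Types.STRING, Types.INT))
               .keyBy(t -> t.f0)
               .sum(1)   // no window: an updated count is emitted for every record
               .print();

            env.execute("Low-latency WordCount");
        }
    }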
Re: How to get Latency Tracking results?
Actually, I was wrong. It turns out I was setting the values the wrong way in conf/flink-conf.yaml. I set "metrics.latency.interval 100" instead of "metrics.latency.interval: 100". Sorry about that. T On Thu, Sep 10, 2020 at 7:05 AM Pankaj Chand wrote: > Thank you, David! > > After setting ExecutionConfig for latency tracking in the > SocketWindowWordCount Java source code and rebuilding and using that > application jar file, I am now getting the latency tracking metrics using > the REST endpoint. The below documentation [1] seems to imply that > merely setting the parameters in flink-conf.yaml would generate the latency > tracking metrics, which is not happening in my case. > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#latency-tracking > > On Wed, Sep 9, 2020 at 10:08 AM David Anderson > wrote: > >> Pankaj, >> >> I just checked, and the latency metrics for SocketWindowWordCount show up >> just fine for me with Flink 1.11.1. For me, the latency metrics existed >> even before I provided any data on the socket for the job to process. This >> makes sense, as the latency tracking markers will propagate through the job >> graph and provide measurements despite the lack of any real data for the >> job to process. >> >> These metrics have horrible names, but for me, >> >> curl -s >> http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics >> >> reveals a bunch of latency metrics, and, for example, >> >> curl -s >> http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics?get=latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999 >> >> returns >> >> >> [{"id":"latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999","value":"105.0"}] >> >> Best, >> David >> >> >> On Wed, Sep 9, 2020 at 3:16 PM Pankaj Chand >> wrote: >> >>> Hi David, >>> >>> Thanks for replying! Sorry, I forgot to mention I am using >>> Flink Version: 1.11.1, Commit ID: 7eb514a. >>> Is it possible that the default SocketWindowWordCount job is too simple >>> to generate Latency metrics? Or that the latency metrics disappear from the >>> output JSON when the data ingestion is zero? >>> >>> Thanks, >>> >>> Pankaj >>> >>> >>> On Wed, Sep 9, 2020 at 6:27 AM David Anderson >>> wrote: >>> >>>> Pankaj, >>>> >>>> The Flink web UI doesn't do any visualizations of histogram metrics, so >>>> the only way to access the latency metrics is either through the REST api >>>> or a metrics reporter. >>>> >>>> The REST endpoint you tried is the correct place to find these metrics >>>> in all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6 >>>> (if I recall correctly) these metrics were task metrics. So if you are >>>> using an older version of Flink you'll need to dig deeper. I believe you'll >>>> find them in >>>> >>>> /jobs//vertices//subtasks/metrics >>>> >>>> Regards, >>>> David >>>> >>>> >>>> >>>> On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand >>>> wrote: >>>> >>>>> Hello, >>>>> >>>>> How do I visualize (or extract) the results for Latency Tracking for a >>>>> Flink local cluster? I set "metrics.latency.interval 100" in the >>>>> conf/flink-conf.yaml file, and started the cluster and >>>>> SocketWindowWordCount job. However, I could not find the latency >>>>> distributions anywhere in the web UI, nor are there any latency metrics in >>>>> the Metrics dropdown box for either task. 
>>>>> >>>>> I also set "metrics.latency.granularity "operator"" in >>>>> conf/flink-conf.yaml, but that did not help. >>>>> >>>>> When I tried to query the REST endpoint, I got the following output >>>>> which did not seem to contain anything related to latency: >>>>> >>>>> $ curl -s >>>>> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics >>>>> >>>>> >>>>> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}] >>>>> >>>>> Thanks, >>>>> >>>>> Pankaj >>>>> >>>>
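To summarize the resolution of this thread for later readers: the root cause (per the follow-up above) was YAML syntax, i.e. the file needs "metrics.latency.interval: 100", with a colon. The programmatic route that also worked above is the ExecutionConfig setting; a minimal sketch, with 100 ms as an example value:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LatencyTrackingSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Programmatic equivalent of "metrics.latency.interval: 100" (note the colon)
            // in conf/flink-conf.yaml: sources emit a latency marker every 100 ms.
            env.getConfig().setLatencyTrackingInterval(100L);
            // ... build and execute the job as usual ...
        }
    }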
Re: How to get Latency Tracking results?
Thank you, David! After setting ExecutionConfig for latency tracking in the SocketWindowWordCount Java source code and rebuilding and using that application jar file, I am now getting the latency tracking metrics using the REST endpoint. The below documentation [1] seems to imply that merely setting the parameters in flink-conf.yaml would generate the latency tracking metrics, which is not happening in my case. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#latency-tracking On Wed, Sep 9, 2020 at 10:08 AM David Anderson wrote: > Pankaj, > > I just checked, and the latency metrics for SocketWindowWordCount show up > just fine for me with Flink 1.11.1. For me, the latency metrics existed > even before I provided any data on the socket for the job to process. This > makes sense, as the latency tracking markers will propagate through the job > graph and provide measurements despite the lack of any real data for the > job to process. > > These metrics have horrible names, but for me, > > curl -s > http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics > > reveals a bunch of latency metrics, and, for example, > > curl -s > http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics?get=latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999 > > returns > > > [{"id":"latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999","value":"105.0"}] > > Best, > David > > > On Wed, Sep 9, 2020 at 3:16 PM Pankaj Chand > wrote: > >> Hi David, >> >> Thanks for replying! Sorry, I forgot to mention I am using Flink Version: >> 1.11.1, Commit ID: 7eb514a. >> Is it possible that the default SocketWindowWordCount job is too simple >> to generate Latency metrics? Or that the latency metrics disappear from the >> output JSON when the data ingestion is zero? >> >> Thanks, >> >> Pankaj >> >> >> On Wed, Sep 9, 2020 at 6:27 AM David Anderson >> wrote: >> >>> Pankaj, >>> >>> The Flink web UI doesn't do any visualizations of histogram metrics, so >>> the only way to access the latency metrics is either through the REST api >>> or a metrics reporter. >>> >>> The REST endpoint you tried is the correct place to find these metrics >>> in all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6 >>> (if I recall correctly) these metrics were task metrics. So if you are >>> using an older version of Flink you'll need to dig deeper. I believe you'll >>> find them in >>> >>> /jobs//vertices//subtasks/metrics >>> >>> Regards, >>> David >>> >>> >>> >>> On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand >>> wrote: >>> >>>> Hello, >>>> >>>> How do I visualize (or extract) the results for Latency Tracking for a >>>> Flink local cluster? I set "metrics.latency.interval 100" in the >>>> conf/flink-conf.yaml file, and started the cluster and >>>> SocketWindowWordCount job. However, I could not find the latency >>>> distributions anywhere in the web UI, nor are there any latency metrics in >>>> the Metrics dropdown box for either task. >>>> >>>> I also set "metrics.latency.granularity "operator"" in >>>> conf/flink-conf.yaml, but that did not help. 
>>>> >>>> When I tried to query the REST endpoint, I got the following output >>>> which did not seem to contain anything related to latency: >>>> >>>> $ curl -s >>>> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics >>>> >>>> >>>> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}] >>>> >>>> Thanks, >>>> >>>> Pankaj >>>> >>>
Re: How to get Latency Tracking results?
Hi David, Thanks for replying! Sorry, I forgot to mention I am using Flink Version: 1.11.1, Commit ID: 7eb514a. Is it possible that the default SocketWindowWordCount job is too simple to generate Latency metrics? Or that the latency metrics disappear from the output JSON when the data ingestion is zero? Thanks, Pankaj On Wed, Sep 9, 2020 at 6:27 AM David Anderson wrote: > Pankaj, > > The Flink web UI doesn't do any visualizations of histogram metrics, so > the only way to access the latency metrics is either through the REST api > or a metrics reporter. > > The REST endpoint you tried is the correct place to find these metrics in > all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6 > (if I recall correctly) these metrics were task metrics. So if you are > using an older version of Flink you'll need to dig deeper. I believe you'll > find them in > > /jobs//vertices//subtasks/metrics > > Regards, > David > > > > On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand > wrote: > >> Hello, >> >> How do I visualize (or extract) the results for Latency Tracking for a >> Flink local cluster? I set "metrics.latency.interval 100" in the >> conf/flink-conf.yaml file, and started the cluster and >> SocketWindowWordCount job. However, I could not find the latency >> distributions anywhere in the web UI, nor are there any latency metrics in >> the Metrics dropdown box for either task. >> >> I also set "metrics.latency.granularity "operator"" in >> conf/flink-conf.yaml, but that did not help. >> >> When I tried to query the REST endpoint, I got the following output which >> did not seem to contain anything related to latency: >> >> $ curl -s >> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics >> >> >> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}] >> >> Thanks, >> >> Pankaj >> >
How to get Latency Tracking results?
Hello, How do I visualize (or extract) the results for Latency Tracking for a Flink local cluster? I set "metrics.latency.interval 100" in the conf/flink-conf.yaml file, and started the cluster and SocketWindowWordCount job. However, I could not find the latency distributions anywhere in the web UI, nor are there any latency metrics in the Metrics dropdown box for either task. I also set "metrics.latency.granularity "operator"" in conf/flink-conf.yaml, but that did not help. When I tried to query the REST endpoint, I got the following output which did not seem to contain anything related to latency: $ curl -s http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}] Thanks, Pankaj
Re: Implementation of setBufferTimeout(timeoutMillis)
Thank you so much, Yun! It is exactly what I needed. On Mon, Aug 31, 2020 at 1:50 AM Yun Gao wrote: > Hi Pankaj, > > I think it should be in > org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher. > > Best, > Yun > > > > -- > Sender:Pankaj Chand > Date:2020/08/31 02:40:15 > Recipient:user > Theme:Implementation of setBufferTimeout(timeoutMillis) > > Hello, > > The documentation gives the following two sample lines for setting the > buffer timeout on the streaming environment or on a transformation:
>
> env.setBufferTimeout(timeoutMillis);
> env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
>
> I have been trying to find where (file and method) in the Flink source > code the buffers are flushed by repeatedly referring to the value of > timeoutMillis (or the default value), but have been unsuccessful. Please > help. > > Thanks, > > Pankaj > >
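For readers curious what that class does: it is essentially a daemon thread that sleeps for the configured timeout and then flushes all partially filled output buffers. A simplified sketch of the pattern (not Flink's actual code, just its shape):

    // Illustration of the flusher pattern behind RecordWriter$OutputFlusher:
    // a daemon thread that sleeps for the buffer timeout, then flushes.
    final class OutputFlusher extends Thread {
        private final long timeoutMillis;   // the configured buffer timeout
        private final Runnable flushAll;    // callback that flushes all pending buffers
        private volatile boolean running = true;

        OutputFlusher(long timeoutMillis, Runnable flushAll) {
            super("OutputFlusher");
            setDaemon(true);
            this.timeoutMillis = timeoutMillis;
            this.flushAll = flushAll;
        }

        void terminate() {
            running = false;
            interrupt();   // wake the thread if it is sleeping
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(timeoutMillis);   // wait out the buffer timeout
                } catch (InterruptedException e) {
                    if (running) {   // interruption is expected only during shutdown
                        throw new RuntimeException(e);
                    }
                }
                flushAll.run();   // push out whatever is currently buffered
            }
        }
    }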
Implementation of setBufferTimeout(timeoutMillis)
Hello, The documentation gives the following two sample lines for setting the buffer timeout on the streaming environment or on a transformation:
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
I have been trying to find where (file and method) in the Flink source code the buffers are flushed by repeatedly referring to the value of timeoutMillis (or the default value), but have been unsuccessful. Please help. Thanks, Pankaj
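For context, a self-contained version of that documentation snippet; the MyMapper body is a made-up placeholder. The env-level call sets the job-wide default, while the per-operator call overrides it for that operator's output buffers:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BufferTimeoutExample {
        // Trivial placeholder for the MyMapper from the documentation snippet.
        static class MyMapper implements MapFunction<Long, Long> {
            @Override
            public Long map(Long value) {
                return value * 2;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            long timeoutMillis = 5;

            env.setBufferTimeout(timeoutMillis);   // default for the whole job
            env.generateSequence(1, 10)
               .map(new MyMapper())
               .setBufferTimeout(timeoutMillis)    // override for this operator's output
               .print();

            env.execute("Buffer timeout example");
        }
    }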
Re: OpenJDK or Oracle JDK: 8 or 11?
Thank you, Arvid and Marco! On Sat, Aug 22, 2020 at 2:03 PM Marco Villalobos wrote: > Hi Pankaj, > > I highly recommend that you use an OpenJDK version 11 because each JDK > upgrade has a performance improvement, and also because the Oracle JDK and > OpenJDK are based off the same code-base. The main difference between > Oracle and OpenJDK is the branding and price. > > > > On Aug 22, 2020, at 4:23 AM, Pankaj Chand > wrote: > > > > Hello, > > > > The documentation says that to run Flink, we need Java 8 or 11. > > > > Will JDK 11 work for running Flink, programming Flink applications as > well as building Flink from source? > > > > Also, can we use Open JDK for the above three capabilities, or do any of > the capabilities require Oracle JDK? > > > > Thanks, > > > > Pankaj > >
OpenJDK or Oracle JDK: 8 or 11?
Hello, The documentation says that to run Flink, we need Java 8 or 11. Will JDK 11 work for running Flink, programming Flink applications as well as building Flink from source? Also, can we use Open JDK for the above three capabilities, or do any of the capabilities require Oracle JDK? Thanks, Pankaj
Re: Flink on Kubernetes Vs Flink Natively on Kubernetes
Thank you, Yang and Xintong! Best, Pankaj On Mon, Mar 16, 2020, 9:27 PM Yang Wang wrote: > Hi Pankaj, > > Just like Xintong has said, the biggest difference between Flink on Kubernetes > and the native integration is dynamic resource allocation, since the latter has an > embedded K8s client and will communicate with the K8s API server directly to > allocate/release JM/TM pods. > > For both ways of running Flink on K8s, you do not need to reserve the whole > cluster for Flink. Flink can run alongside other workloads (e.g. Spark, > TensorFlow, etc.). The K8s cluster guarantees the isolation. > > > Best, > Yang > > Pankaj Chand wrote on Mon, Mar 16, 2020 at 5:51 PM: > >> Hi Xintong, >> >> Thank you for the explanation! >> >> If I run Flink "natively" on Kubernetes, will I also be able to run Spark >> on the same Kubernetes cluster, or will the Kubernetes cluster be >> reserved for Flink only? >> >> Thank you! >> >> Pankaj >> >> On Mon, Mar 16, 2020 at 5:41 AM Xintong Song >> wrote: >> >>> Forgot to mention that "running Flink natively on Kubernetes" is newly >>> introduced and is only available for Flink 1.10 and above. >>> >>> >>> Thank you~ >>> >>> Xintong Song >>> >>> >>> >>> On Mon, Mar 16, 2020 at 5:40 PM Xintong Song >>> wrote: >>> >>>> Hi Pankaj, >>>> >>>> "Running Flink on Kubernetes" refers to the old way that basically >>>> deploys a Flink standalone cluster on Kubernetes. We leverage scripts to >>>> run the Flink Master and TaskManager processes inside Kubernetes containers. In >>>> this way, Flink is not aware of whether it's running in containers or >>>> directly on physical machines, and will not interact with the Kubernetes >>>> Master. The Flink Master reactively accepts all registered TaskManagers, whose >>>> number is decided by the Kubernetes replica count. >>>> >>>> "Running Flink natively on Kubernetes" refers to deploying Flink as a >>>> Kubernetes Job. The Flink Master will interact with the Kubernetes Master, and >>>> actively request pods/containers, like on Yarn/Mesos. >>>> >>>> Thank you~ >>>> >>>> Xintong Song >>>> >>>> >>>> >>>> On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand >>>> wrote: >>>> >>>>> Hi all, >>>>> >>>>> I want to run Flink, Spark and other processing engines on a single >>>>> Kubernetes cluster. >>>>> >>>>> From the Flink documentation, I did not understand the difference >>>>> between: >>>>> (1) Running Flink on Kubernetes, versus (2) Running Flink natively on >>>>> Kubernetes. >>>>> >>>>> Could someone please explain the difference between the two, and when >>>>> would you use which option? >>>>> >>>>> Thank you, >>>>> >>>>> Pankaj >>>>>
Re: Flink on Kubernetes Vs Flink Natively on Kubernetes
Hi Xintong, Thank you for the explanation! If I run Flink "natively" on Kubernetes, will I also be able to run Spark on the same Kubernetes cluster, or will the Kubernetes cluster be reserved for Flink only? Thank you! Pankaj On Mon, Mar 16, 2020 at 5:41 AM Xintong Song wrote: > Forgot to mention that "running Flink natively on Kubernetes" is newly > introduced and is only available for Flink 1.10 and above. > > > Thank you~ > > Xintong Song > > > > On Mon, Mar 16, 2020 at 5:40 PM Xintong Song > wrote: > >> Hi Pankaj, >> >> "Running Flink on Kubernetes" refers to the old way that basically >> deploys a Flink standalone cluster on Kubernetes. We leverage scripts to >> run the Flink Master and TaskManager processes inside Kubernetes containers. In >> this way, Flink is not aware of whether it's running in containers or >> directly on physical machines, and will not interact with the Kubernetes >> Master. The Flink Master reactively accepts all registered TaskManagers, whose >> number is decided by the Kubernetes replica count. >> >> "Running Flink natively on Kubernetes" refers to deploying Flink as a >> Kubernetes Job. The Flink Master will interact with the Kubernetes Master, and >> actively request pods/containers, like on Yarn/Mesos. >> >> Thank you~ >> >> Xintong Song >> >> >> >> On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand >> wrote: >> >>> Hi all, >>> >>> I want to run Flink, Spark and other processing engines on a single >>> Kubernetes cluster. >>> >>> From the Flink documentation, I did not understand the difference >>> between: >>> (1) Running Flink on Kubernetes, versus (2) Running Flink natively on >>> Kubernetes. >>> >>> Could someone please explain the difference between the two, and when >>> would you use which option? >>> >>> Thank you, >>> >>> Pankaj >>>
Flink on Kubernetes Vs Flink Natively on Kubernetes
Hi all, I want to run Flink, Spark and other processing engines on a single Kubernetes cluster. From the Flink documentation, I did not understand the difference between: (1) Running Flink on Kubernetes, versus (2) Running Flink natively on Kubernetes. Could someone please explain the difference between the two, and when would you use which option? Thank you, Pankaj
Flink: Run Once Trigger feature like Spark's
Hi all, Is there anything in Flink similar to Spark Structured Streaming's Run Once trigger (the Trigger.Once feature), as described in the blog below? https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html This feature allows you to run a streaming job periodically instead of running it continuously. Thanks, Pankaj
Re: Sample Code for querying Flink's default metrics
Additionally, when an old job completes and I run a new job on the Flink Yarn session mode cluster, when I query for metrics before they become available for the new job, I sometimes get the last metrics for the old job instead. This happens even if I wait for the TaskManager to be released by Flink (as shown in the Flink's dashboard Web UI). This shouldn't happen since the Task_Manager ID "should" be different, though it would have the old index in the Task_Managers list. Would this be a bug? Thanks! Pankaj On Thu, Dec 12, 2019 at 5:59 AM Pankaj Chand wrote: > Thank you, Chesnay! > > On Thu, Dec 12, 2019 at 5:46 AM Chesnay Schepler > wrote: > >> Yes, when a cluster was started it takes a few seconds for (any) metrics >> to be available. >> >> On 12/12/2019 11:36, Pankaj Chand wrote: >> >> Hi Vino, >> >> Thank you for the links regarding backpressure! >> >> I am currently using code to get metrics by calling REST API via curl. >> However, many times the REST API via curl gives an empty JSON object/array. >> Piped through JQ (for filtering JSON) it produces a null value. This is >> breaking my code. >> Example in a Yarn cluster session mode, the following metric >> "metrics?get=Status.JVM.CPU.Load" randomly (I think) returns an empty json >> object/array or an actual value. >> >> Is it possible that for CPU Load, the empty JSON object is returned when >> the job is newly started less than 10 seconds ago. >> >> Thanks, >> >> Pankaj >> >> >> >> On Mon, Dec 9, 2019 at 4:21 AM vino yang wrote: >> >>> Hi Pankaj, >>> >>> > Is there any sample code for how to read such default metrics? Is >>> there any way to query the default metrics, such as CPU usage and Memory, >>> without using REST API or Reporters? >>> >>> What's your real requirement? Can you use code to call REST API? Why >>> does it not match your requirements? >>> >>> > Additionally, how do I query Backpressure using code, or is it still >>> only visually available via the dashboard UI? Consequently, is there any >>> way to infer Backpressure by querying one (or more) of the Memory metrics >>> of the TaskManager? >>> >>> The backpressure is related to not only memory metrics but also IO and >>> network metrics, for more details about measure backpressure please see >>> this blog.[1][2] >>> >>> [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html >>> [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html >>> >>> Best, >>> Vino >>> >>> Pankaj Chand 于2019年12月9日周一 下午12:07写道: >>> >>>> Hello, >>>> >>>> Using Flink on Yarn, I could not understand the documentation for how >>>> to read the default metrics via code. In particular, I want to read >>>> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and >>>> Memory. >>>> >>>> Is there any sample code for how to read such default metrics? Is >>>> there any way to query the default metrics, such as CPU usage and Memory, >>>> without using REST API or Reporters? >>>> >>>> Additionally, how do I query Backpressure using code, or is it still >>>> only visually available via the dashboard UI? Consequently, is there any >>>> way to infer Backpressure by querying one (or more) of the Memory metrics >>>> of the TaskManager? >>>> >>>> Thank you, >>>> >>>> Pankaj >>>> >>> >>
Re: Flink client trying to submit jobs to old session cluster (which was killed)
Vino and Kostas: Thank you for the info! I was using Flink 1.9.1 with Pre-bundled Hadoop 2.7.5. CloudLab has quarantined my cluster experiment without notice, so I'll let you know if and when they allow me to access the files in the future. Regards, Pankaj On Thu, Dec 12, 2019 at 8:35 AM Kostas Kloudas wrote: > Hi Pankaj, > > When you start a session cluster with the bin/yarn-session.sh script, > Flink will create the cluster and then write a "Yarn Properties file" > named ".yarn-properties-YOUR_USER_NAME" in the directory: > either the one specified by the option "yarn.properties-file.location" > in the flink-conf.yaml or in your local > System.getProperty("java.io.tmpdir"). This file will contain the > applicationId of the cluster and > it will be picked up by any future calls to `flink run`. Could you > check if this file exists and if it is updated every time you create a > cluster? > > Thanks, > Kostas > > On Thu, Dec 12, 2019 at 2:22 PM vino yang wrote: > > Hi Pankaj, > > Can you tell us which Flink version you use? And can you share the > Flink client and job manager logs with us? > > This information would help us to locate your problem. > > Best, > Vino > > Pankaj Chand wrote on Thu, Dec 12, 2019 at 7:08 PM: > >> Hello, > >> When using Flink on YARN in session mode, each Flink job client would > automatically know the YARN cluster to connect to. It says this somewhere > in the documentation. > >> So, I killed the Flink session cluster by simply killing the YARN > application using the "yarn kill" command. However, when starting a new > Flink session cluster and trying to submit Flink jobs to yarn-session, > Flink complains that the old cluster (it gave the port number and YARN > application ID) is not available. > >> It seems like the details of the old cluster were still stored > somewhere in Flink. So, I had to completely replace the Flink folder with a > new one. > >> Does anyone know the proper way to kill a Flink+YARN session cluster to > completely remove it so that jobs will get submitted to a new Flink session > cluster? > >> Thanks, > >> Pankaj
Flink client trying to submit jobs to old session cluster (which was killed)
Hello, When using Flink on YARN in session mode, each Flink job client would automatically know the YARN cluster to connect to. It says this somewhere in the documentation. So, I killed the Flink session cluster by simply killing the YARN application using the "yarn kill" command. However, when starting a new Flink session cluster and trying to submit Flink jobs to yarn-session, Flink complains that the old cluster (it gave the port number and YARN application ID) is not available. It seems like the details of the old cluster were still stored somewhere in Flink. So, I had to completely replace the Flink folder with a new one. Does anyone know the proper way to kill a Flink+YARN session cluster to completely remove it so that jobs will get submitted to a new Flink session cluster? Thanks, Pankaj
Re: Sample Code for querying Flink's default metrics
Thank you, Chesnay! On Thu, Dec 12, 2019 at 5:46 AM Chesnay Schepler wrote: > Yes, when a cluster was started it takes a few seconds for (any) metrics > to be available. > > On 12/12/2019 11:36, Pankaj Chand wrote: > > Hi Vino, > > Thank you for the links regarding backpressure! > > I am currently using code to get metrics by calling REST API via curl. > However, many times the REST API via curl gives an empty JSON object/array. > Piped through JQ (for filtering JSON) it produces a null value. This is > breaking my code. > Example in a Yarn cluster session mode, the following metric > "metrics?get=Status.JVM.CPU.Load" randomly (I think) returns an empty json > object/array or an actual value. > > Is it possible that for CPU Load, the empty JSON object is returned when > the job is newly started less than 10 seconds ago. > > Thanks, > > Pankaj > > > > On Mon, Dec 9, 2019 at 4:21 AM vino yang wrote: > >> Hi Pankaj, >> >> > Is there any sample code for how to read such default metrics? Is >> there any way to query the default metrics, such as CPU usage and Memory, >> without using REST API or Reporters? >> >> What's your real requirement? Can you use code to call REST API? Why >> does it not match your requirements? >> >> > Additionally, how do I query Backpressure using code, or is it still >> only visually available via the dashboard UI? Consequently, is there any >> way to infer Backpressure by querying one (or more) of the Memory metrics >> of the TaskManager? >> >> The backpressure is related to not only memory metrics but also IO and >> network metrics, for more details about measure backpressure please see >> this blog.[1][2] >> >> [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html >> [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html >> >> Best, >> Vino >> >> Pankaj Chand 于2019年12月9日周一 下午12:07写道: >> >>> Hello, >>> >>> Using Flink on Yarn, I could not understand the documentation for how to >>> read the default metrics via code. In particular, I want to read >>> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and >>> Memory. >>> >>> Is there any sample code for how to read such default metrics? Is there >>> any way to query the default metrics, such as CPU usage and Memory, without >>> using REST API or Reporters? >>> >>> Additionally, how do I query Backpressure using code, or is it still >>> only visually available via the dashboard UI? Consequently, is there any >>> way to infer Backpressure by querying one (or more) of the Memory metrics >>> of the TaskManager? >>> >>> Thank you, >>> >>> Pankaj >>> >> >
Re: Sample Code for querying Flink's default metrics
Hi Vino, Thank you for the links regarding backpressure! I am currently using code to get metrics by calling the REST API via curl. However, many times the REST API via curl gives an empty JSON object/array; piped through jq (for filtering JSON), it produces a null value. This is breaking my code. For example, in a Yarn session-mode cluster, the metric query "metrics?get=Status.JVM.CPU.Load" seemingly at random returns either an empty JSON object/array or an actual value. Is it possible that for CPU Load, the empty JSON object is returned when the job was started less than 10 seconds ago? Thanks, Pankaj On Mon, Dec 9, 2019 at 4:21 AM vino yang wrote: > Hi Pankaj, > > > Is there any sample code for how to read such default metrics? Is > there any way to query the default metrics, such as CPU usage and Memory, > without using REST API or Reporters? > > What's your real requirement? Can you use code to call the REST API? Why > does it not match your requirements? > > > Additionally, how do I query Backpressure using code, or is it still > only visually available via the dashboard UI? Consequently, is there any > way to infer Backpressure by querying one (or more) of the Memory metrics > of the TaskManager? > > The backpressure is related not only to memory metrics but also to IO and > network metrics; for more details about measuring backpressure, please see > these blogs [1][2]. > > [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html > [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html > > Best, > Vino > > Pankaj Chand wrote on Mon, Dec 9, 2019 at 12:07 PM: > >> Hello, >> >> Using Flink on Yarn, I could not understand the documentation for how to >> read the default metrics via code. In particular, I want to read >> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and >> Memory. >> >> Is there any sample code for how to read such default metrics? Is there >> any way to query the default metrics, such as CPU usage and Memory, without >> using REST API or Reporters? >> >> Additionally, how do I query Backpressure using code, or is it still only >> visually available via the dashboard UI? Consequently, is there any way to >> infer Backpressure by querying one (or more) of the Memory metrics of the >> TaskManager? >> >> Thank you, >> >> Pankaj >> >
Sample Code for querying Flink's default metrics
Hello, Using Flink on Yarn, I could not understand the documentation for how to read the default metrics via code. In particular, I want to read throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and Memory. Is there any sample code for how to read such default metrics? Is there any way to query the default metrics, such as CPU usage and Memory, without using REST API or Reporters? Additionally, how do I query Backpressure using code, or is it still only visually available via the dashboard UI? Consequently, is there any way to infer Backpressure by querying one (or more) of the Memory metrics of the TaskManager? Thank you, Pankaj
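For later readers, the approach that emerged in this thread is to poll the REST API from code and retry while metrics are still registering (per the note earlier in the thread that metrics take a few seconds to become available after startup). A minimal sketch using only the JDK; the host, port and TaskManager id are placeholders, and Status.JVM.CPU.Load is a TaskManager-scope metric:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class MetricsPoller {
        // Fetch one TaskManager metric from the REST API.
        static String fetchMetric(String tmId, String metric) throws Exception {
            URL url = new URL("http://localhost:8081/taskmanagers/" + tmId
                    + "/metrics?get=" + metric);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream()))) {
                StringBuilder body = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) {
                    body.append(line);
                }
                return body.toString(); // JSON array, e.g. [{"id":"...","value":"0.12"}]
            }
        }

        public static void main(String[] args) throws Exception {
            // Pass the TaskManager id as the first argument.
            // Retry, since metrics take a few seconds to appear after startup.
            for (int attempt = 0; attempt < 10; attempt++) {
                String json = fetchMetric(args[0], "Status.JVM.CPU.Load");
                if (!json.isEmpty() && !"[]".equals(json)) {
                    System.out.println(json);
                    return;
                }
                Thread.sleep(1000);
            }
            System.err.println("Metric not available yet.");
        }
    }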
Re: How to install Flink + YARN?
Please disregard my last question. It is working fine with Hadoop 2.7.5. Thanks On Sat, Dec 7, 2019 at 2:13 AM Pankaj Chand wrote: > Is it required to use exactly the same versions of Hadoop as the > pre-bundled hadoop version? > > I'm using Hadoop 2.7.1 cluster with Flink 1.9.1 and the corresponding > Prebundled Hadoop 2.7.5. > > When I submit a job using: > > [vagrant@node1 flink]$ ./bin/flink run -m yarn-cluster > ./examples/streaming/SocketWindowWordCount.jar --port 9001 > > the tail of the log is empty , and i get the following messages: > > vagrant@node1 flink]$ ./bin/flink run -m yarn-cluster > ./examples/streaming/SocketWindowWordCount.jar --port 9001 > 2019-12-07 07:04:15,394 INFO org.apache.hadoop.yarn.client.RMProxy > - Connecting to ResourceManager at /0.0.0.0:8032 > 2019-12-07 07:04:15,493 INFO > org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path > for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2019-12-07 07:04:15,493 INFO > org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path > for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2019-12-07 07:04:16,615 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 0 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:17,617 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 1 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:18,619 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 2 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:19,621 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 3 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:20,629 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 4 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:21,632 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 5 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:22,634 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 6 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:23,639 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 7 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:24,644 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 8 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:25,651 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. 
> Already tried 9 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:56,677 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 0 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:57,679 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 1 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:58,680 INFO org.apache.hadoop.ipc.Client > - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. > Already tried 2 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 > MILLISECONDS) > 2019-12-07 07:04:59,686 INFO org.apache.hadoop.ipc.Client > - Retrying
Re: How to install Flink + YARN?
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 2019-12-07 07:05:02,693 INFO org.apache.hadoop.ipc.Client - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 2019-12-07 07:05:03,695 INFO org.apache.hadoop.ipc.Client - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 2019-12-07 07:05:04,704 INFO org.apache.hadoop.ipc.Client - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 2019-12-07 07:05:05,715 INFO org.apache.hadoop.ipc.Client - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) Thanks! Pankaj On Wed, Nov 20, 2019 at 2:45 AM Pankaj Chand wrote: > Thank you, Ana and Yang! > > On Tue, Nov 19, 2019, 9:29 PM Yang Wang wrote: > >> Hi Pankaj, >> >> First, you need to prepare a hadoop environment separately, including >> hdfs and Yarn. If you are familiar >> with hadoop, you could download the binary[1] and start the cluster on >> you nodes manually. Otherwise, >> some tools may help you to deploy a hadoop cluster, ambari[2] and >> cloudera manager[2]. >> >> Then, download the Flink with Hadoop pre-bundle. You can submit your >> flink job now. >> >> Please make sure you set the correct HADOOP_CONF_DIR in your flink client >> before starting a flink cluster. >> >> >> >> [1].https://hadoop.apache.org/releases.html >> [2]. >> https://www.cloudera.com/products/open-source/apache-hadoop/apache-ambari.html >> [3]. >> https://www.cloudera.com/products/product-components/cloudera-manager.html >> >> Ana 于2019年11月20日周三 上午10:12写道: >> >>> Hi, >>> >>> I was able to run Flink on YARN by installing YARN and Flink separately. >>> >>> Thank you. >>> >>> Ana >>> >>> On Wed, Nov 20, 2019 at 10:42 AM Pankaj Chand >>> wrote: >>> >>>> Hello, >>>> >>>> I want to run Flink on YARN upon a cluster of nodes. From the >>>> documentation, I was not able to fully understand how to go about it. Some >>>> of the archived answers are a bit old and had pending JIRA issues, so I >>>> thought I would ask. >>>> >>>> Am I first supposed to install YARN separately, and then download the >>>> Flink file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put >>>> into Flink's /lib folder provide the entire YARN installation? >>>> >>>> Is there any download that bundles a complete *installation of Fink + >>>> installation of YARN*? >>>> >>>> Thank you, >>>> >>>> Pankaj >>>> >>>
Re: How to install Flink + YARN?
Thank you, Ana and Yang! On Tue, Nov 19, 2019, 9:29 PM Yang Wang wrote: > Hi Pankaj, > > First, you need to prepare a hadoop environment separately, including hdfs > and Yarn. If you are familiar > with hadoop, you could download the binary[1] and start the cluster on you > nodes manually. Otherwise, > some tools may help you to deploy a hadoop cluster, ambari[2] and cloudera > manager[2]. > > Then, download the Flink with Hadoop pre-bundle. You can submit your flink > job now. > > Please make sure you set the correct HADOOP_CONF_DIR in your flink client > before starting a flink cluster. > > > > [1].https://hadoop.apache.org/releases.html > [2]. > https://www.cloudera.com/products/open-source/apache-hadoop/apache-ambari.html > [3]. > https://www.cloudera.com/products/product-components/cloudera-manager.html > > Ana 于2019年11月20日周三 上午10:12写道: > >> Hi, >> >> I was able to run Flink on YARN by installing YARN and Flink separately. >> >> Thank you. >> >> Ana >> >> On Wed, Nov 20, 2019 at 10:42 AM Pankaj Chand >> wrote: >> >>> Hello, >>> >>> I want to run Flink on YARN upon a cluster of nodes. From the >>> documentation, I was not able to fully understand how to go about it. Some >>> of the archived answers are a bit old and had pending JIRA issues, so I >>> thought I would ask. >>> >>> Am I first supposed to install YARN separately, and then download the >>> Flink file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put >>> into Flink's /lib folder provide the entire YARN installation? >>> >>> Is there any download that bundles a complete *installation of Fink + >>> installation of YARN*? >>> >>> Thank you, >>> >>> Pankaj >>> >>
How to install Flink + YARN?
Hello, I want to run Flink on YARN on a cluster of nodes. From the documentation, I was not able to fully understand how to go about it. Some of the archived answers are a bit old and had pending JIRA issues, so I thought I would ask. Am I first supposed to install YARN separately, and then download the Flink file and the Hadoop pre-bundle? Or does the Hadoop pre-bundle that I put into Flink's /lib folder provide the entire YARN installation? Is there any download that bundles a complete *installation of Flink + installation of YARN*? Thank you, Pankaj
Re: Cannot modify parallelism (rescale job) more than once
Thank you! On Mon, Oct 28, 2019 at 3:53 AM vino yang wrote: > Hi Pankaj, > > It seems it is a bug. You can report it by opening a Jira issue. > > Best, > Vino > > Pankaj Chand 于2019年10月28日周一 上午10:51写道: > >> Hello, >> >> I am trying to modify the parallelism of a streaming Flink job >> (wiki-edits example) multiple times on a standalone cluster (one local >> machine) having two TaskManagers with 3 slots each (i.e. 6 slots total). >> However, the "modify" command is only working once (e.g. when I change the >> parallelism from 2 to 4). The second time (e.g. change parallelism to 6 or >> even back to 2), it is giving an error. >> >> I am using Flink 1.8.1 (since I found that the modify parallelism command >> has been removed from v1.9 documentation) and have configured savepoints to >> be written to file:///home/pankaj/flink-checkpoints. The output of the >> first "modify -p 4" command and second "modify -p 6" >> command is copied below. >> >> Please tell me how to modify parallelism multiple times at runtime. >> >> Thanks, >> >> Pankaj >> >> >> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4 >> Modify job 94831ca34951975dbee3335a384ee935. >> Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4. >> >> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6 >> Modify job 94831ca34951975dbee3335a384ee935. >> >> >> The program finished with the following exception: >> >> org.apache.flink.util.FlinkException: Could not rescale job >> 94831ca34951975dbee3335a384ee935. >> at >> org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799) >> at >> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985) >> at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790) >> at >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068) >> at >> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) >> at >> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) >> Caused by: java.util.concurrent.CompletionException: >> java.lang.IllegalStateException: Suspend needs to happen atomically >> at >> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) >> at >> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) >> at >> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961) >> at >> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) >> at >> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) >> at >> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) >> at >> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) >> at >> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) >> at akka.actor.Actor.aroundReceive(Actor.scala:502) >> at akka.actor.Actor.aroundReceive$(Actor.scala:500) >> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) >> at 
akka.actor.ActorCell.invoke(ActorCell.scala:495) >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) >> at akka.dispatch.Mailbox.run(Mailbox.scala:224) >> at akka.dispatch.Mailbox.exec(Mailbox.scala:234) >> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) >> at >> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) >> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) >> at >> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) >> Caused by: java.lang.IllegalStateException: Suspend needs to happen >> atomically >> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172) >> at >> org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221) >> at >> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465) >> at >> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) >> ... 20 more >> >
Cannot modify parallelism (rescale job) more than once
Hello,

I am trying to modify the parallelism of a streaming Flink job (wiki-edits example) multiple times on a standalone cluster (one local machine) that has two TaskManagers with 3 slots each (i.e. 6 slots total). However, the "modify" command only works once (e.g. when I change the parallelism from 2 to 4). The second time (e.g. changing the parallelism to 6, or even back to 2), it gives an error. I am using Flink 1.8.1 (since I found that the modify-parallelism command has been removed from the v1.9 documentation) and have configured savepoints to be written to file:///home/pankaj/flink-checkpoints. The output of the first "modify -p 4" command and the second "modify -p 6" command is copied below. Please tell me how to modify parallelism multiple times at runtime.

Thanks,
Pankaj

$ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4
Modify job 94831ca34951975dbee3335a384ee935.
Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4.

$ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6
Modify job 94831ca34951975dbee3335a384ee935.

The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not rescale job 94831ca34951975dbee3335a384ee935.
    at org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
    at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor.aroundReceive(Actor.scala:502)
    at akka.actor.Actor.aroundReceive$(Actor.scala:500)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
    at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
    at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    ... 20 more
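Since `modify` was experimental and is gone from Flink 1.9 onwards, the version-independent way to rescale is the savepoint-and-resubmit cycle, which should also sidestep the error above. A rough sketch (the savepoint file name and the job jar below are placeholders, not output from this run):

$ ./bin/flink cancel -s file:///home/pankaj/flink-checkpoints 94831ca34951975dbee3335a384ee935
$ ./bin/flink run -p 6 -s file:///home/pankaj/flink-checkpoints/savepoint-<id> wiki-edits.jar

Each resubmission is a fresh job restored from the savepoint, so the cycle can be repeated any number of times with any parallelism up to the available slots.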
Re: Limit number of jobs or Job Managers
Hi Haibo Sun, It's exactly what I needed. Thank you so much! Best, Pankaj On Thu, Jun 27, 2019 at 7:45 AM Haibo Sun wrote: > Hi, Pankaj Chand > > If you're running Flink on YARN, you can do this by limiting the number of > applications in the cluster or in the queue. As far as I know, Flink does > not limit that. > > The following are the configuration items for YARN: > yarn.scheduler.capacity.maximum-applications > yarn.scheduler.capacity.<queue-path>.maximum-applications > > Best, > Haibo > > At 2019-06-27 20:55:48, "Pankaj Chand" wrote: > > Hi everyone, > > Is there any way (parameter or function) I can limit the number of > concurrent jobs executing in my Flink cluster? Or alternatively, limit the > number of concurrent Job Managers (since there has to be one Job Manager > for every job)? > > Thanks! > > Pankaj
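For concreteness, those two properties live in YARN's capacity-scheduler.xml. A sketch with example values (the queue name "default" and the limits are illustrative, not taken from the thread):

<property>
  <name>yarn.scheduler.capacity.maximum-applications</name>
  <value>10</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.default.maximum-applications</name>
  <value>5</value>
</property>

Note that this caps YARN applications, which corresponds one-to-one with Flink jobs only in per-job mode; a single Flink session cluster is one YARN application that can run several jobs.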
Limit number of jobs or Job Managers
Hi everyone, Is there any way (parameter or function) I can limit the number of concurrent jobs executing in my Flink cluster? Or alternatively, limit the number of concurrent Job Managers (since there has to be one Job Manager for every job)? Thanks! Pankaj
Re: Updated Flink Documentation and tutorials
Thank you! On Thu, Jun 20, 2019, 5:49 AM Chesnay Schepler wrote: > There is no version of the documentation that is more up-to-date. The > documentation simply has not been updated yet for the new architecture. > > On 20/06/2019 11:45, Pankaj Chand wrote: > > Based on the below conversation (reverse chronological order) regarding my > previous question on the role of the Job Manager in Flink: > > > Hi Biao, > > Thank you for your reply! > > Please let me know the URL of the updated Flink documentation. > > The URL of the outdated document is: > > https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html > > > Another page which (tacitly) supports the outdated concept is: > > https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html > > > The website that hosts these pages is also the first result that comes up > when you Google search for "Flink documentation", and it claims it is a > stable version. The URL is: > https://ci.apache.org/projects/flink/flink-docs-stable/ > > Again, please let me know the URL of the updated Flink documentation. > > Thank you Biao and Eduardo! > > Pankaj > > On Tue, Jun 18, 2019 at 11:49 PM Biao Liu wrote: > > Hi Pankaj, > > That's really a good question. There was a refactoring of the architecture > a while back [1], so there might be some descriptions that still use the outdated concept. > > Before the refactoring, the Job Manager was a centralized role. It controlled the whole > cluster and all jobs, which is what your interpretation 1 describes. > > After the refactoring, the old Job Manager was split into several roles: > Resource Manager, Dispatcher, the new Job Manager, etc. The new Job Manager is > responsible for only one job, which is what your interpretation 2 describes. > > So the document you refer to is outdated. Would you mind telling us the > URL of this document? I think we should update it to avoid misleading more > people. > > 1. > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 > > Eduardo Winpenny Tejedor wrote on Wed, Jun 19, 2019 > at 1:12 AM: > > Hi Pankaj, > > I have no experience with Hadoop, but from the book I gathered there's one > Job Manager per application, i.e. per jar (as in the example in the first > chapter). This is not to say there's one Job Manager per job. Actually, I > don't think the word Job is defined in the book; I've seen Task defined, > and those do have Task Managers. > > Hope this is along the right lines. > > Regards, > Eduardo > > On Tue, 18 Jun 2019, 08:42 Pankaj Chand, > wrote: > > I am trying to understand the role of the Job Manager in Flink, and have come > across two possibly distinct interpretations. > > 1. The online documentation v1.8 signifies that there is at least one Job > Manager in a cluster, and it is closely tied to the cluster of machines, by > managing all jobs in that cluster of machines. > > This signifies that Flink's Job Manager is much like Hadoop's Application > Manager. > > 2. The book, "Stream Processing with Apache Flink", writes that, "The Job > Manager is the master process that controls the execution of a single > application—each application is controlled by a different Job Manager." > > This signifies that Flink defaults to one Job Manager per job, and the Job > Manager is closely tied to that single job, much like Hadoop's Application > Master for each job. > > Please let me know which one is correct. > > Pankaj > > On Thu, Jun 20, 2019, 4:54 AM Chesnay Schepler wrote: >> What makes you believe that they are outdated? >> >> On 19/06/2019 19:17, Pankaj Chand wrote: >> > Hello, >> > >> > Please let me know how to get the updated documentation and tutorials >> > of Apache Flink. >> > The stable v1.8 and v1.9-snapshot releases of the documentation seem >> > to be outdated. >> > >> > Thanks! >> > >> > Pankaj
Re: Updated Flink Documentation and tutorials
Based on the below conversation (reverse chronological order) regarding my previous question on the role of the Job Manager in Flink: Hi Biao, Thank you for your reply! Please let me know the URL of the updated Flink documentation. The URL of the outdated document is: https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html Another page which (tacitly) supports the outdated concept is: https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html The website that hosts these pages is also the first result that comes up when you Google search for "Flink documentation", and it claims it is a stable version. The URL is: https://ci.apache.org/projects/flink/flink-docs-stable/ Again, please let me know the URL of the updated Flink documentation. Thank you Biao and Eduardo! Pankaj On Tue, Jun 18, 2019 at 11:49 PM Biao Liu wrote: Hi Pankaj, That's really a good question. There was a refactoring of the architecture a while back [1], so there might be some descriptions that still use the outdated concept. Before the refactoring, the Job Manager was a centralized role. It controlled the whole cluster and all jobs, which is what your interpretation 1 describes. After the refactoring, the old Job Manager was split into several roles: Resource Manager, Dispatcher, the new Job Manager, etc. The new Job Manager is responsible for only one job, which is what your interpretation 2 describes. So the document you refer to is outdated. Would you mind telling us the URL of this document? I think we should update it to avoid misleading more people. 1. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 Eduardo Winpenny Tejedor wrote on Wed, Jun 19, 2019 at 1:12 AM: Hi Pankaj, I have no experience with Hadoop, but from the book I gathered there's one Job Manager per application, i.e. per jar (as in the example in the first chapter). This is not to say there's one Job Manager per job. Actually, I don't think the word Job is defined in the book; I've seen Task defined, and those do have Task Managers. Hope this is along the right lines. Regards, Eduardo On Tue, 18 Jun 2019, 08:42 Pankaj Chand, wrote: I am trying to understand the role of the Job Manager in Flink, and have come across two possibly distinct interpretations. 1. The online documentation v1.8 signifies that there is at least one Job Manager in a cluster, and it is closely tied to the cluster of machines, by managing all jobs in that cluster of machines. This signifies that Flink's Job Manager is much like Hadoop's Application Manager. 2. The book, "Stream Processing with Apache Flink", writes that, "The Job Manager is the master process that controls the execution of a single application—each application is controlled by a different Job Manager." This signifies that Flink defaults to one Job Manager per job, and the Job Manager is closely tied to that single job, much like Hadoop's Application Master for each job. Please let me know which one is correct. Pankaj On Thu, Jun 20, 2019, 4:54 AM Chesnay Schepler wrote: > What makes you believe that they are outdated? > > On 19/06/2019 19:17, Pankaj Chand wrote: > > Hello, > > > > Please let me know how to get the updated documentation and tutorials > > of Apache Flink. > > The stable v1.8 and v1.9-snapshot releases of the documentation seem > > to be outdated. > > > > Thanks! > > > > Pankaj
Updated Flink Documentation and tutorials
Hello, Please let me know how to get the updated documentation and tutorials of Apache Flink. The stable v1.8 and v1.9-snapshot releases of the documentation seem to be outdated. Thanks! Pankaj
Re: Role of Job Manager
Hi Biao, Thank you for your reply! Please let me know the URL of the updated Flink documentation. The URL of the outdated document is: https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html Another page which (tacitly) supports the outdated concept is: https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html The website that hosts these pages is also the first result that comes up when you Google search for "Flink documentation", and it claims it is a stable version. The URL is: https://ci.apache.org/projects/flink/flink-docs-stable/ Again, please let me know the URL of the updated Flink documentation. Thank you Biao and Eduardo! Pankaj On Tue, Jun 18, 2019 at 11:49 PM Biao Liu wrote: > Hi Pankaj, > > That's really a good question. There was a refactoring of the architecture > a while back [1], so there might be some descriptions that still use the outdated concept. > > Before the refactoring, the Job Manager was a centralized role. It controlled the whole > cluster and all jobs, which is what your interpretation 1 describes. > > After the refactoring, the old Job Manager was split into several roles: > Resource Manager, Dispatcher, the new Job Manager, etc. The new Job Manager is > responsible for only one job, which is what your interpretation 2 describes. > > So the document you refer to is outdated. Would you mind telling us the > URL of this document? I think we should update it to avoid misleading more > people. > > 1. > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 > > Eduardo Winpenny Tejedor wrote on Wed, Jun 19, 2019 > at 1:12 AM: > >> Hi Pankaj, >> >> I have no experience with Hadoop, but from the book I gathered there's one >> Job Manager per application, i.e. per jar (as in the example in the first >> chapter). This is not to say there's one Job Manager per job. Actually, I >> don't think the word Job is defined in the book; I've seen Task defined, >> and those do have Task Managers. >> >> Hope this is along the right lines. >> >> Regards, >> Eduardo >> >> On Tue, 18 Jun 2019, 08:42 Pankaj Chand, >> wrote: >> >>> I am trying to understand the role of the Job Manager in Flink, and have >>> come across two possibly distinct interpretations. >>> >>> 1. The online documentation v1.8 signifies that there is at least one >>> Job Manager in a cluster, and it is closely tied to the cluster of >>> machines, by managing all jobs in that cluster of machines. >>> >>> This signifies that Flink's Job Manager is much like Hadoop's >>> Application Manager. >>> >>> 2. The book, "Stream Processing with Apache Flink", writes that, "The >>> Job Manager is the master process that controls the execution of a single >>> application—each application is controlled by a different Job Manager." >>> >>> This signifies that Flink defaults to one Job Manager per job, and the >>> Job Manager is closely tied to that single job, much like Hadoop's >>> Application Master for each job. >>> >>> Please let me know which one is correct. >>> >>> Pankaj
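A quick way to see the post-refactoring model in practice: in YARN per-job mode, every submission starts its own cluster with a dedicated Job Manager. A sketch using the example jars that ship with the Flink distribution (paths are illustrative):

$ ./bin/flink run -m yarn-cluster ./examples/streaming/WordCount.jar
$ ./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar

Running both produces two YARN applications, each with its own Job Manager controlling exactly one job, which matches the book's one-Job-Manager-per-application description.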
Role of Job Manager
I am trying to understand the role of Job Manager in Flink, and have come across two possibly distinct interpretations. 1. The online documentation v1.8 signifies that there is at least one Job Manager in a cluster, and it is closely tied to the cluster of machines, by managing all jobs in that cluster of machines. This signifies that Flink's Job Manager is much like Hadoop's Application Manager. 2. The book, "Stream Processing with Apache Flink", writes that, "The Job Manager is the master process that controls the execution of a single application—each application is controlled by a different Job Manager." This signifies that Flink defaults to one Job Manager per job, and the Job Manager is closely tied to that single job, much like Hadoop's Application Master for each job. Please let me know which one is correct. Pankaj