Re: Print on screen DataStream content

2020-11-23 Thread Pankaj Chand
Please correct me if I am wrong. `DataStream#print()` only prints to the
screen when running from the IDE, but does not print to the screen when
running on a cluster (even a local cluster).

Thanks,

Pankaj

On Mon, Nov 23, 2020 at 5:31 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Simone,
>
> I'd suggest trying out the `DataStream#print()` function to start, but
> there are a few other easy-to-integrate sinks for testing that you can
> check out in the docs here[1]
>
> Best,
> Austin
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks
>
> On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin 
> wrote:
>
>> Hi All,
>>
>> In my code I have a DataStream that I would like to access. I need to
>> understand what I'm getting from each transformation to check that the data
>> I'm working on makes sense. How can I print to the console, or get a
>> file (csv, txt), for the variables "stream", "enriched" and "result"?
>>
>> I have tried different ways but could not get the data.
>>
>> Thanks!
>>
>>
>> FlinkKafkaConsumer<Event> kafkaData =
>>         new FlinkKafkaConsumer<>("CorID_1", new EventDeserializationSchema(), p);
>> WatermarkStrategy<Event> wmStrategy =
>>         WatermarkStrategy
>>                 .<Event>forMonotonousTimestamps()
>>                 .withIdleness(Duration.ofMinutes(1))
>>                 .withTimestampAssigner((event, timestamp) -> {
>>                     return event.get_Time();
>>                 });
>> DataStream<Event> stream = env.addSource(
>>         kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>
>> DataStream> enriched = stream
>>         .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
>>         .map(new StatefulSessionCalculator());
>>
>> WindowedStream, String, TimeWindow> result = enriched
>>         .keyBy(new MyKeySelector())
>>         .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
>>
>
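
A minimal sketch of what Austin's suggestion could look like in the pipeline
above (the variables come from Simone's snippet; the print prefixes and the
output path are only illustrative):

    stream.print("stream");        // raw events from Kafka
    enriched.print("enriched");    // output of StatefulSessionCalculator
    // "result" is a WindowedStream, so it must first be reduced/aggregated/
    // processed back into a DataStream before it can be printed or sunk.

    // Alternatively, dump a stream to a text file for offline inspection:
    enriched.writeAsText("/tmp/enriched.txt").setParallelism(1);

Note that print() writes to the stdout of the process running the task: in the
IDE that is the console, but on a (local or remote) cluster the output typically
ends up in the TaskManagers' *.out files (also visible in the web UI) rather
than on the client's screen.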


Re: Processing single events for minimum latency

2020-10-15 Thread Pankaj Chand
Thank you for the quick and informative reply, Piotrek!

On Thu, Oct 15, 2020 at 2:09 AM Piotr Nowojski  wrote:

> Hi Pankaj,
>
> Yes, you can trigger a window for each element; take a look at the Window
> Triggers [1].
>
> Flink is always processing all records immediately. The only things that
> can delay processing elements are:
> - buffering elements on the operator's state (vide WindowOperator)
> - buffer-timeout (but that's on the output, so it's not delaying
> processing per se)
> - back pressure
> - exactly-once checkpointing (especially under the back pressure)
>
> > Also, is there any way I can change the execution.buffer-timeout or
> setbuffertimeout(milliseconds) dynamically while the job is running?
>
> No, sorry it's not possible :(
>
> Best,
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>
> On Thu, Oct 15, 2020 at 1:55 AM Pankaj Chand 
> wrote:
>
>> Hi Piotrek,
>>
>> Thank you for replying! I want to process each record as soon as it is
>> ingested (or reaches an operator) without waiting for a window for records
>> to arrive. However, by not using windows, I am not sure if each record gets
>> emitted immediately upon processing.
>>
>> > You still can use windowing, but you might want to emit updated value
>> of the window per every processed record.
>>
>> How do I do this?
>>
>> Also, is there any way I can change the execution.buffer-timeout or
>> setbuffertimeout(milliseconds) dynamically while the job is running?
>>
>> Thank you,
>>
>> Pankaj
>>
>> On Wed, Oct 14, 2020 at 9:42 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Pankaj,
>>>
>>> I'm not entirely sure if I understand your question.
>>>
>>> If you want to minimize latency, you should avoid using windows or any
>>> other operators, that are buffering data for long periods of time. You
>>> still can use windowing, but you might want to emit updated value of the
>>> window per every processed record.
>>>
>>> Other than that, you might also want to reduce
>>> `execution.buffer-timeout` from the default value of 100ms down to 1ms, or
>>> 0ms [1]. Is this what you are looking for?
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>
>>> On Wed, Oct 14, 2020 at 12:38 PM Pankaj Chand 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> What is the recommended way to make a Flink job that processes each
>>>> event individually as soon as it comes and without waiting for a
>>>> window, in order to minimize latency in the entire DAG of operators?
>>>>
>>>> For example, here is some sample WordCount code (without windows),
>>>> followed by some known ways:
>>>>
>>>>  DataStream wordCounts = text
>>>> .flatMap(new FlatMapFunction())
>>>> .keyBy("word")
>>>> .reduce(new ReduceFunction());
>>>>
>>>>
>>>>1. Don't include any TimeWindow/CountWindow function (does this
>>>>actually achieve what I want?)
>>>>2. Use a CountWindow with a count of 1
>>>>3. Make a Trigger that fires to process each event when it comes in
>>>>
>>>> I think the above methods only work at the processing level and latency
>>>> with respect to a single operator, but do not affect the latency of an
>>>> event in the entire Flink job's DAG since those ways do not affect the
>>>> buffer-timeout value.
>>>>
>>>> Thanks,
>>>>
>>>> Pankaj
>>>>
>>>
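
To make the suggestions discussed above concrete (a per-element window trigger
plus a lower execution.buffer-timeout), here is a minimal sketch. It is not from
the original thread; the class name, window size, and timeout values are only
illustrative:

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    // Fires the window for every arriving element, so downstream operators see
    // an updated aggregate per record instead of waiting for the window to close.
    public class FireOnEveryElementTrigger extends Trigger<Object, TimeWindow> {

        @Override
        public TriggerResult onElement(Object element, long timestamp,
                                       TimeWindow window, TriggerContext ctx) {
            // Still register the end-of-window timer so the window is purged.
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp()
                    ? TriggerResult.FIRE_AND_PURGE
                    : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }

It would be attached with something like
.window(TumblingEventTimeWindows.of(Time.seconds(10))).trigger(new FireOnEveryElementTrigger()),
and env.setBufferTimeout(1) (or 0) lowers the output buffering discussed above.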


Re: Processing single events for minimum latency

2020-10-14 Thread Pankaj Chand
Hi Piotrek,

Thank you for replying! I want to process each record as soon as it is
ingested (or reaches an operator) without waiting for a window for records
to arrive. However, by not using windows, I am not sure if each record gets
emitted immediately upon processing.

> You still can use windowing, but you might want to emit updated value of
the window per every processed record.

How do I do this?

Also, is there any way I can change the execution.buffer-timeout or
setbuffertimeout(milliseconds) dynamically while the job is running?

Thank you,

Pankaj

On Wed, Oct 14, 2020 at 9:42 AM Piotr Nowojski  wrote:

> Hi Pankaj,
>
> I'm not entirely sure if I understand your question.
>
> If you want to minimize latency, you should avoid using windows or any
> other operators, that are buffering data for long periods of time. You
> still can use windowing, but you might want to emit updated value of the
> window per every processed record.
>
> Other than that, you might also want to reduce `execution.buffer-timeout`
> from the default value of 100ms down to 1ms, or 0ms [1]. Is this what you
> are looking for?
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> On Wed, Oct 14, 2020 at 12:38 PM Pankaj Chand 
> wrote:
>
>> Hi all,
>>
>> What is the recommended way to make a Flink job that processes each event
>> individually as soon as it comes and without waiting for a window, in order
>> to minimize latency in the entire DAG of operators?
>>
>> For example, here is some sample WordCount code (without windows),
>> followed by some known ways:
>>
>>  DataStream wordCounts = text
>> .flatMap(new FlatMapFunction())
>> .keyBy("word")
>> .reduce(new ReduceFunction());
>>
>>
>>1. Don't include any TimeWindow/CountWindow function (does this
>>actually achieve what I want?)
>>2. Use a CountWindow with a count of 1
>>3. Make a Trigger that fires to process each event when it comes in
>>
>> I think the above methods only work at the processing level and latency
>> with respect to a single operator, but do not affect the latency of an
>> event in the entire Flink job's DAG since those ways do not affect the
>> buffer-timeout value.
>>
>> Thanks,
>>
>> Pankaj
>>
>


Processing single events for minimum latency

2020-10-14 Thread Pankaj Chand
Hi all,

What is the recommended way to make a Flink job that processes each event
individually as soon as it comes and without waiting for a window, in order
to minimize latency in the entire DAG of operators?

For example, here is some sample WordCount code (without windows),
followed by some known ways:

 DataStream wordCounts = text
.flatMap(new FlatMapFunction())
.keyBy("word")
.reduce(new ReduceFunction());


   1. Don't include any TimeWindow/CountWindow function (does this actually
   achieve what I want?)
   2. Use a CountWindow with a count of 1
   3. Make a Trigger that fires to process each event when it comes in

I think the above methods only work at the processing level and latency
with respect to a single operator, but do not affect the latency of an
event in the entire Flink job's DAG since those ways do not affect the
buffer-timeout value.

Thanks,

Pankaj


Re: How to get Latency Tracking results?

2020-09-10 Thread Pankaj Chand
Actually, I was wrong. It turns out I was setting the values the wrong way
in conf/flink-conf.yaml.
I set "metrics.latency.interval 100" instead of "metrics.latency.interval:
100".  Sorry about that.

T

On Thu, Sep 10, 2020 at 7:05 AM Pankaj Chand 
wrote:

> Thank you, David!
>
> After setting ExecutionConfig for latency tracking in the
> SocketWindowWordCount Java source code and rebuilding and using that
> application jar file, I am now getting the latency tracking metrics using
> the REST endpoint. The below documentation [1] seems to imply that
> merely setting the parameters in flink-conf.yaml would generate the latency
> tracking metrics, which is not happening in my case.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#latency-tracking
>
> On Wed, Sep 9, 2020 at 10:08 AM David Anderson 
> wrote:
>
>> Pankaj,
>>
>> I just checked, and the latency metrics for SocketWindowWordCount show up
>> just fine for me with Flink 1.11.1. For me, the latency metrics existed
>> even before I provided any data on the socket for the job to process. This
>> makes sense, as the latency tracking markers will propagate through the job
>> graph and provide measurements despite the lack of any real data for the
>> job to process.
>>
>> These metrics have horrible names, but for me,
>>
>> curl -s
>> http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics
>>
>> reveals a bunch of latency metrics, and, for example,
>>
>> curl -s
>> http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics?get=latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999
>>
>> returns
>>
>>
>> [{"id":"latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999","value":"105.0"}]
>>
>> Best,
>> David
>>
>>
>> On Wed, Sep 9, 2020 at 3:16 PM Pankaj Chand 
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks for replying! Sorry, I forgot to mention I am using
>>> Flink Version: 1.11.1, Commit ID: 7eb514a.
>>> Is it possible that the default SocketWindowWordCount job is too simple
>>> to generate Latency metrics? Or that the latency metrics disappear from the
>>> output JSON when the data ingestion is zero?
>>>
>>> Thanks,
>>>
>>> Pankaj
>>>
>>>
>>> On Wed, Sep 9, 2020 at 6:27 AM David Anderson 
>>> wrote:
>>>
>>>> Pankaj,
>>>>
>>>> The Flink web UI doesn't do any visualizations of histogram metrics, so
>>>> the only way to access the latency metrics is either through the REST api
>>>> or a metrics reporter.
>>>>
>>>> The REST endpoint you tried is the correct place to find these metrics
>>>> in all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6
>>>> (if I recall correctly) these metrics were task metrics. So if you are
>>>> using an older version of Flink you'll need to dig deeper. I believe you'll
>>>> find them in
>>>>
>>>> /jobs//vertices//subtasks/metrics
>>>>
>>>> Regards,
>>>> David
>>>>
>>>>
>>>>
>>>> On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand 
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> How do I visualize (or extract) the results for Latency Tracking for a
>>>>> Flink local cluster? I set "metrics.latency.interval 100" in the
>>>>> conf/flink-conf.yaml file, and started the cluster and
>>>>> SocketWindowWordCount job. However, I could not find the latency
>>>>> distributions anywhere in the web UI, nor are there any latency metrics in
>>>>> the Metrics dropdown box for either task.
>>>>>
>>>>> I also set "metrics.latency.granularity "operator"" in
>>>>> conf/flink-conf.yaml, but that did not help.
>>>>>
>>>>> When I tried to query the REST endpoint, I got the following output
>>>>> which did not seem to contain anything related to latency:
>>>>>
>>>>> $ curl -s
>>>>> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics
>>>>>
>>>>>
>>>>> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}]
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Pankaj
>>>>>
>>>>


Re: How to get Latency Tracking results?

2020-09-10 Thread Pankaj Chand
Thank you, David!

After setting ExecutionConfig for latency tracking in the
SocketWindowWordCount Java source code and rebuilding and using that
application jar file, I am now getting the latency tracking metrics using
the REST endpoint. The below documentation [1] seems to imply that
merely setting the parameters in flink-conf.yaml would generate the latency
tracking metrics, which is not happening in my case.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#latency-tracking
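
For reference, the ExecutionConfig change mentioned above can be as small as the
following sketch (the 100 ms interval is just an example and mirrors
metrics.latency.interval: 100 in conf/flink-conf.yaml):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Emit latency-tracking markers from the sources every 100 ms.
    env.getConfig().setLatencyTrackingInterval(100);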

On Wed, Sep 9, 2020 at 10:08 AM David Anderson 
wrote:

> Pankaj,
>
> I just checked, and the latency metrics for SocketWindowWordCount show up
> just fine for me with Flink 1.11.1. For me, the latency metrics existed
> even before I provided any data on the socket for the job to process. This
> makes sense, as the latency tracking markers will propagate through the job
> graph and provide measurements despite the lack of any real data for the
> job to process.
>
> These metrics have horrible names, but for me,
>
> curl -s
> http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics
>
> reveals a bunch of latency metrics, and, for example,
>
> curl -s
> http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics?get=latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999
>
> returns
>
>
> [{"id":"latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999","value":"105.0"}]
>
> Best,
> David
>
>
> On Wed, Sep 9, 2020 at 3:16 PM Pankaj Chand 
> wrote:
>
>> Hi David,
>>
>> Thanks for replying! Sorry, I forgot to mention I am using Flink Version:
>> 1.11.1, Commit ID: 7eb514a.
>> Is it possible that the default SocketWindowWordCount job is too simple
>> to generate Latency metrics? Or that the latency metrics disappear from the
>> output JSON when the data ingestion is zero?
>>
>> Thanks,
>>
>> Pankaj
>>
>>
>> On Wed, Sep 9, 2020 at 6:27 AM David Anderson 
>> wrote:
>>
>>> Pankaj,
>>>
>>> The Flink web UI doesn't do any visualizations of histogram metrics, so
>>> the only way to access the latency metrics is either through the REST api
>>> or a metrics reporter.
>>>
>>> The REST endpoint you tried is the correct place to find these metrics
>>> in all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6
>>> (if I recall correctly) these metrics were task metrics. So if you are
>>> using an older version of Flink you'll need to dig deeper. I believe you'll
>>> find them in
>>>
>>> /jobs//vertices//subtasks/metrics
>>>
>>> Regards,
>>> David
>>>
>>>
>>>
>>> On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> How do I visualize (or extract) the results for Latency Tracking for a
>>>> Flink local cluster? I set "metrics.latency.interval 100" in the
>>>> conf/flink-conf.yaml file, and started the cluster and
>>>> SocketWindowWordCount job. However, I could not find the latency
>>>> distributions anywhere in the web UI, nor are there any latency metrics in
>>>> the Metrics dropdown box for either task.
>>>>
>>>> I also set "metrics.latency.granularity "operator"" in
>>>> conf/flink-conf.yaml, but that did not help.
>>>>
>>>> When I tried to query the REST endpoint, I got the following output
>>>> which did not seem to contain anything related to latency:
>>>>
>>>> $ curl -s
>>>> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics
>>>>
>>>>
>>>> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}]
>>>>
>>>> Thanks,
>>>>
>>>> Pankaj
>>>>
>>>


Re: How to get Latency Tracking results?

2020-09-09 Thread Pankaj Chand
Hi David,

Thanks for replying! Sorry, I forgot to mention I am using Flink Version:
1.11.1, Commit ID: 7eb514a.
Is it possible that the default SocketWindowWordCount job is too simple to
generate Latency metrics? Or that the latency metrics disappear from the
output JSON when the data ingestion is zero?

Thanks,

Pankaj


On Wed, Sep 9, 2020 at 6:27 AM David Anderson  wrote:

> Pankaj,
>
> The Flink web UI doesn't do any visualizations of histogram metrics, so
> the only way to access the latency metrics is either through the REST api
> or a metrics reporter.
>
> The REST endpoint you tried is the correct place to find these metrics in
> all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6
> (if I recall correctly) these metrics were task metrics. So if you are
> using an older version of Flink you'll need to dig deeper. I believe you'll
> find them in
>
> /jobs//vertices//subtasks/metrics
>
> Regards,
> David
>
>
>
> On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand 
> wrote:
>
>> Hello,
>>
>> How do I visualize (or extract) the results for Latency Tracking for a
>> Flink local cluster? I set "metrics.latency.interval 100" in the
>> conf/flink-conf.yaml file, and started the cluster and
>> SocketWindowWordCount job. However, I could not find the latency
>> distributions anywhere in the web UI, nor are there any latency metrics in
>> the Metrics dropdown box for either task.
>>
>> I also set "metrics.latency.granularity "operator"" in
>> conf/flink-conf.yaml, but that did not help.
>>
>> When I tried to query the REST endpoint, I got the following output which
>> did not seem to contain anything related to latency:
>>
>> $ curl -s
>> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics
>>
>>
>> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}]
>>
>> Thanks,
>>
>> Pankaj
>>
>


How to get Latency Tracking results?

2020-09-08 Thread Pankaj Chand
Hello,

How do I visualize (or extract) the results for Latency Tracking for a
Flink local cluster? I set "metrics.latency.interval 100" in the
conf/flink-conf.yaml file, and started the cluster and
SocketWindowWordCount job. However, I could not find the latency
distributions anywhere in the web UI, nor are there any latency metrics in
the Metrics dropdown box for either task.

I also set "metrics.latency.granularity "operator"" in
conf/flink-conf.yaml, but that did not help.

When I tried to query the REST endpoint, I got the following output which
did not seem to contain anything related to latency:

$ curl -s
http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics

[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}]

Thanks,

Pankaj


Re: Implementation of setBufferTimeout(timeoutMillis)

2020-08-31 Thread Pankaj Chand
Thank you so much, Yun! It is exactly what I needed.

On Mon, Aug 31, 2020 at 1:50 AM Yun Gao  wrote:

> Hi Pankaj,
>
> I think it should be in
> org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.
>
> Best,
>  Yun
>
>
>
> --
> Sender:Pankaj Chand
> Date:2020/08/31 02:40:15
> Recipient:user
> Theme:Implementation of setBufferTimeout(timeoutMillis)
>
> Hello,
>
> The documentation gives the following two sample lines for setting the
> buffer timeout for the streaming environment or transformation.
>
>
>
> env.setBufferTimeout(timeoutMillis);
> env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
>
> I have been trying to find where (file and method) in the Flink source
> code are the buffers being flushed by iteratively referring to the value of
> timeoutMillis (or the default value), but have been unsuccessful. Please
> help.
>
> Thanks,
>
> Pankaj
>
>
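
As a purely conceptual illustration of what such a flusher does (a simplified,
hypothetical sketch, NOT the actual code of RecordWriter$OutputFlusher), the
idea is a daemon thread that sleeps for the configured buffer timeout and then
flushes the partially filled buffers:

    // Hypothetical simplification -- not Flink's real implementation.
    class OutputFlusherSketch extends Thread {
        private final long timeoutMillis;    // value set via setBufferTimeout(...)
        private final Runnable flushAll;     // callback that flushes the writer's buffers
        private volatile boolean running = true;

        OutputFlusherSketch(long timeoutMillis, Runnable flushAll) {
            this.timeoutMillis = timeoutMillis;
            this.flushAll = flushAll;
            setDaemon(true);
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(timeoutMillis);   // wait for the buffer timeout
                } catch (InterruptedException e) {
                    return;                        // asked to stop
                }
                flushAll.run();                    // push out whatever is buffered
            }
        }

        void terminate() {
            running = false;
            interrupt();
        }
    }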


Implementation of setBufferTimeout(timeoutMillis)

2020-08-30 Thread Pankaj Chand
Hello,

The documentation gives the following two sample lines for setting the
buffer timeout for the streaming environment or transformation.



env.setBufferTimeout(timeoutMillis);
env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

I have been trying to find where (file and method) in the Flink source code
are the buffers being flushed by iteratively referring to the value of
timeoutMillis (or the default value), but have been unsuccessful. Please
help.

Thanks,

Pankaj


Re: OpenJDK or Oracle JDK: 8 or 11?

2020-08-22 Thread Pankaj Chand
Thank you, Arvid and Marco!

On Sat, Aug 22, 2020 at 2:03 PM Marco Villalobos 
wrote:

> Hi Pankaj,
>
> I highly recommend that you use an OpenJDK version 11 because each JDK
> upgrade has a performance improvement, and also because the Oracle JDK and
> OpenJDK are based off the same code-base. The main difference between
> Oracle and OpenJDK is the branding and price.
>
>
> > On Aug 22, 2020, at 4:23 AM, Pankaj Chand 
> wrote:
> >
> > Hello,
> >
> > The documentation says that to run Flink, we need Java 8 or 11.
> >
> > Will JDK 11 work for running Flink, programming Flink applications as
> well as building Flink from source?
> >
> > Also, can we use Open JDK for the above three capabilities, or do any of
> the capabilities require Oracle JDK?
> >
> > Thanks,
> >
> > Pankaj
>
>


OpenJDK or Oracle JDK: 8 or 11?

2020-08-22 Thread Pankaj Chand
Hello,

The documentation says that to run Flink, we need Java 8 or 11.

Will JDK 11 work for running Flink, programming Flink applications as well
as building Flink from source?

Also, can we use Open JDK for the above three capabilities, or do any of
the capabilities require Oracle JDK?

Thanks,

Pankaj


Re: Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-17 Thread Pankaj Chand
Thank you, Yang and Xintong!

Best,

Pankaj

On Mon, Mar 16, 2020, 9:27 PM Yang Wang  wrote:

> Hi Pankaj,
>
> Just like Xintong said, the biggest difference between Flink on Kubernetes
> and the native integration is dynamic resource allocation, since the latter
> has an embedded K8s client and communicates with the K8s API server directly
> to allocate/release JM/TM pods.
>
> With both ways of running Flink on K8s, you do not need to reserve the whole
> cluster for Flink. Flink can run alongside other workloads (e.g. Spark,
> TensorFlow, etc.), and the K8s cluster guarantees the isolation.
>
>
> Best,
> Yang
>
> Pankaj Chand  wrote on Mon, Mar 16, 2020 at 5:51 PM:
>
>> Hi Xintong,
>>
>> Thank you for the explanation!
>>
>> If I run Flink "natively" on Kubernetes, will I also be able to run Spark
>> on the same Kubernetes cluster, or will it make the Kubernetes cluster be
>> reserved for Flink only?
>>
>> Thank you!
>>
>> Pankaj
>>
>> On Mon, Mar 16, 2020 at 5:41 AM Xintong Song 
>> wrote:
>>
>>> Forgot to mention that "running Flink natively on Kubernetes" is newly
>>> introduced and is only available for Flink 1.10 and above.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Mon, Mar 16, 2020 at 5:40 PM Xintong Song 
>>> wrote:
>>>
>>>> Hi Pankaj,
>>>>
>>>> "Running Flink on Kubernetes" refers to the old way that basically
>>>> deploys a Flink standalone cluster on Kubernetes. We leverage scripts to
>>>> run Flink Master and TaskManager processes inside Kubernetes container. In
>>>> this way, Flink is not ware of whether it's running in containers or
>>>> directly on physical machines, and will not interact with the Kubernetes
>>>> Master. Flink Master reactively accept all registered TaskManagers, whose
>>>> number is decided by the Kubernetes replica.
>>>>
>>>> "Running Flink natively on Kubernetes" refers deploy Flink as a
>>>> Kubernetes Job. Flink Master will interact with Kubernetes Master, and
>>>> actively requests for pods/containers, like on Yarn/Mesos.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I want to run Flink, Spark and other processing engines on a single
>>>>> Kubernetes cluster.
>>>>>
>>>>> From the Flink documentation, I did not understand the difference
>>>>> between:
>>>>> (1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
>>>>> Kubernetes.
>>>>>
>>>>> Could someone please explain the difference between the two, and when
>>>>> would you use which option?
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Pankaj
>>>>>
>>>>


Re: Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-16 Thread Pankaj Chand
Hi Xintong,

Thank you for the explanation!

If I run Flink "natively" on Kubernetes, will I also be able to run Spark
on the same Kubernetes cluster, or will it make the Kubernetes cluster be
reserved for Flink only?

Thank you!

Pankaj

On Mon, Mar 16, 2020 at 5:41 AM Xintong Song  wrote:

> Forgot to mention that "running Flink natively on Kubernetes" is newly
> introduced and is only available for Flink 1.10 and above.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Mar 16, 2020 at 5:40 PM Xintong Song 
> wrote:
>
>> Hi Pankaj,
>>
>> "Running Flink on Kubernetes" refers to the old way that basically
>> deploys a Flink standalone cluster on Kubernetes. We leverage scripts to
>> run Flink Master and TaskManager processes inside Kubernetes container. In
>> this way, Flink is not ware of whether it's running in containers or
>> directly on physical machines, and will not interact with the Kubernetes
>> Master. Flink Master reactively accept all registered TaskManagers, whose
>> number is decided by the Kubernetes replica.
>>
>> "Running Flink natively on Kubernetes" refers deploy Flink as a
>> Kubernetes Job. Flink Master will interact with Kubernetes Master, and
>> actively requests for pods/containers, like on Yarn/Mesos.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand 
>> wrote:
>>
>>> Hi all,
>>>
>>> I want to run Flink, Spark and other processing engines on a single
>>> Kubernetes cluster.
>>>
>>> From the Flink documentation, I did not understand the difference
>>> between:
>>> (1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
>>> Kubernetes.
>>>
>>> Could someone please explain the difference between the two, and when
>>> would you use which option?
>>>
>>> Thank you,
>>>
>>> Pankaj
>>>
>>


Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-16 Thread Pankaj Chand
Hi all,

I want to run Flink, Spark and other processing engines on a single
Kubernetes cluster.

From the Flink documentation, I did not understand the difference between:
(1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
Kubernetes.

Could someone please explain the difference between the two, and when would
you use which option?

Thank you,

Pankaj


Flink: Run Once Trigger feature like Spark's

2020-02-28 Thread Pankaj Chand
Hi all,

Please tell me, is there anything in Flink that is similar to Spark's
structured streaming Run Once Trigger (or the Trigger.Once feature) as described
in the blog below:

https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html


This feature allows you to call a streaming job periodically instead of
running it continuously.

Thanks,

Pankaj


Re: Sample Code for querying Flink's default metrics

2019-12-13 Thread Pankaj Chand
Additionally, when an old job completes and I run a new job on the Flink
YARN session-mode cluster, querying for metrics before they become available
for the new job sometimes returns the last metrics of the old job instead.
This happens even if I wait for the TaskManager to be released by Flink (as
shown in Flink's dashboard web UI).

This shouldn't happen, since the TaskManager ID "should" be different,
though it would have the old index in the TaskManagers list.
Would this be a bug?

Thanks!

Pankaj

On Thu, Dec 12, 2019 at 5:59 AM Pankaj Chand 
wrote:

> Thank you, Chesnay!
>
> On Thu, Dec 12, 2019 at 5:46 AM Chesnay Schepler 
> wrote:
>
>> Yes, when a cluster was started it takes a few seconds for (any) metrics
>> to be available.
>>
>> On 12/12/2019 11:36, Pankaj Chand wrote:
>>
>> Hi Vino,
>>
>> Thank you for the links regarding backpressure!
>>
>> I am currently using code to get metrics by calling REST API via curl.
>> However, many times the REST API via curl gives an empty JSON object/array.
>> Piped through JQ (for filtering JSON) it produces a null value. This is
>> breaking my code.
>> Example in a Yarn cluster session mode, the following metric
>> "metrics?get=Status.JVM.CPU.Load" randomly (I think) returns an empty json
>> object/array or an actual value.
>>
>> Is it possible that for CPU Load, the empty JSON object is returned when
>> the job is newly started less than 10 seconds ago.
>>
>> Thanks,
>>
>> Pankaj
>>
>>
>>
>> On Mon, Dec 9, 2019 at 4:21 AM vino yang  wrote:
>>
>>> Hi Pankaj,
>>>
>>> > Is there any sample code for how to read such default metrics?  Is
>>> there any way to query the default metrics, such as CPU usage and Memory,
>>> without using REST API or Reporters?
>>>
>>> What's your real requirement? Can you use code to call REST API?  Why
>>> does it not match your requirements?
>>>
>>> > Additionally, how do I query Backpressure using code, or is it still
>>> only visually available via the dashboard UI? Consequently, is there any
>>> way to infer Backpressure by querying one (or more) of the Memory metrics
>>> of the TaskManager?
>>>
>>> The backpressure is related to not only memory metrics but also IO and
>>> network metrics, for more details about measure backpressure please see
>>> this blog.[1][2]
>>>
>>> [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
>>> [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html
>>>
>>> Best,
>>> Vino
>>>
>>> Pankaj Chand  wrote on Mon, Dec 9, 2019 at 12:07 PM:
>>>
>>>> Hello,
>>>>
>>>> Using Flink on Yarn, I could not understand the documentation for how
>>>> to read the default metrics via code. In particular, I want to read
>>>> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
>>>> Memory.
>>>>
>>>> Is there any sample code for how to read such default metrics?  Is
>>>> there any way to query the default metrics, such as CPU usage and Memory,
>>>> without using REST API or Reporters?
>>>>
>>>> Additionally, how do I query Backpressure using code, or is it still
>>>> only visually available via the dashboard UI? Consequently, is there any
>>>> way to infer Backpressure by querying one (or more) of the Memory metrics
>>>> of the TaskManager?
>>>>
>>>> Thank you,
>>>>
>>>> Pankaj
>>>>
>>>
>>
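
One way to make such polling code tolerant of the empty responses described
above is to retry for a short period. A minimal sketch using Java 11's
HttpClient (the host, job ID argument, metric name, and retry counts are
illustrative, not from the thread):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class CpuLoadPoller {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            String url = "http://localhost:8081/jobs/" + args[0]
                    + "/metrics?get=Status.JVM.CPU.Load";
            HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();

            // Right after job submission the REST API may return an empty JSON
            // array until the first metrics have been reported, so retry a bit.
            for (int attempt = 0; attempt < 10; attempt++) {
                String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
                if (!body.isEmpty() && !body.equals("[]")) {
                    System.out.println(body);
                    return;
                }
                Thread.sleep(1000);   // wait one second before retrying
            }
            System.err.println("Metric not available yet");
        }
    }

The same retry pattern can be pointed at any of the other REST endpoints
mentioned in this thread.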


Re: Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread Pankaj Chand
Vino and Kostas:

Thank you for the info!

I was using Flink 1.9.1 with Pre-bundled Hadoop 2.7.5.

Cloudlab has quarantined my cluster experiment without notice, so I'll
let you know if and when they allow me to access the files in the future.

regards,

Pankaj

On Thu, Dec 12, 2019 at 8:35 AM Kostas Kloudas  wrote:

> Hi Pankaj,
>
> When you start a session cluster with the bin/yarn-session.sh script,
> Flink will create the cluster and then write a "Yarn Properties file"
> named ".yarn-properties-YOUR_USER_NAME" in the directory:
> either the one specified by the option "yarn.properties-file.location"
> in the flink-conf.yaml or in your local
> System.getProperty("java.io.tmpdir"). This file will contain the
> applicationId of the cluster and
> it will be picked up by any future calls to `flink run`. Could you
> check if this file exists and if it is updated every time you create a
> cluster?
>
> Thanks,
> Kostas
>
> On Thu, Dec 12, 2019 at 2:22 PM vino yang  wrote:
> >
> > Hi Pankaj,
> >
> > Can you tell us what's Flink version do you use?  And can you share the
> Flink client and job manager log with us?
> >
> > This information would help us to locate your problem.
> >
> > Best,
> > Vino
> >
> > Pankaj Chand  wrote on Thu, Dec 12, 2019 at 7:08 PM:
> >>
> >> Hello,
> >>
> >> When using Flink on YARN in session mode, each Flink job client would
> automatically know the YARN cluster to connect to. It says this somewhere
> in the documentation.
> >>
> >> So, I killed the Flink session cluster by simply killing the YARN
> application using the "yarn kill" command. However, when starting a new
> Flink session cluster and trying to submit Flink jobs to yarn-session,
> Flink complains that the old cluster (it gave the port number and YARN
> application ID) is not available.
> >>
> >> It seems like the details of the old cluster were still stored
> somewhere in Flink. So, I had to completely replace the Flink folder with a
> new one.
> >>
> >> Does anyone know the proper way to kill a Flink+YARN session cluster to
> completely remove it so that jobs will get submitted to a new Flink session
> cluster?
> >>
> >> Thanks,
> >>
> >> Pankaj
>


Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread Pankaj Chand
Hello,

When using Flink on YARN in session mode, each Flink job client would
automatically know the YARN cluster to connect to. It says this somewhere
in the documentation.

So, I killed the Flink session cluster by simply killing the YARN
application using the "yarn kill" command. However, when starting a new
Flink session cluster and trying to submit Flink jobs to yarn-session,
Flink complains that the old cluster (it gave the port number and YARN
application ID) is not available.

It seems like the details of the old cluster were still stored somewhere in
Flink. So, I had to completely replace the Flink folder with a new one.

Does anyone know the proper way to kill a Flink+YARN session cluster to
completely remove it so that jobs will get submitted to a new Flink session
cluster?

Thanks,

Pankaj


Re: Sample Code for querying Flink's default metrics

2019-12-12 Thread Pankaj Chand
Thank you, Chesnay!

On Thu, Dec 12, 2019 at 5:46 AM Chesnay Schepler  wrote:

> Yes, when a cluster was started it takes a few seconds for (any) metrics
> to be available.
>
> On 12/12/2019 11:36, Pankaj Chand wrote:
>
> Hi Vino,
>
> Thank you for the links regarding backpressure!
>
> I am currently using code to get metrics by calling REST API via curl.
> However, many times the REST API via curl gives an empty JSON object/array.
> Piped through JQ (for filtering JSON) it produces a null value. This is
> breaking my code.
> Example in a Yarn cluster session mode, the following metric
> "metrics?get=Status.JVM.CPU.Load" randomly (I think) returns an empty json
> object/array or an actual value.
>
> Is it possible that for CPU Load, the empty JSON object is returned when
> the job is newly started less than 10 seconds ago.
>
> Thanks,
>
> Pankaj
>
>
>
> On Mon, Dec 9, 2019 at 4:21 AM vino yang  wrote:
>
>> Hi Pankaj,
>>
>> > Is there any sample code for how to read such default metrics?  Is
>> there any way to query the default metrics, such as CPU usage and Memory,
>> without using REST API or Reporters?
>>
>> What's your real requirement? Can you use code to call REST API?  Why
>> does it not match your requirements?
>>
>> > Additionally, how do I query Backpressure using code, or is it still
>> only visually available via the dashboard UI? Consequently, is there any
>> way to infer Backpressure by querying one (or more) of the Memory metrics
>> of the TaskManager?
>>
>> The backpressure is related to not only memory metrics but also IO and
>> network metrics, for more details about measure backpressure please see
>> this blog.[1][2]
>>
>> [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
>> [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html
>>
>> Best,
>> Vino
>>
>> Pankaj Chand  wrote on Mon, Dec 9, 2019 at 12:07 PM:
>>
>>> Hello,
>>>
>>> Using Flink on Yarn, I could not understand the documentation for how to
>>> read the default metrics via code. In particular, I want to read
>>> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
>>> Memory.
>>>
>>> Is there any sample code for how to read such default metrics?  Is there
>>> any way to query the default metrics, such as CPU usage and Memory, without
>>> using REST API or Reporters?
>>>
>>> Additionally, how do I query Backpressure using code, or is it still
>>> only visually available via the dashboard UI? Consequently, is there any
>>> way to infer Backpressure by querying one (or more) of the Memory metrics
>>> of the TaskManager?
>>>
>>> Thank you,
>>>
>>> Pankaj
>>>
>>
>


Re: Sample Code for querying Flink's default metrics

2019-12-12 Thread Pankaj Chand
Hi Vino,

Thank you for the links regarding backpressure!

I am currently using code to get metrics by calling the REST API via curl.
However, many times the REST API via curl returns an empty JSON object/array.
Piped through jq (for filtering JSON), it produces a null value. This is
breaking my code.
For example, in a YARN session-mode cluster, the metric
"metrics?get=Status.JVM.CPU.Load" randomly (I think) returns either an empty
JSON object/array or an actual value.

Is it possible that, for CPU Load, the empty JSON object is returned when
the job was started less than 10 seconds ago?

Thanks,

Pankaj



On Mon, Dec 9, 2019 at 4:21 AM vino yang  wrote:

> Hi Pankaj,
>
> > Is there any sample code for how to read such default metrics?  Is there
> any way to query the default metrics, such as CPU usage and Memory, without
> using REST API or Reporters?
>
> What's your real requirement? Can you use code to call REST API?  Why does
> it not match your requirements?
>
> > Additionally, how do I query Backpressure using code, or is it still
> only visually available via the dashboard UI? Consequently, is there any
> way to infer Backpressure by querying one (or more) of the Memory metrics
> of the TaskManager?
>
> The backpressure is related to not only memory metrics but also IO and
> network metrics, for more details about measure backpressure please see
> this blog.[1][2]
>
> [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
> [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html
>
> Best,
> Vino
>
> Pankaj Chand  wrote on Mon, Dec 9, 2019 at 12:07 PM:
>
>> Hello,
>>
>> Using Flink on Yarn, I could not understand the documentation for how to
>> read the default metrics via code. In particular, I want to read
>> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
>> Memory.
>>
>> Is there any sample code for how to read such default metrics?  Is there
>> any way to query the default metrics, such as CPU usage and Memory, without
>> using REST API or Reporters?
>>
>> Additionally, how do I query Backpressure using code, or is it still only
>> visually available via the dashboard UI? Consequently, is there any way to
>> infer Backpressure by querying one (or more) of the Memory metrics of the
>> TaskManager?
>>
>> Thank you,
>>
>> Pankaj
>>
>


Sample Code for querying Flink's default metrics

2019-12-08 Thread Pankaj Chand
Hello,

Using Flink on Yarn, I could not understand the documentation for how to
read the default metrics via code. In particular, I want to read
throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
Memory.

Is there any sample code for how to read such default metrics?  Is there
any way to query the default metrics, such as CPU usage and Memory, without
using REST API or Reporters?

Additionally, how do I query Backpressure using code, or is it still only
visually available via the dashboard UI? Consequently, is there any way to
infer Backpressure by querying one (or more) of the Memory metrics of the
TaskManager?

Thank you,

Pankaj


Re: How to install Flink + YARN?

2019-12-07 Thread Pankaj Chand
Please disregard my last question. It is working fine with Hadoop 2.7.5.

Thanks

On Sat, Dec 7, 2019 at 2:13 AM Pankaj Chand 
wrote:

> Is it required to use exactly the same versions of Hadoop as the
> pre-bundled hadoop version?
>
> I'm using Hadoop 2.7.1 cluster with Flink 1.9.1 and the corresponding
> Prebundled Hadoop 2.7.5.
>
> When I submit a job using:
>
> [vagrant@node1 flink]$ ./bin/flink run -m yarn-cluster
> ./examples/streaming/SocketWindowWordCount.jar --port 9001
>
> the tail of the log is empty, and I get the following messages:
>
> vagrant@node1 flink]$ ./bin/flink run -m yarn-cluster
> ./examples/streaming/SocketWindowWordCount.jar --port 9001
> 2019-12-07 07:04:15,394 INFO  org.apache.hadoop.yarn.client.RMProxy
>   - Connecting to ResourceManager at /0.0.0.0:8032
> 2019-12-07 07:04:15,493 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-07 07:04:15,493 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-12-07 07:04:16,615 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 0 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:17,617 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 1 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:18,619 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 2 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:19,621 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 3 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:20,629 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 4 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:21,632 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 5 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:22,634 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 6 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:23,639 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 7 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:24,644 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 8 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:25,651 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 9 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:56,677 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 0 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:57,679 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 1 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:58,680 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 2 time(s); retry policy is
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2019-12-07 07:04:59,686 INFO  org.apache.hadoop.ipc.Client
>  - Retrying 

Re: How to install Flink + YARN?

2019-12-06 Thread Pankaj Chand
 is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2019-12-07 07:05:02,693 INFO  org.apache.hadoop.ipc.Client
 - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
Already tried 6 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2019-12-07 07:05:03,695 INFO  org.apache.hadoop.ipc.Client
 - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
Already tried 7 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2019-12-07 07:05:04,704 INFO  org.apache.hadoop.ipc.Client
 - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
Already tried 8 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2019-12-07 07:05:05,715 INFO  org.apache.hadoop.ipc.Client
 - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
Already tried 9 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)

Thanks!

Pankaj

On Wed, Nov 20, 2019 at 2:45 AM Pankaj Chand 
wrote:

> Thank you, Ana and Yang!
>
> On Tue, Nov 19, 2019, 9:29 PM Yang Wang  wrote:
>
>> Hi Pankaj,
>>
>> First, you need to prepare a Hadoop environment separately, including
>> HDFS and YARN. If you are familiar with Hadoop, you could download the
>> binary[1] and start the cluster on your nodes manually. Otherwise, some
>> tools may help you to deploy a Hadoop cluster, such as Ambari[2] and
>> Cloudera Manager[3].
>>
>> Then, download the Flink with Hadoop pre-bundle. You can submit your
>> flink job now.
>>
>> Please make sure you set the correct HADOOP_CONF_DIR in your flink client
>> before starting a flink cluster.
>>
>>
>>
>> [1].https://hadoop.apache.org/releases.html
>> [2].
>> https://www.cloudera.com/products/open-source/apache-hadoop/apache-ambari.html
>> [3].
>> https://www.cloudera.com/products/product-components/cloudera-manager.html
>>
>> Ana  wrote on Wed, Nov 20, 2019 at 10:12 AM:
>>
>>> Hi,
>>>
>>> I was able to run Flink on YARN by installing YARN and Flink separately.
>>>
>>> Thank you.
>>>
>>> Ana
>>>
>>> On Wed, Nov 20, 2019 at 10:42 AM Pankaj Chand 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I want to run Flink on YARN upon a cluster of nodes. From the
>>>> documentation, I was not able to fully understand how to go about it. Some
>>>> of the archived answers are a bit old and had pending JIRA issues, so I
>>>> thought I would ask.
>>>>
>>>> Am I first supposed to install YARN separately, and then download the
>>>> Flink file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put
>>>> into Flink's /lib folder provide the entire YARN installation?
>>>>
>>>> Is there any download that bundles a complete *installation of Flink +
>>>> installation of YARN*?
>>>>
>>>> Thank you,
>>>>
>>>> Pankaj
>>>>
>>>


Re: How to install Flink + YARN?

2019-11-19 Thread Pankaj Chand
Thank you, Ana and Yang!

On Tue, Nov 19, 2019, 9:29 PM Yang Wang  wrote:

> Hi Pankaj,
>
> First, you need to prepare a Hadoop environment separately, including
> HDFS and YARN. If you are familiar with Hadoop, you could download the
> binary[1] and start the cluster on your nodes manually. Otherwise, some
> tools may help you to deploy a Hadoop cluster, such as Ambari[2] and
> Cloudera Manager[3].
>
> Then, download the Flink with Hadoop pre-bundle. You can submit your flink
> job now.
>
> Please make sure you set the correct HADOOP_CONF_DIR in your flink client
> before starting a flink cluster.
>
>
>
> [1].https://hadoop.apache.org/releases.html
> [2].
> https://www.cloudera.com/products/open-source/apache-hadoop/apache-ambari.html
> [3].
> https://www.cloudera.com/products/product-components/cloudera-manager.html
>
> Ana  wrote on Wed, Nov 20, 2019 at 10:12 AM:
>
>> Hi,
>>
>> I was able to run Flink on YARN by installing YARN and Flink separately.
>>
>> Thank you.
>>
>> Ana
>>
>> On Wed, Nov 20, 2019 at 10:42 AM Pankaj Chand 
>> wrote:
>>
>>> Hello,
>>>
>>> I want to run Flink on YARN upon a cluster of nodes. From the
>>> documentation, I was not able to fully understand how to go about it. Some
>>> of the archived answers are a bit old and had pending JIRA issues, so I
>>> thought I would ask.
>>>
>>> Am I first supposed to install YARN separately, and then download the
>>> Flink file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put
>>> into Flink's /lib folder provide the entire YARN installation?
>>>
>>> Is there any download that bundles a complete *installation of Flink +
>>> installation of YARN*?
>>>
>>> Thank you,
>>>
>>> Pankaj
>>>
>>


How to install Flink + YARN?

2019-11-19 Thread Pankaj Chand
Hello,

I want to run Flink on YARN upon a cluster of nodes. From the
documentation, I was not able to fully understand how to go about it. Some
of the archived answers are a bit old and had pending JIRA issues, so I
thought I would ask.

Am I first supposed to install YARN separately, and then download the Flink
file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put into
Flink's /lib folder provide the entire YARN installation?

Is there any download that bundles a complete *installation of Flink +
installation of YARN*?

Thank you,

Pankaj


Re: Cannot modify parallelism (rescale job) more than once

2019-10-29 Thread Pankaj Chand
Thank you!

On Mon, Oct 28, 2019 at 3:53 AM vino yang  wrote:

> Hi Pankaj,
>
> It seems it is a bug. You can report it by opening a Jira issue.
>
> Best,
> Vino
>
> Pankaj Chand  wrote on Mon, Oct 28, 2019 at 10:51 AM:
>
>> Hello,
>>
>> I am trying to modify the parallelism of a streaming Flink job
>> (wiki-edits example) multiple times on a standalone cluster (one local
>> machine) having two TaskManagers with 3 slots each (i.e. 6 slots total).
>> However, the "modify" command is only working once (e.g. when I change the
>> parallelism from 2 to 4). The second time (e.g. change parallelism to 6 or
>> even back to 2), it is giving an error.
>>
>> I am using Flink 1.8.1 (since I found that the modify parallelism command
>> has been removed from v1.9 documentation) and have configured savepoints to
>> be written to file:///home/pankaj/flink-checkpoints. The output of the
>> first "modify  -p 4" command and second "modify  -p 6"
>> command is copied below.
>>
>> Please tell me how to modify parallelism multiple times at runtime.
>>
>> Thanks,
>>
>> Pankaj
>>
>>
>> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4
>> Modify job 94831ca34951975dbee3335a384ee935.
>> Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4.
>>
>> $ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6
>> Modify job 94831ca34951975dbee3335a384ee935.
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.util.FlinkException: Could not rescale job
>> 94831ca34951975dbee3335a384ee935.
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
>> at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: java.util.concurrent.CompletionException:
>> java.lang.IllegalStateException: Suspend needs to happen atomically
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>> at
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
>> at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor.aroundReceive(Actor.scala:502)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:500)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: java.lang.IllegalStateException: Suspend needs to happen
>> atomically
>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
>> at
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>> ... 20 more
>>
>


Cannot modify parallelism (rescale job) more than once

2019-10-27 Thread Pankaj Chand
Hello,

I am trying to modify the parallelism of a streaming Flink job (wiki-edits
example) multiple times on a standalone cluster (one local machine) having
two TaskManagers with 3 slots each (i.e. 6 slots total). However, the
"modify" command is only working once (e.g. when I change the parallelism
from 2 to 4). The second time (e.g. change parallelism to 6 or even back to
2), it is giving an error.

I am using Flink 1.8.1 (since I found that the modify parallelism command
has been removed from v1.9 documentation) and have configured savepoints to
be written to file:///home/pankaj/flink-checkpoints. The output of the
first "modify  -p 4" command and second "modify  -p 6"
command is copied below.

Please tell me how to modify parallelism multiple times at runtime.

Thanks,

Pankaj


$ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4
Modify job 94831ca34951975dbee3335a384ee935.
Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4.

$ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6
Modify job 94831ca34951975dbee3335a384ee935.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not rescale job
94831ca34951975dbee3335a384ee935.
at
org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException:
java.lang.IllegalStateException: Suspend needs to happen atomically
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.IllegalStateException: Suspend needs to happen
atomically
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
at
org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 20 more
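
A commonly suggested alternative to the experimental "modify" command (and
the path that remains once "modify" is gone) is to rescale through a
savepoint: take a savepoint, cancel the job, and resubmit it with the new
parallelism. A rough CLI sketch, assuming the job ID shown above; the job jar
name and the savepoint subdirectory are placeholders:

    # take a savepoint and cancel the running job
    $ ./bin/flink cancel -s file:///home/pankaj/flink-checkpoints 94831ca34951975dbee3335a384ee935

    # resubmit the job from that savepoint with the new parallelism
    $ ./bin/flink run -s file:///home/pankaj/flink-checkpoints/savepoint-xxxxxx -p 6 wiki-edits.jar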


Re: Limit number of jobs or Job Managers

2019-06-27 Thread Pankaj Chand
Hi Haibo Sun,

It's exactly what I needed. Thank you so much!

Best,

Pankaj

On Thu, Jun 27, 2019 at 7:45 AM Haibo Sun  wrote:

> Hi,  Pankaj Chand
>
> If you're running Flink on YARN, you can do this by limiting the number of
> applications in the cluster or in the queue. As far as I know, Flink does
> not limit that.
>
> The following are the configuration items for  YARN :
> yarn.scheduler.capacity.maximum-applications
> yarn.scheduler.capacity.<queue-path>.maximum-applications
>
> Best,
> Haibo
>
> At 2019-06-27 20:55:48, "Pankaj Chand"  wrote:
>
> Hi everyone,
>
> Is there any way (parameter or function) I can limit the number of
> concurrent jobs executing in my Flink cluster? Or alternatively, limit the
> number of concurrent Job Managers (since there has to be one Job Manager
> for every job)?
>
> Thanks!
>
> Pankaj
>
>
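
For illustration, the two properties mentioned above are set in YARN's
capacity-scheduler.xml along these lines. This is only a sketch: the limit
values are made up, and the per-queue entry assumes the default queue
(root.default).

    <property>
      <!-- cluster-wide cap on concurrently pending and running applications -->
      <name>yarn.scheduler.capacity.maximum-applications</name>
      <value>4</value>
    </property>
    <property>
      <!-- per-queue cap; the queue path here is the default queue -->
      <name>yarn.scheduler.capacity.root.default.maximum-applications</name>
      <value>2</value>
    </property>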


Limit number of jobs or Job Managers

2019-06-27 Thread Pankaj Chand
Hi everyone,

Is there any way (parameter or function) I can limit the number of
concurrent jobs executing in my Flink cluster? Or alternatively, limit the
number of concurrent Job Managers (since there has to be one Job Manager
for every job)?

Thanks!

Pankaj


Re: Updated Flink Documentation and tutorials

2019-06-20 Thread Pankaj Chand
Thank you!

On Thu, Jun 20, 2019, 5:49 AM Chesnay Schepler  wrote:

> There is no version of the documentation that is more up-to-date. The
> documentation was simply not updated yet for the new architecture.
>
> On 20/06/2019 11:45, Pankaj Chand wrote:
>
> Based on the below conversation (reverse chronological order) regarding my
> previous question on the role of Job Manager in Flink:
>
>
> Hi Biao,
>
> Thank you for your reply!
>
> Please let me know the url of the updated Flink documentation.
>
> The url of the outdated document is:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html
>
>
> Another page which (tacitly) supports the outdated concept is:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
>
>
> The website that hosts these pages is also the first result that comes up
> when you Google Search for "Flink documentation", and it claims it is a
> stable version. The url is:
> https://ci.apache.org/projects/flink/flink-docs-stable/
>
> Again, please let me know the url of the updated Flink documentation.
>
> Thank you Biao and Eduardo!
>
> Pankaj
>
> On Tue, Jun 18, 2019 at 11:49 PM Biao Liu  wrote:
>
> Hi Pankaj,
>
> That's really a good question. There was an architecture refactoring some
> time ago[1], so some descriptions may still use the outdated concept.
>
> Before the refactoring, the Job Manager was a centralized role: it
> controlled the whole cluster and all jobs, which matches your
> interpretation 1.
>
> After the refactoring, the old Job Manager was split into several roles:
> Resource Manager, Dispatcher, the new Job Manager, etc. The new Job Manager
> is responsible for only one job, which matches your interpretation 2.
>
> So the document you refer to is outdated. Would you mind telling us the
> URL of this document? I think we should update it to avoid misleading more
> people.
>
> 1.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>
> On Wed, Jun 19, 2019 at 1:12 AM, Eduardo Winpenny Tejedor  wrote:
>
> Hi Pankaj,
>
> I have no experience with Hadoop, but from the book I gathered that there
> is one Job Manager per application, i.e. per jar (as in the example in the
> first chapter). This is not to say there's one Job Manager per job.
> Actually, I don't think the word Job is defined in the book; I've seen Task
> defined, and those do have Task Managers.
>
> Hope this is along the right lines
>
> Regards,
> Eduardo
>
> On Tue, 18 Jun 2019, 08:42 Pankaj Chand, 
> wrote:
>
> I am trying to understand the role of Job Manager in Flink, and have come
> across two possibly distinct interpretations.
>
> 1. The online documentation v1.8 signifies that there is at least one Job
> Manager in a cluster, and it is closely tied to the cluster of machines, by
> managing all jobs in that cluster of machines.
>
> This signifies that Flink's Job Manager is much like Hadoop's Application
> Manager.
>
> 2. The book, "Stream Processing with Apache Flink", writes that, "The Job
> Manager is the master process that controls the execution of a single
> application—each application is controlled by a different Job Manager."
>
> This signifies that Flink defaults to one Job Manager per job, and the Job
> Manager is closely tied to that single job, much like Hadoop's Application
> Master for each job.
>
> Please let me know which one is correct.
>
> Pankaj
>
>
> On Thu, Jun 20, 2019, 4:54 AM Chesnay Schepler  wrote:
>
>> What makes you believe that they are out-dated?
>>
>> On 19/06/2019 19:17, Pankaj Chand wrote:
>> > Hello,
>> >
>> > Please let me know how to get the updated documentation and tutorials
>> > of Apache Flink.
>> > The stable v1.8 and v1.9-snapshot releases of the documentation seem
>> > to be outdated.
>> >
>> > Thanks!
>> >
>> > Pankaj
>>
>>
>>
>>
>


Re: Updated Flink Documentation and tutorials

2019-06-20 Thread Pankaj Chand
Based on the below conversation (reverse chronological order) regarding my
previous question on the role of Job Manager in Flink:


Hi Biao,

Thank you for your reply!

Please let me know the url of the updated Flink documentation.

The url of the outdated document is:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html


Another page which (tacitly) supports the outdated concept is:
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html


The website that hosts these pages is also the first result that comes up
when you Google Search for "Flink documentation", and it claims it is a
stable version. The url is:
https://ci.apache.org/projects/flink/flink-docs-stable/

Again, please let me know the url of the updated Flink documentation.

Thank you Biao and Eduardo!

Pankaj

On Tue, Jun 18, 2019 at 11:49 PM Biao Liu  wrote:

Hi Pankaj,

That's really a good question. There was an architecture refactoring some
time ago[1], so some descriptions may still use the outdated concept.

Before the refactoring, the Job Manager was a centralized role: it controlled
the whole cluster and all jobs, which matches your interpretation 1.

After the refactoring, the old Job Manager was split into several roles:
Resource Manager, Dispatcher, the new Job Manager, etc. The new Job Manager
is responsible for only one job, which matches your interpretation 2.

So the document you refer to is outdated. Would you mind telling us the URL
of this document? I think we should update it to avoid misleading more
people.

1. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

On Wed, Jun 19, 2019 at 1:12 AM, Eduardo Winpenny Tejedor  wrote:

Hi Pankaj,

I have no experience with Hadoop, but from the book I gathered that there is
one Job Manager per application, i.e. per jar (as in the example in the first
chapter). This is not to say there's one Job Manager per job. Actually, I
don't think the word Job is defined in the book; I've seen Task defined, and
those do have Task Managers.

Hope this is along the right lines

Regards,
Eduardo

On Tue, 18 Jun 2019, 08:42 Pankaj Chand,  wrote:

I am trying to understand the role of Job Manager in Flink, and have come
across two possibly distinct interpretations.

1. The online documentation v1.8 signifies that there is at least one Job
Manager in a cluster, and it is closely tied to the cluster of machines, by
managing all jobs in that cluster of machines.

This signifies that Flink's Job Manager is much like Hadoop's Application
Manager.

2. The book, "Stream Processing with Apache Flink", writes that, "The Job
Manager is the master process that controls the execution of a single
application—each application is controlled by a different Job Manager."

This signifies that Flink defaults to one Job Manager per job, and the Job
Manager is closely tied to that single job, much like Hadoop's Application
Master for each job.

Please let me know which one is correct.

Pankaj


On Thu, Jun 20, 2019, 4:54 AM Chesnay Schepler  wrote:

> What makes you believe that they are out-dated?
>
> On 19/06/2019 19:17, Pankaj Chand wrote:
> > Hello,
> >
> > Please let me know how to get the updated documentation and tutorials
> > of Apache Flink.
> > The stable v1.8 and v1.9-snapshot releases of the documentation seem
> > to be outdated.
> >
> > Thanks!
> >
> > Pankaj
>
>
>
>


Updated Flink Documentation and tutorials

2019-06-19 Thread Pankaj Chand
Hello,

Please let me know how to get the updated documentation and tutorials of
Apache Flink.
The stable v1.8 and v1.9-snapshot releases of the documentation seem to be
outdated.

Thanks!

Pankaj


Re: Role of Job Manager

2019-06-19 Thread Pankaj Chand
Hi Biao,

Thank you for your reply!

Please let me know the url of the updated Flink documentation.

The url of the outdated document is:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html


Another page which (tacitly) supports the outdated concept is:
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html


The website that hosts these pages is also the first result that comes up
when you Google Search for "Flink documentation", and it claims it is a
stable version. The url is:
https://ci.apache.org/projects/flink/flink-docs-stable/

Again, please let me know the url of the updated Flink documentation.

Thank you Biao and Eduardo!

Pankaj

On Tue, Jun 18, 2019 at 11:49 PM Biao Liu  wrote:

> Hi Pankaj,
>
> That's really a good question. There was an architecture refactoring some
> time ago[1], so some descriptions may still use the outdated concept.
>
> Before the refactoring, the Job Manager was a centralized role: it
> controlled the whole cluster and all jobs, which matches your
> interpretation 1.
>
> After the refactoring, the old Job Manager was split into several roles:
> Resource Manager, Dispatcher, the new Job Manager, etc. The new Job Manager
> is responsible for only one job, which matches your interpretation 2.
>
> So the document you refer to is outdated. Would you mind telling us the
> URL of this document? I think we should update it to avoid misleading more
> people.
>
> 1.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>
> On Wed, Jun 19, 2019 at 1:12 AM, Eduardo Winpenny Tejedor  wrote:
>
>> Hi Pankaj,
>>
>> I have no experience with Hadoop, but from the book I gathered that there
>> is one Job Manager per application, i.e. per jar (as in the example in the
>> first chapter). This is not to say there's one Job Manager per job.
>> Actually, I don't think the word Job is defined in the book; I've seen
>> Task defined, and those do have Task Managers.
>>
>> Hope this is along the right lines
>>
>> Regards,
>> Eduardo
>>
>> On Tue, 18 Jun 2019, 08:42 Pankaj Chand, 
>> wrote:
>>
>>> I am trying to understand the role of Job Manager in Flink, and have
>>> come across two possibly distinct interpretations.
>>>
>>> 1. The online documentation v1.8 signifies that there is at least one
>>> Job Manager in a cluster, and it is closely tied to the cluster of
>>> machines, by managing all jobs in that cluster of machines.
>>>
>>> This signifies that Flink's Job Manager is much like Hadoop's
>>> Application Manager.
>>>
>>> 2. The book, "Stream Processing with Apache Flink", writes that, "The
>>> Job Manager is the master process that controls the execution of a single
>>> application—each application is controlled by a different Job Manager."
>>>
>>> This signifies that Flink defaults to one Job Manager per job, and the
>>> Job Manager is closely tied to that single job, much like Hadoop's
>>> Application Master for each job.
>>>
>>> Please let me know which one is correct.
>>>
>>> Pankaj
>>>
>>


Role of Job Manager

2019-06-18 Thread Pankaj Chand
I am trying to understand the role of Job Manager in Flink, and have come
across two possibly distinct interpretations.

1. The online documentation v1.8 signifies that there is at least one Job
Manager in a cluster, and it is closely tied to the cluster of machines, by
managing all jobs in that cluster of machines.

This signifies that Flink's Job Manager is much like Hadoop's Application
Manager.

2. The book, "Stream Processing with Apache Flink", writes that, "The Job
Manager is the master process that controls the execution of a single
application—each application is controlled by a different Job Manager."

This signifies that Flink defaults to one Job Manager per job, and the Job
Manager is closely tied to that single job, much like Hadoop's Application
Master for each job.

Please let me know which one is correct.

Pankaj