Re: Flink Task Allocation on Nodes

2018-10-24 Thread Sayat Satybaldiyev
The Flink cluster runs in standalone mode with an HA configuration. It has
6 task managers, each with 8 slots, for 48 slots in total.

>>If your cluster has only one task manager with one slot on each node,
then the job should be spread evenly.
Agreed, this would solve the issue. However, the cluster is running other
jobs, and in that case it wouldn't leave hardware resources for them.

On Wed, Oct 24, 2018 at 2:20 PM Kien Truong  wrote:

> Hi,
>
> How are your task managers deployed?
>
> If your cluster has only one task manager with one slot on each node,
> then the job should be spread evenly.
>
> Regards,
>
> Kien
>
> On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
> > Is there any way to tell Flink not to allocate all parallel tasks
> > on one node? We have a stateless Flink job that reads from a
> > 10-partition topic with a parallelism of 6. The Flink job manager
> > allocates all 6 parallel operators to one machine, so all the traffic
> > from Kafka goes to a single machine. We have a cluster of 6 nodes,
> > and ideally each machine would run one parallel operator. Is there a
> > way to do that in Flink?
>


Flink Task Allocation on Nodes

2018-10-24 Thread Sayat Satybaldiyev
Is there any way to tell Flink not to allocate all parallel tasks on
one node? We have a stateless Flink job that reads from a 10-partition
topic with a parallelism of 6. The Flink job manager allocates all 6
parallel operators to one machine, so all the traffic from Kafka goes to a
single machine. We have a cluster of 6 nodes, and ideally each machine
would run one parallel operator. Is there a way to do that in Flink?
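For illustration only (plain Java, not Flink's actual scheduler code), the sketch below shows why all 6 subtasks can land on one machine when every TaskManager offers 8 slots: a fill-first strategy never needs a second TM, while a least-loaded strategy spreads the job one subtask per node.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of two slot-allocation strategies. Numbers match the
// thread: 6 TaskManagers, 8 slots each, job parallelism 6.
public class SlotSpreadSketch {

    // Fill-first: keep taking free slots from the first TM with capacity.
    // One 8-slot TM can absorb all 6 subtasks, so the rest stay idle.
    static List<Integer> fillFirst(int subtasks, int taskManagers, int slotsPerTm) {
        List<Integer> perTm = new ArrayList<>();
        for (int i = 0; i < taskManagers; i++) perTm.add(0);
        for (int s = 0; s < subtasks; s++) {
            for (int tm = 0; tm < taskManagers; tm++) {
                if (perTm.get(tm) < slotsPerTm) {
                    perTm.set(tm, perTm.get(tm) + 1);
                    break;
                }
            }
        }
        return perTm;
    }

    // Least-loaded: always pick the TM with the fewest used slots,
    // which spreads 6 subtasks over 6 TMs, one each.
    static List<Integer> spreadEvenly(int subtasks, int taskManagers, int slotsPerTm) {
        List<Integer> perTm = new ArrayList<>();
        for (int i = 0; i < taskManagers; i++) perTm.add(0);
        for (int s = 0; s < subtasks; s++) {
            int best = -1;
            for (int tm = 0; tm < taskManagers; tm++) {
                if (perTm.get(tm) < slotsPerTm
                        && (best < 0 || perTm.get(tm) < perTm.get(best))) {
                    best = tm;
                }
            }
            perTm.set(best, perTm.get(best) + 1);
        }
        return perTm;
    }

    public static void main(String[] args) {
        System.out.println("fill-first:   " + fillFirst(6, 6, 8));    // [6, 0, 0, 0, 0, 0]
        System.out.println("least-loaded: " + spreadEvenly(6, 6, 8)); // [1, 1, 1, 1, 1, 1]
    }
}
```

For what it's worth, Flink releases after this thread (1.10+) added a `cluster.evenly-spread-out-slots` option that makes the scheduler prefer the second behavior.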


Re: Flink leaves a lot RocksDB sst files in tmp directory

2018-10-11 Thread Sayat Satybaldiyev
Thank you, Piotr, for the reply! We didn't run this job on a previous
version of Flink. Unfortunately, I don't have a log file from the JM, only
the TM logs.

https://drive.google.com/file/d/14QSVeS4c0EETT6ibK3m_TMgdLUwD6H1m/view?usp=sharing

On Wed, Oct 10, 2018 at 10:08 AM Piotr Nowojski 
wrote:

> Hi,
>
> Was this happening in older Flink version? Could you post in what
> circumstances the job has been moved to a new TM (full job manager logs and
> task manager logs would be helpful)? I’m suspecting that those leftover
> files might have something to do with local recovery.
>
> Piotrek
>
> On 9 Oct 2018, at 15:28, Sayat Satybaldiyev  wrote:
>
> After digging more into the log, I think it's more likely a bug. I've
> grepped the log by job ID and found that under normal circumstances the TM
> is supposed to delete the flink-io files. For some reason, it doesn't
> delete the files listed above.
>
> 2018-10-08 22:10:25,865 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
> 2018-10-08 22:10:25,867 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
> 2018-10-08 22:10:25,874 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
> 2018-10-08 22:17:38,680 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
> JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,686 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,691 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>
>
> On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev 
> wrote:
>
>> Dear all,
>>
> >> While running Flink 1.6.1 with RocksDB as the state backend and HDFS as
> >> the checkpoint filesystem, I've noticed that after a job has moved to a
> >> different host it leaves quite a lot of state in the temp folder (1.2 TB
> >> in total). The files are not used, as the TM is no longer running the
> >> job on that host.
> >>
> >> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host
> >> but was then moved to a different TM. I'm wondering whether this is
> >> intended behavior or a possible bug.
> >>
> >> I've attached a screenshot of the files that are left behind and not
> >> used by the job.
>>
>
>


Re: Flink leaves a lot RocksDB sst files in tmp directory

2018-10-09 Thread Sayat Satybaldiyev
After digging more into the log, I think it's more likely a bug. I've
grepped the log by job ID and found that under normal circumstances the TM
is supposed to delete the flink-io files. For some reason, it doesn't
delete the files listed above.

2018-10-08 22:10:25,865 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
Deleting existing instance base directory
/tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
2018-10-08 22:10:25,867 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
Deleting existing instance base directory
/tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
2018-10-08 22:10:25,874 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
Deleting existing instance base directory
/tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
2018-10-08 22:17:38,680 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
org.apache.flink.util.FlinkException: JobManager responsible for
a5b223c7aee89845f9aed24012e46b7e lost the leadership.
org.apache.flink.util.FlinkException: JobManager responsible for
a5b223c7aee89845f9aed24012e46b7e lost the leadership.
2018-10-08 22:17:38,686 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
Deleting existing instance base directory
/tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
org.apache.flink.util.FlinkException: JobManager responsible for
a5b223c7aee89845f9aed24012e46b7e lost the leadership.
2018-10-08 22:17:38,691 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
Deleting existing instance base directory
/tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
org.apache.flink.util.FlinkException: JobManager responsible for
a5b223c7aee89845f9aed24012e46b7e lost the leadership.
org.apache.flink.util.FlinkException: JobManager responsible for
a5b223c7aee89845f9aed24012e46b7e lost the leadership.
org.apache.flink.util.FlinkException: JobManager responsible for
a5b223c7aee89845f9aed24012e46b7e lost the leadership.
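As a quick way to spot such leftovers (an illustrative sketch, not from the thread), the snippet below walks a directory tree and collects state directories whose names carry a given job ID, matching the `job_<jobId>_op_...` naming visible in the log lines above. The demo runs against a throwaway temp tree rather than the real /tmp.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Sketch: find local state directories belonging to one job, e.g. leftovers
// under /tmp/flink-io-*/ after the job moved to another TaskManager.
public class LeftoverStateScan {

    static List<Path> findJobDirs(Path root, String jobId) throws IOException {
        List<Path> hits = new ArrayList<>();
        try (Stream<Path> walk = Files.walk(root)) {
            walk.filter(Files::isDirectory)
                // Directory names look like job_<jobId>_op_<operator>__..._uuid_<uuid>
                .filter(p -> p.getFileName().toString().startsWith("job_" + jobId))
                .forEach(hits::add);
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Demo on a throwaway tree; point root at /tmp on a real TM host.
        Path root = Files.createTempDirectory("flink-io-demo");
        Files.createDirectories(root.resolve(
                "job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_x__1_1__uuid_y"));
        Files.createDirectories(root.resolve("job_other_op_Map_z__1_1__uuid_w"));
        System.out.println(
                findJobDirs(root, "a5b223c7aee89845f9aed24012e46b7e").size()); // prints 1
    }
}
```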


On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev  wrote:

> Dear all,
>
> While running Flink 1.6.1 with RocksDB as the state backend and HDFS as
> the checkpoint filesystem, I've noticed that after a job has moved to a
> different host it leaves quite a lot of state in the temp folder (1.2 TB in
> total). The files are not used, as the TM is no longer running the job on
> that host.
>
> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host but
> was then moved to a different TM. I'm wondering whether this is intended
> behavior or a possible bug.
>
> I've attached a screenshot of the files that are left behind and not used
> by the job.
>


Re: Rocksdb Metrics

2018-09-26 Thread Sayat Satybaldiyev
Actually, once I had written my question I realized that I can do this with
custom metrics, easily exposing the size of the state map.
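For illustration, a minimal self-contained sketch of that custom-metric idea. The `Gauge` interface here is a stand-in for `org.apache.flink.metrics.Gauge`; in a real job you would register the gauge inside `open()` via `getRuntimeContext().getMetricGroup().gauge(...)`, and the "state" below stands in for a keyed MapState.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a custom gauge reporting the number of entries in some state.
public class StateSizeGaugeSketch {

    // Stand-in for org.apache.flink.metrics.Gauge.
    interface Gauge<T> { T getValue(); }

    // Stand-in for keyed state (e.g. a MapState in the real job).
    private final Map<String, String> stateMap = new HashMap<>();

    // Gauge that reports how many entries the "state" currently holds.
    final Gauge<Integer> stateSize = stateMap::size;

    void process(String key, String value) {
        stateMap.put(key, value);
    }

    public static void main(String[] args) {
        StateSizeGaugeSketch sketch = new StateSizeGaugeSketch();
        sketch.process("a", "1");
        sketch.process("b", "2");
        System.out.println(sketch.stateSize.getValue()); // prints 2
    }
}
```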

On Wed, Sep 26, 2018 at 11:57 AM Sayat Satybaldiyev 
wrote:

> Thank you for this information. @Yun, is there an easy way to expose the
> number of records in RocksDB?
>
> On Wed, Sep 26, 2018 at 9:47 AM Yun Tang  wrote:
>
>> Hi Sayat
>>
>> Before this feature is available, you can also find some metrics
>> information, such as hit/miss counts and file status, from RocksDB itself.
>> By default, RocksDB dumps its stats to its information LOG file every 10
>> minutes (you can call DBOptions.setStatsDumpPeriodSec to shorten the
>> interval), and you can find the information LOG file under the RocksDB
>> state backend's db folder.
>>
>> Best
>> Yun
>> --
>> *From:* Stefan Richter 
>> *Sent:* Wednesday, September 26, 2018 0:56
>> *To:* Sayat Satybaldiyev
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Rocksdb Metrics
>>
>> Hi,
>>
>> this feature is tracked here
>> https://issues.apache.org/jira/browse/FLINK-10423
>>
>> Best,
>> Stefan
>>
>> Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev :
>>
>> Flink provides a rich set of metrics. However, I couldn't find any
>> metrics for the RocksDB state backend, neither in the metrics docs nor in
>> the JMX MBeans.
>>
>> Are there any metrics for the RocksDB backend that Flink exposes?
>>
>>
>>


Re: Rocksdb Metrics

2018-09-26 Thread Sayat Satybaldiyev
Thank you for this information. @Yun, is there an easy way to expose the
number of records in RocksDB?
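Yun's stats-dump suggestion (quoted below) can be explored with a small parser. The sample lines follow the `<ticker> COUNT : <n>` shape that RocksDB's statistics dump writes to its information LOG; treat the exact format as an assumption and adapt the parsing to what your LOG file actually contains.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: extract ticker counters from a RocksDB statistics dump.
public class RocksDbStatsParse {

    static Map<String, Long> parseCounters(String dump) {
        Map<String, Long> counters = new LinkedHashMap<>();
        for (String line : dump.split("\n")) {
            String[] parts = line.trim().split("\\s+");
            // Expect lines of the form: <ticker-name> COUNT : <value>
            if (parts.length == 4 && "COUNT".equals(parts[1]) && ":".equals(parts[2])) {
                counters.put(parts[0], Long.parseLong(parts[3]));
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        // Sample dump excerpt (assumed format, not taken from the thread).
        String sample = "rocksdb.block.cache.hit COUNT : 5120\n"
                      + "rocksdb.block.cache.miss COUNT : 384\n";
        Map<String, Long> c = parseCounters(sample);
        double hitRate = (double) c.get("rocksdb.block.cache.hit")
                / (c.get("rocksdb.block.cache.hit") + c.get("rocksdb.block.cache.miss"));
        System.out.printf("cache hit rate: %.3f%n", hitRate);
    }
}
```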

On Wed, Sep 26, 2018 at 9:47 AM Yun Tang  wrote:

> Hi Sayat
>
> Before this feature is available, you can also find some metrics
> information, such as hit/miss counts and file status, from RocksDB itself.
> By default, RocksDB dumps its stats to its information LOG file every 10
> minutes (you can call DBOptions.setStatsDumpPeriodSec to shorten the
> interval), and you can find the information LOG file under the RocksDB
> state backend's db folder.
>
> Best
> Yun
> --
> *From:* Stefan Richter 
> *Sent:* Wednesday, September 26, 2018 0:56
> *To:* Sayat Satybaldiyev
> *Cc:* user@flink.apache.org
> *Subject:* Re: Rocksdb Metrics
>
> Hi,
>
> this feature is tracked here
> https://issues.apache.org/jira/browse/FLINK-10423
>
> Best,
> Stefan
>
> Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev :
>
> Flink provides a rich set of metrics. However, I couldn't find any
> metrics for the RocksDB state backend, neither in the metrics docs nor in
> the JMX MBeans.
>
> Are there any metrics for the RocksDB backend that Flink exposes?
>
>
>


Rocksdb Metrics

2018-09-25 Thread Sayat Satybaldiyev
Flink provides a rich set of metrics. However, I couldn't find any metrics
for the RocksDB state backend, neither in the metrics docs nor in the JMX
MBeans.

Are there any metrics for the RocksDB backend that Flink exposes?


Re: JMX Configuration: Missing Job Related Beans

2018-09-24 Thread Sayat Satybaldiyev
Yep, they're there. Thank you!

On Mon, Sep 24, 2018 at 12:54 PM 杨力  wrote:

> They are provided by the task managers.
>
> Sayat Satybaldiyev wrote on Mon, Sep 24, 2018 at 6:38 PM:
>
>> Dear all,
>>
>> While configuring JMX with Flink, I don't see some of the MBean metrics
>> that belong to the job, in particular the number of in/out records per
>> operator. I've checked the REST API, and those numbers are provided there.
>> Does Flink expose such beans, or is additional configuration needed?
>>
>> Here's the list of beans I see in VisualVM:
>> jobmanager.Status.JVM.*
>> jobmanager.job.downtime
>> jobmanager.job.lastCheckpoint*
>> jobmanager.job.restartingTime
>> jobmanager.job.uptime
>>
>> and a bunch of JVM-related ones. I've attached a screenshot from
>> VisualVM to the email.
>>
>> Configuration for JMX in flink/conf.yaml:
>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>> metrics.reporter.jmx.port: 9020-9040
>>
>


JMX Configuration: Missing Job Related Beans

2018-09-24 Thread Sayat Satybaldiyev
Dear all,

While configuring JMX with Flink, I don't see some of the MBean metrics
that belong to the job, in particular the number of in/out records per
operator. I've checked the REST API, and those numbers are provided there.
Does Flink expose such beans, or is additional configuration needed?

Here's the list of beans I see in VisualVM:
jobmanager.Status.JVM.*
jobmanager.job.downtime
jobmanager.job.lastCheckpoint*
jobmanager.job.restartingTime
jobmanager.job.uptime

and a bunch of JVM-related ones. I've attached a screenshot from VisualVM
to the email.

Configuration for JMX in flink/conf.yaml:
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 9020-9040
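As a quick way to check what a JVM actually exposes over JMX, here is a self-contained sketch using the platform MBeanServer (stdlib only). Run inside or attached to a TaskManager JVM and swap the pattern for a Flink-specific one (the exact Flink domain string is an assumption; use whatever domains VisualVM shows you); the sketch queries `java.lang:*` so it runs anywhere.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Sketch: list MBeans on the local platform MBeanServer matching a pattern.
public class JmxBeanList {

    static Set<ObjectName> query(String pattern) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        return server.queryNames(new ObjectName(pattern), null);
    }

    public static void main(String[] args) throws Exception {
        // In a TaskManager JVM, try a Flink domain pattern instead of java.lang:*.
        for (ObjectName name : query("java.lang:*")) {
            System.out.println(name);
        }
    }
}
```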


Stream to Stream Join Memory Management

2018-05-18 Thread Sayat Satybaldiyev
Hello!

I'm trying to do a simple DataStream-to-DataStream join. I have two Kafka
topics that share a common field, and I'm trying to join them via the
keyBy-join-where-equalTo-TumblingWindow API in Flink 1.4.1.

My tumbling window size is 1 day, and there will be more data than the
machine has memory. I know that Flink uses RocksDB to store window state.
Will Flink also use RocksDB for the join between the windows, rather than a
HashMap, for the merge operation?
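To make the question concrete, here is a self-contained sketch of tumbling-window join *semantics* in plain Java (no Flink dependency): records from two streams are bucketed by (key, window) and joined pairwise within each bucket. Flink's keyBy/join/where/equalTo/window API does the equivalent, with the buffered window contents kept in the configured state backend rather than on the heap as here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of tumbling-window join semantics over two bounded "streams".
public class WindowJoinSketch {

    // A (key, value, timestamp) record from one of the two streams.
    static class Event {
        final String key; final String value; final long ts;
        Event(String key, String value, long ts) {
            this.key = key; this.value = value; this.ts = ts;
        }
    }

    static List<String> join(List<Event> left, List<Event> right, long windowMs) {
        // Bucket the right side by (key, tumbling-window index).
        Map<String, List<Event>> rightBuckets = new HashMap<>();
        for (Event e : right) {
            rightBuckets.computeIfAbsent(e.key + "@" + (e.ts / windowMs),
                    k -> new ArrayList<>()).add(e);
        }
        // For each left event, emit one joined result per match in its bucket.
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : rightBuckets.getOrDefault(
                    l.key + "@" + (l.ts / windowMs), List.of())) {
                joined.add(l.value + "+" + r.value);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> a = List.of(new Event("k1", "a1", 10), new Event("k2", "a2", 10));
        List<Event> b = List.of(new Event("k1", "b1", 20), new Event("k1", "b2", 2000));
        System.out.println(join(a, b, 1000)); // prints [a1+b1]; b2 falls in the next window
    }
}
```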

Best,
Sayat