Re: Flink Task Allocation on Nodes
The Flink cluster runs standalone with an HA configuration. It has 6 task managers, each with 8 slots, for 48 slots in the cluster overall.

>> If your cluster has only one task manager with one slot on each node, then the job should be spread evenly.

Agree, this would solve the issue. However, the cluster is running other jobs, and in that case it wouldn't have hardware resources left for them.

On Wed, Oct 24, 2018 at 2:20 PM Kien Truong wrote:
> Hi,
>
> How are your task managers deployed?
>
> If your cluster has only one task manager with one slot on each node,
> then the job should be spread evenly.
>
> Regards,
>
> Kien
>
> On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
> > Is there any way to tell Flink not to allocate all parallel tasks
> > on one node? We have a stateless Flink job that reads from a
> > 10-partition topic and has a parallelism of 6. The Flink job manager
> > allocates all 6 parallel operators to one machine, so all the Kafka
> > traffic goes to a single machine. We have a cluster of 6 nodes, and
> > ideally each machine would run one parallel operator. Is there a
> > way to do that in Flink?
Flink Task Allocation on Nodes
Is there any way to tell Flink not to allocate all parallel tasks on one node? We have a stateless Flink job that reads from a 10-partition topic and has a parallelism of 6. The Flink job manager allocates all 6 parallel operators to one machine, so all the Kafka traffic goes to a single machine. We have a cluster of 6 nodes, and ideally each machine would run one parallel operator. Is there a way to do that in Flink?
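For readers hitting the same problem on newer Flink versions: a hedged flink-conf.yaml sketch. The `cluster.evenly-spread-out-slots` option (added around Flink 1.9.2/1.10, so not available on the 1.6-era cluster discussed above) asks the scheduler to spread slots across registered task managers; on older versions the only workaround is the one-TM-per-node layout Kien describes, or fewer slots per TM.

```yaml
# Illustrative flink-conf.yaml fragment; the first option requires Flink >= 1.9.2.
# Prefer spreading slots across all registered TaskManagers instead of
# filling one TaskManager before using the next.
cluster.evenly-spread-out-slots: true

# Alternative layout for older versions: fewer slots per TM, so a
# parallelism-6 job cannot land entirely on one machine.
taskmanager.numberOfTaskSlots: 2
```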
Re: Flink leaves a lot of RocksDB sst files in tmp directory
Thank you Piotr for the reply! We didn't run this job on a previous version of Flink. Unfortunately, I don't have a log file from the JM, only TM logs. https://drive.google.com/file/d/14QSVeS4c0EETT6ibK3m_TMgdLUwD6H1m/view?usp=sharing

On Wed, Oct 10, 2018 at 10:08 AM Piotr Nowojski wrote:
> Hi,
>
> Was this happening in older Flink versions? Could you post in what
> circumstances the job has been moved to a new TM (full job manager logs and
> task manager logs would be helpful)? I'm suspecting that those leftover
> files might have something to do with local recovery.
>
> Piotrek
>
> On 9 Oct 2018, at 15:28, Sayat Satybaldiyev wrote:
>
> After digging more in the log, I think it's more likely a bug. I've grepped the log
> by job id and found that under normal circumstances the TM is supposed to delete
> the flink-io files. For some reason, it doesn't delete the files listed above.
>
> 2018-10-08 22:10:25,865 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
> 2018-10-08 22:10:25,867 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
> 2018-10-08 22:10:25,874 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
> 2018-10-08 22:17:38,680 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
> org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,686 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
> org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,691 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
> org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>
> On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev wrote:
>> Dear all,
>>
>> While running Flink 1.6.1 with RocksDB as a backend and HDFS as the
>> checkpoint FS, I've noticed that after a job has moved to a different host
>> it leaves quite a lot of state in the temp folder (1.2TB in total). The files
>> are not used, as the TM is no longer running the job on that host.
>>
>> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host, but
>> then it was moved to a different TM. I'm wondering: is this intended
>> behavior or a possible bug?
>>
>> I've attached a screenshot of the files that are left and not used by the job.
Re: Flink leaves a lot of RocksDB sst files in tmp directory
After digging more in the log, I think it's more likely a bug. I've grepped the log by job id and found that under normal circumstances the TM is supposed to delete the flink-io files. For some reason, it doesn't delete the files listed above.

2018-10-08 22:10:25,865 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
2018-10-08 22:10:25,867 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
2018-10-08 22:10:25,874 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
2018-10-08 22:17:38,680 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
2018-10-08 22:17:38,686 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
2018-10-08 22:17:38,691 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.
org.apache.flink.util.FlinkException: JobManager responsible for a5b223c7aee89845f9aed24012e46b7e lost the leadership.

On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev wrote:
> Dear all,
>
> While running Flink 1.6.1 with RocksDB as a backend and HDFS as the
> checkpoint FS, I've noticed that after a job has moved to a different host
> it leaves quite a lot of state in the temp folder (1.2TB in total). The files
> are not used, as the TM is no longer running the job on that host.
>
> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host, but
> then it was moved to a different TM. I'm wondering: is this intended
> behavior or a possible bug?
>
> I've attached a screenshot of the files that are left and not used by the job.
Re: Rocksdb Metrics
Actually, once I had written my question I realized that I can do it with custom metrics and easily get the size of the state map.

On Wed, Sep 26, 2018 at 11:57 AM Sayat Satybaldiyev wrote:
> Thank you for this information. @Yun, is there an easy way to expose the
> number of records in RocksDB?
>
> On Wed, Sep 26, 2018 at 9:47 AM Yun Tang wrote:
>> Hi Sayat
>>
>> Before this feature lands, you can also find some metrics information,
>> such as hit/miss counts and file status, from RocksDB itself. By default,
>> RocksDB dumps its stats to its information LOG file every 10 minutes
>> (you can call DBOptions.setStatsDumpPeriodSec to reduce the time
>> interval), and you can find the information LOG file under the RocksDB
>> state backend's db folder.
>>
>> Best
>> Yun
>> --
>> *From:* Stefan Richter
>> *Sent:* Wednesday, September 26, 2018 0:56
>> *To:* Sayat Satybaldiyev
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Rocksdb Metrics
>>
>> Hi,
>>
>> this feature is tracked here:
>> https://issues.apache.org/jira/browse/FLINK-10423
>>
>> Best,
>> Stefan
>>
>> Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev:
>>
>> Flink provides a rich number of metrics. However, I didn't find any
>> metrics for the RocksDB state backend, neither in the metrics docs nor as
>> JMX MBeans.
>>
>> Are there any metrics for the RocksDB backend that Flink exposes?
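A minimal sketch of the custom-metric approach mentioned above, assuming a keyed function holding a MapState. The state name, metric name, and counting logic are invented for illustration (an exact MapState size would require iterating the state per key), and the snippet needs the Flink streaming API on the classpath, so it is a sketch rather than a drop-in implementation:

```java
// Sketch only: assumes flink-streaming-java (1.5+ era API) on the classpath.
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountingFunction extends KeyedProcessFunction<String, String, String> {

    private transient MapState<String, Long> state; // hypothetical state
    private transient long entriesSeen;             // rough per-subtask proxy for state size

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("myState", String.class, Long.class));
        // Expose the counter as a gauge; it appears under the operator's
        // metric group in JMX and the REST API.
        getRuntimeContext().getMetricGroup()
                .gauge("stateEntriesSeen", (Gauge<Long>) () -> entriesSeen);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (!state.contains(value)) {
            state.put(value, 1L);
            entriesSeen++; // count new entries as they are added
        }
        out.collect(value);
    }
}
```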
Re: Rocksdb Metrics
Thank you for this information. @Yun, is there an easy way to expose the number of records in RocksDB?

On Wed, Sep 26, 2018 at 9:47 AM Yun Tang wrote:
> Hi Sayat
>
> Before this feature lands, you can also find some metrics information,
> such as hit/miss counts and file status, from RocksDB itself. By default,
> RocksDB dumps its stats to its information LOG file every 10 minutes
> (you can call DBOptions.setStatsDumpPeriodSec to reduce the time
> interval), and you can find the information LOG file under the RocksDB
> state backend's db folder.
>
> Best
> Yun
> --
> *From:* Stefan Richter
> *Sent:* Wednesday, September 26, 2018 0:56
> *To:* Sayat Satybaldiyev
> *Cc:* user@flink.apache.org
> *Subject:* Re: Rocksdb Metrics
>
> Hi,
>
> this feature is tracked here:
> https://issues.apache.org/jira/browse/FLINK-10423
>
> Best,
> Stefan
>
> Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev:
>
> Flink provides a rich number of metrics. However, I didn't find any
> metrics for the RocksDB state backend, neither in the metrics docs nor as
> JMX MBeans.
>
> Are there any metrics for the RocksDB backend that Flink exposes?
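Yun's setStatsDumpPeriodSec suggestion can be wired in from the job, roughly as follows; a sketch assuming the 1.6-era flink-statebackend-rocksdb API, where OptionsFactory is the hook for tuning DBOptions, and `env` is the job's StreamExecutionEnvironment:

```java
// Sketch: assumes flink-statebackend-rocksdb (Flink 1.6.x-era API) on the classpath.
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
backend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // Dump RocksDB statistics to its information LOG file every 60s
        // instead of the default 600s, so hit/miss counts are easier to read.
        return currentOptions.setStatsDumpPeriodSec(60);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions; // no column-family tuning in this sketch
    }
});
env.setStateBackend(backend); // env: the StreamExecutionEnvironment
```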
Rocksdb Metrics
Flink provides a rich number of metrics. However, I didn't find any metrics for the RocksDB state backend, neither in the metrics docs nor as JMX MBeans. Are there any metrics for the RocksDB backend that Flink exposes?
Re: JMX Configuration: Missing Job Related Beans
Yep, they're there. Thank you!

On Mon, Sep 24, 2018 at 12:54 PM 杨力 wrote:
> They are provided in the taskmanagers.
>
> Sayat Satybaldiyev wrote on Mon, Sep 24, 2018 at 6:38 PM:
>> Dear all,
>>
>> While configuring JMX with Flink, I don't see some of the bean metrics that
>> belong to the job, in particular the number of in/out records per operator.
>> I've checked the REST API, and those numbers are provided there. Does Flink
>> provide such beans, or is there additional configuration for them?
>>
>> Here's the list of beans that I see in VisualVM:
>> jobmanager.Status.JVM.*
>> jobmanager.job.downtime
>> jobmanager.job.lastCheckpoint*
>> jobmanager.job.restartingTime
>> jobmanager.job.uptime
>>
>> and a bunch of JVM-related ones. I've attached a screenshot from
>> VisualVM to the email.
>>
>> Configuration for JMX in flink-conf.yaml:
>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>> metrics.reporter.jmx.port: 9020-9040
JMX Configuration: Missing Job Related Beans
Dear all,

While configuring JMX with Flink, I don't see some of the bean metrics that belong to the job, in particular the number of in/out records per operator. I've checked the REST API, and those numbers are provided there. Does Flink provide such beans, or is there additional configuration for them?

Here's the list of beans that I see in VisualVM:
jobmanager.Status.JVM.*
jobmanager.job.downtime
jobmanager.job.lastCheckpoint*
jobmanager.job.restartingTime
jobmanager.job.uptime

and a bunch of JVM-related ones. I've attached a screenshot from VisualVM to the email.

Configuration for JMX in flink-conf.yaml:
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 9020-9040
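To double-check which beans a task manager actually exposes (per the answer above, the operator-level records-in/out metrics live on the TMs, not the JM), one can point a plain JDK JMX client at the configured reporter port. A sketch using only the standard library; the host and port are assumptions matching the config above, and it needs a live TaskManager to connect to:

```java
// Sketch: JDK-only JMX client; requires a running TaskManager with the
// JMXReporter listening on the assumed port (here localhost:9020).
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListFlinkBeans {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9020/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Flink's JMXReporter registers its beans under the
            // "org.apache.flink" domain; list everything in it.
            Set<ObjectName> names =
                    conn.queryNames(new ObjectName("org.apache.flink:*"), null);
            names.forEach(System.out::println);
        }
    }
}
```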
Stream to Stream Join Memory Management
Hello! I'm trying to do a simple DataStream-to-DataStream join. I have two Kafka topics that share a common field, and I'm joining them via the keyBy-join-where-equalTo API with a tumbling window in Flink 1.4.1. My tumbling window size is 1 day, and there will be more data than the machine has memory. I know that Flink uses RocksDB to store window state. Will Flink use RocksDB for the join between windows, rather than an in-memory HashMap, for the merge operation? Best, Sayat
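For reference, the join in question with RocksDB configured as the backend; a sketch against the 1.4-era DataStream API, with the Event type, field names, and merge function invented for illustration and the Kafka sources elided. With RocksDBStateBackend configured, the window operator buffers joined elements in RocksDB rather than on the heap, though the matched pairs of a fired window are still handed to the join function in memory:

```java
// Sketch: Flink 1.4-style tumbling-window join; Event and its fields are hypothetical.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Window (and therefore join) state goes to RocksDB instead of the JVM heap.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

DataStream<Event> left = ...;   // Kafka source for topic A (elided)
DataStream<Event> right = ...;  // Kafka source for topic B (elided)

left.join(right)
    .where(e -> e.commonField)                       // key of the left stream
    .equalTo(e -> e.commonField)                     // key of the right stream
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .apply((l, r) -> l.merge(r));                    // hypothetical merge of each matched pair
```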