Re: RocksDB checkpointing dir per TM

2018-10-26 Thread Elias Levy
There is also state.backend.rocksdb.localdir. Oddly, I can find the
documentation for it in the 1.5 docs, but not in the 1.6 docs.
The option is still in master, and it is used.
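For reference, the directory options mentioned in this thread could be pointed at an NVMe mount in flink-conf.yaml roughly like this (the paths are illustrative assumptions, not values from the thread):

```yaml
# Hypothetical sketch: direct temporary files and RocksDB state to an NVMe mount.
# /mnt/nvme is an assumed mount point, not something stated in this thread.
io.tmp.dirs: /mnt/nvme/flink/tmp
taskmanager.state.local.root-dirs: /mnt/nvme/flink/local-state
state.backend.rocksdb.localdir: /mnt/nvme/flink/rocksdb
```

These options can generally take several directories (check the configuration reference of your Flink version for the exact separator), which relates to the even-spread question quoted below.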

On Fri, Oct 26, 2018 at 3:01 AM Andrey Zagrebin 
wrote:

> Hi Taher,
>
> TMs keep their state locally while running; in this case, the RocksDB files
> already belong to the TM.
> You can point it to the same NVME disk location on each node, relevant
> Flink options here are:
> - io.tmp.dirs
> - taskmanager.state.local.root-dirs
> This data is transient and temporary in nature. It does not survive a job
> failure.
>
> The checkpoint is a logical snapshot of the operator state for all
> involved TMs,
> so it belongs to the job and is usually uploaded to a distributed file system
> available on all TMs.
> The location is set via the Flink option 'state.checkpoints.dir'.
> This way the job can restore from it with a different set of TMs.
>
> Best,
> Andrey
>
> > On 26 Oct 2018, at 08:29, Taher Koitawala 
> wrote:
> >
> > Hi All,
> >   Our current cluster configuration uses one HDD, which is mainly
> for root, and another NVMe disk per node. [1] We want to make sure all TMs
> write their own RocksDB files to the NVMe disk only; how do we do that?
> >
> > [2] Is it also possible to specify multiple directories per TM so that
> we get an even spread when the RocksDB files are written?
> >
> > Thanks,
> > Taher Koitawala
>
>


Re: Unbalanced Kafka consumer consumption

2018-10-26 Thread Elias Levy
You can always shuffle the stream generated by the Kafka source
(dataStream.shuffle()) to evenly distribute records downstream.
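A quick plain-Java sketch (not Flink API; the record counts and seed are invented for illustration) of why a uniform shuffle evens out a skewed source:

```java
import java.util.Arrays;
import java.util.Random;

// Illustrates the effect of shuffle(): every record is sent to a uniformly
// random downstream subtask, so per-subtask load approaches total/parallelism
// regardless of how skewed the source partitions are.
public class ShuffleSketch {

    static long[] distribute(int[] partitionCounts, int parallelism, long seed) {
        long[] perSubtask = new long[parallelism];
        Random rnd = new Random(seed);
        for (int count : partitionCounts) {
            for (int i = 0; i < count; i++) {
                perSubtask[rnd.nextInt(parallelism)]++; // "shuffle": random target subtask
            }
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // One hot partition (900 records) and three nearly idle ones:
        // after shuffling, each of the 4 subtasks ends up near 250 records.
        System.out.println(Arrays.toString(distribute(new int[]{900, 30, 40, 30}, 4, 42L)));
    }
}
```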

On Fri, Oct 26, 2018 at 2:08 AM gerardg  wrote:

> Hi,
>
> We are experiencing issues scaling our Flink application and we have observed
> that it may be because Kafka message consumption is not balanced across
> partitions. The attached image (lag per partition) shows how only one
> partition consumed messages (the blue one in the back), and it wasn't until
> it finished that the other ones started to consume at a good rate (in fact
> the total throughput multiplied by 4 when they started). Also, when those
> partitions started to consume, one partition simply stopped and accumulated
> messages again until the others finished.
>
> We don't see any resource (CPU, network, disk..) struggling in our cluster
> so we are not sure what could be causing this behavior. I can only assume
> that somehow Flink or the Kafka consumer is artificially slowing down the
> other partitions. Maybe due to how back pressure is handled?
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png>
>
>
> Gerard
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Custom Trigger + SQL Pattern

2018-10-26 Thread shkob1
following up on the actual question - is there a way to register a
KeyedStream as table(s) and have a trigger per key?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

2018-10-26 Thread Mikhail Pryakhin
Hi Till, thanks for your reply! here is the issue ticket:

https://issues.apache.org/jira/browse/FLINK-10694 


Kind Regards,
Mike Pryakhin

> On 26 Oct 2018, at 18:29, Till Rohrmann  wrote:
> 
> Hi Mike,
> 
> thanks for reporting this issue. I think you're right that Flink leaves some 
> empty nodes in ZooKeeper. It seems that we don't delete the  
> node with all its children in ZooKeeperHaServices#closeAndCleanupAllData.
> 
> Could you please open a JIRA issue in order to fix it? Thanks a lot!
> 
> Cheers,
> Till
> 
> On Fri, Oct 26, 2018 at 4:31 PM Mikhail Pryakhin  > wrote:
> Hi Andrey, Thanks a lot for your reply!
> 
>> What was the full job life cycle? 
> 
> 1. The job is deployed as a YARN cluster with the following properties set
> 
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: 
>   high-availability.zookeeper.storageDir: hdfs:/// 
> <>
>   high-availability.zookeeper.path.root: 
>   high-availability.zookeeper.path.namespace: 
> 
> 2. The job is cancelled via flink cancel  command.
> 
>What I've noticed:
>   when the job is running the following directory structure is created in 
> zookeeper
> 
>   ///leader/resource_manager_lock
>   ///leader/rest_server_lock
>   ///leader/dispatcher_lock
>   
> ///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
>   ///leaderlatch/resource_manager_lock
>   ///leaderlatch/rest_server_lock
>   ///leaderlatch/dispatcher_lock
>   
> ///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
>   
> ///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
>   
> ///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
>   
> ///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
> 
> 
>   when the job is cancelled some ephemeral nodes disappear, but most 
> of them are still there:
> 
>   
> ///leader/5c21f00b9162becf5ce25a1cf0e67cde
>   ///leaderlatch/resource_manager_lock
>   ///leaderlatch/rest_server_lock
>   ///leaderlatch/dispatcher_lock
>   
> ///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
>   ///checkpoints/
>   ///checkpoint-counter/
>   ///running_job_registry/
> 
>> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 
> 
> I start the job with Flink-1.6.1
> 
> 
>> Was there a failover of Job Master while running before the cancelation?
> 
> no, there was no failover; the job is deployed as a YARN cluster (the YARN 
> Cluster High Availability guide states that no failover is required)
> 
>> What version of Zookeeper do you use?
> 
> Zookeeper 3.4.10
> 
>> In general, it should not be the case and all job related data should be 
>> cleaned from Zookeeper upon cancellation.
> 
> As far as I understand, the issue concerns the JobManager failover process, 
> and my question is about a manual, intended cancellation of a job.
> 
> Here is the method responsible for cleaning Zookeeper folders up [1], 
> which is called when the job manager has stopped [2]. 
> And it seems it only cleans up the running_job_registry folder; other folders 
> stay untouched. I supposed that everything under the 
> /// folder is cleaned up when the job is 
> cancelled.
> 
> 
> [1] 
> https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107
>  
> 
> [2] 
> https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332
>  
> 
>  
> 
> Kind Regards,
> Mike Pryakhin
> 
>> On 26 Oct 2018, at 12:39, Andrey Zagrebin > > wrote:
>> 
>> Hi Mike,
>> 
>> What was the full job life cycle? 
>> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 
>> Was there a failover of Job Master while running before the cancelation?
>> What version of Zookeeper do you use?
>> 
>> Flink creates child nodes to acquire a lock for the job in Zookeeper.
>> The lock is removed by removing the (ephemeral) child node.
>> A persistent node can be a problem: if the job dies and does not remove it, 
>> the persistent node will not time out and disappear as an ephemeral one would, 
>> and the next job instance will not delete it because it is supposed to be 
>> locked by the previous one.
>> 
>> There was a recent fix in 1.6.1 where the job data was not properly deleted 
>> from Zookeeper [1].
>> In general, it should not be the case and all job related data should be
>> cleaned from Zookeeper upon cancellation.

flink-1.6.1 :: job deployment :: detached mode

2018-10-26 Thread Mikhail Pryakhin
Hi community!

Right after I upgraded Flink to 1.6.1, I started getting an exception during job 
deployment as a YARN cluster. 
The job is submitted with Zookeeper HA enabled, in detached mode.

The flink yaml contains the following properties:

high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 
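With the elided values replaced by purely hypothetical placeholders (none of these are the poster's real settings), the block above typically looks something like:

```yaml
# All values below are invented placeholders for illustration only.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.storageDir: hdfs:///flink/ha
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.path.namespace: my-job
```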

the job is deployed via flink CLI command like the following:

"${FLINK_HOME}/bin/flink" run \
-m yarn-cluster \
-ynm "${JOB_NAME}-${JOB_VERSION}" \
-yn "${tm_containers}" \
-ys "${tm_slots}" \
-ytm "${tm_memory}" \
-yjm "${jm_memory}" \
-p "${parallelism}" \
-yqu "${queue}" \
-yt "${YARN_APP_PATH}" \
-c "${MAIN_CLASS}" \
-yst \
-yd \
${class_path} \
"${YARN_APP_PATH}"/"${APP_JAR}"


After the job has been successfully deployed, I get an exception:

2018-10-26 18:29:17,781 | ERROR | Curator-Framework-0 | org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl | Background exception was not retry-able or retry gave up
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1406)
	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1097)
	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1130)
	at org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java:274)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CreateBuilderImpl$7.performBackgroundOperation(CreateBuilderImpl.java:561)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.OperationAndData.callPerformBackgroundOperation(OperationAndData.java:72)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:831)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

If the job is deployed in "attached mode" everything goes fine.





Kind Regards,
Mike Pryakhin





Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

2018-10-26 Thread Till Rohrmann
Hi Mike,

thanks for reporting this issue. I think you're right that Flink leaves
some empty nodes in ZooKeeper. It seems that we don't delete the
 node with all its children in
ZooKeeperHaServices#closeAndCleanupAllData.

Could you please open a JIRA issue in order to fix it? Thanks a lot!

Cheers,
Till

On Fri, Oct 26, 2018 at 4:31 PM Mikhail Pryakhin 
wrote:

> Hi Andrey, Thanks a lot for your reply!
>
> What was the full job life cycle?
>
>
> 1. The job is deployed as a YARN cluster with the following properties set
>
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 
> high-availability.zookeeper.storageDir: hdfs:///
> high-availability.zookeeper.path.root: 
> high-availability.zookeeper.path.namespace: 
>
> 2. The job is cancelled via flink cancel  command.
>
>What I've noticed:
> when the job is running the following directory structure is created in
> zookeeper
>
> ///leader/resource_manager_lock
> ///leader/rest_server_lock
> ///leader/dispatcher_lock
>
> ///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> ///leaderlatch/resource_manager_lock
> ///leaderlatch/rest_server_lock
> ///leaderlatch/dispatcher_lock
>
> ///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
>
> ///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
>
> ///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
>
> ///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
>
>
> when the job is cancelled some ephemeral nodes disappear, but most of
> them are still there:
>
> ///leader/5c21f00b9162becf5ce25a1cf0e67cde
> ///leaderlatch/resource_manager_lock
> ///leaderlatch/rest_server_lock
> ///leaderlatch/dispatcher_lock
>
> ///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> ///checkpoints/
> ///checkpoint-counter/
> ///running_job_registry/
>
> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0?
>
>
> I start the job with Flink-1.6.1
>
>
> Was there a failover of Job Master while running before the cancelation?
>
> no, there was no failover; the job is deployed as a YARN cluster (the YARN
> Cluster High Availability guide states that no failover is required)
>
> What version of Zookeeper do you use?
>
> Zookeeper 3.4.10
>
> In general, it should not be the case and all job related data should be
> cleaned from Zookeeper upon cancellation.
>
> As far as I understand, the issue concerns the JobManager failover process,
> and my question is about a manual, intended cancellation of a job.
>
> Here is the method responsible for cleaning Zookeeper folders up [1],
> which is called when the job manager has stopped [2].
> And it seems it only cleans up the running_job_registry folder; other
> folders stay untouched. I supposed that everything under the
> /// folder is cleaned up when the job is cancelled.
>
>
> [1]
> https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107
> [2]
> https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332
>
>
> Kind Regards,
> Mike Pryakhin
>
> On 26 Oct 2018, at 12:39, Andrey Zagrebin 
> wrote:
>
> Hi Mike,
>
> What was the full job life cycle?
> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0?
> Was there a failover of Job Master while running before the cancelation?
> What version of Zookeeper do you use?
>
> Flink creates child nodes to acquire a lock for the job in Zookeeper.
> The lock is removed by removing the (ephemeral) child node.
> A persistent node can be a problem: if the job dies and does not remove
> it,
> the persistent node will not time out and disappear as an ephemeral one would,
> and the next job instance will not delete it because it is supposed to be
> locked by the previous one.
>
> There was a recent fix in 1.6.1 where the job data was not properly
> deleted from Zookeeper [1].
> In general, it should not be the case and all job related data should be
> cleaned from Zookeeper upon cancellation.
>
> Best,
> Andrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-10011
>
> On 25 Oct 2018, at 15:30, Mikhail Pryakhin  wrote:
>
> Hi Flink experts!
>
> When a streaming job with Zookeeper HA enabled gets cancelled, the
> job-related Zookeeper nodes are not removed. Is there a reason behind that?
> I noticed that Zookeeper paths are created with type "Container Node" (an
> ephemeral node that can have nested nodes), falling back to the persistent node
> type in case Zookeeper doesn't support this sort of node.
> But anyway, it is worth removing the job's Zookeeper node when a job is
> cancelled, isn't it?
>
> Thank you in advance!
>
> Kind Regards,
> Mike Pryakhin
>
>
>
>


Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

2018-10-26 Thread Mikhail Pryakhin
Hi Andrey, Thanks a lot for your reply!

> What was the full job life cycle? 

1. The job is deployed as a YARN cluster with the following properties set

high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 

2. The job is cancelled via flink cancel  command.

   What I've noticed:
when the job is running the following directory structure is created in 
zookeeper

///leader/resource_manager_lock
///leader/rest_server_lock
///leader/dispatcher_lock

///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock

///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock

///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041

///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde

///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde


when the job is cancelled some ephemeral nodes disappear, but most 
of them are still there:


///leader/5c21f00b9162becf5ce25a1cf0e67cde
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock

///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/
///checkpoint-counter/
///running_job_registry/

> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 

I start the job with Flink-1.6.1


> Was there a failover of Job Master while running before the cancelation?

no, there was no failover; the job is deployed as a YARN cluster (the YARN 
Cluster High Availability guide states that no failover is required)

> What version of Zookeeper do you use?

Zookeeper 3.4.10

> In general, it should not be the case and all job related data should be 
> cleaned from Zookeeper upon cancellation.

As far as I understand, the issue concerns the JobManager failover process, and my 
question is about a manual, intended cancellation of a job.

Here is the method responsible for cleaning Zookeeper folders up [1], which 
is called when the job manager has stopped [2]. 
And it seems it only cleans up the running_job_registry folder; other folders 
stay untouched. I supposed that everything under the 
/// folder is cleaned up when the job is 
cancelled.


[1] 
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107
[2] 
https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332
 

Kind Regards,
Mike Pryakhin

> On 26 Oct 2018, at 12:39, Andrey Zagrebin  wrote:
> 
> Hi Mike,
> 
> What was the full job life cycle? 
> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 
> Was there a failover of Job Master while running before the cancelation?
> What version of Zookeeper do you use?
> 
> Flink creates child nodes to acquire a lock for the job in Zookeeper.
> The lock is removed by removing the (ephemeral) child node.
> A persistent node can be a problem: if the job dies and does not remove it, 
> the persistent node will not time out and disappear as an ephemeral one would, 
> and the next job instance will not delete it because it is supposed to be 
> locked by the previous one.
> 
> There was a recent fix in 1.6.1 where the job data was not properly deleted 
> from Zookeeper [1].
> In general, it should not be the case and all job related data should be 
> cleaned from Zookeeper upon cancellation.
> 
> Best,
> Andrey
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10011 
> 
> 
>> On 25 Oct 2018, at 15:30, Mikhail Pryakhin > > wrote:
>> 
>> Hi Flink experts!
>> 
>> When a streaming job with Zookeeper HA enabled gets cancelled, the 
>> job-related Zookeeper nodes are not removed. Is there a reason behind that? 
>> I noticed that Zookeeper paths are created with type "Container Node" (an 
>> ephemeral node that can have nested nodes), falling back to the persistent node 
>> type in case Zookeeper doesn't support this sort of node. 
>> But anyway, it is worth removing the job's Zookeeper node when a job is 
>> cancelled, isn't it?
>> 
>> Thank you in advance!
>> 
>> Kind Regards,
>> Mike Pryakhin
>> 
> 





Re: Reply: Flink 1.6.0 submit job and got "No content to map due to end-of-input" Error

2018-10-26 Thread Jeroen Steggink | knowsy

Hi,

I'm running Flink 1.5.4 and all dependencies in the job rely on 1.5.4. 
However, I still get this error. According to the JIRA issue it should 
be fixed in 1.5.4 as well.


Since I'm using Apache Beam to build the jar, I can't move to version 1.6.x.

What could it be?

Cheers,

Jeroen

On 07-Sep-18 17:52, Till Rohrmann wrote:

Hi Gongsen,

Chesnay found and fixed the problem: 
https://issues.apache.org/jira/browse/FLINK-10293.


Cheers,
Till


On Wed, Sep 5, 2018 at 10:00 AM 潘 功森 wrote:


Hi Chesnay,

   I can confirm that both the client and the cluster were upgraded to 1.6.0,
because if I use "./flink run XXX.jar" to submit a job, it works
fine. You can see the UI below.

   But when I used createRemoteEnvironment locally, it
failed. It confused me a lot.


*From:* Chesnay Schepler <ches...@apache.org>
*Sent:* Wednesday, September 5, 2018 3:23:23 PM
*To:* 潘 功森; vino yang; d...@flink.apache.org

*Cc:* user
*Subject:* Re: Reply: Flink 1.6.0 submit job and got "No content to map due
to end-of-input" Error
Did you upgrade both the client and cluster to 1.6.0? The server
returned a completely empty response which shouldn't be possible
if it runs 1.6.0.

On 05.09.2018 07:27, 潘 功森 wrote:


Hi  Vino,

Below are dependencies I used,please have a look.

I found it also included flink-connector-kafka-0.10_2.11-1.6.0.jar
and flink-connector-kafka-0.9_2.11-1.6.0.jar, and I don't know if
that has any effect.

yours,

Gongsen

Sent from Mail for Windows 10


*From:* vino yang

*Sent:* Wednesday, September 5, 2018 10:35:59 AM
*To:* d...@flink.apache.org
*Cc:* user
*Subject:* Re: Flink 1.6.0 submit job and got "No content to map due to
end-of-input" Error
Hi Pangongsen,

Do you upgrade the Flink-related dependencies you use at the same
time? In other words, is the dependency consistent with the flink
version?

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Tuesday, September 4, 2018 at 10:07 PM:

Hi all,
     I use the way below to submit a jar to Flink:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(config.clusterIp,
config.clusterPort,
config.clusterFlinkJar);


    I used Flink 1.3.2 before, and it worked fine. But I
upgraded to 1.6.0, and I got the error below:

2018-09-04 21:38:32.039 [ERROR]
[flink-rest-client-netty-19-1]
org.apache.flink.runtime.rest.RestClient - Unexpected
plain-text response:

2018-09-04 21:38:32.137 [ERROR]
[flink-rest-client-netty-18-1]
org.apache.flink.runtime.rest.RestClient - Response was not
valid JSON.


org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
No content to map due to end-of-input


    Could you give me some advice to fix it?

yours,
Gongsen







Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-26 Thread Aaron Levin
Hey,

Not sure how convo threading works on this list, so in case the folks CC'd
missed my other response, here's some more info:

First, I appreciate everyone's help! Thank you!

I wrote several wrappers to try and debug this, including one which is an
exact copy of `InputFormatSourceFunction` which also failed. They all
failed with the same error I detail above. I'll post two of them below.
They all extended `RichParallelSourceFunction` and, as far as I could tell,
were properly initialized (though I may have missed something!).
Additionally, for the two below, if I change `extends
RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
I no longer receive the exception. This is what led me to believe the
source of the issue was casting and how I found the line of code where the
stream graph is given the input format.

Quick explanation of the wrappers:
1. `WrappedInputFormat` does a basic wrap around
`InputFormatSourceFunction` and delegates all methods to the underlying
`InputFormatSourceFunction`
2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
`InputFormatSourceFunction` source.
3. They're being used in a test which looks vaguely like:
`DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
InputFormatSourceFunction[String](source,
implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`

class WrappedInputFormat[A](
  inputFormat: InputFormatSourceFunction[A]
)(
  implicit typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
inputFormat.run(sourceContext)
  }
  override def setRuntimeContext(t: RuntimeContext): Unit = {
inputFormat.setRuntimeContext(t)
  }
  override def equals(obj: scala.Any) = {
inputFormat.equals(obj)
  }
  override def hashCode() = { inputFormat.hashCode() }
  override def toString = { inputFormat.toString }
  override def getRuntimeContext(): RuntimeContext = {
inputFormat.getRuntimeContext }
  override def getIterationRuntimeContext = {
inputFormat.getIterationRuntimeContext
}
  override def open(parameters: Configuration): Unit = {
inputFormat.open(parameters)
  }
  override def cancel(): Unit = {
inputFormat.cancel()
  }
  override def close(): Unit = {
inputFormat.close()
  }
}

And the other one:

class ClonedInputFormatSourceFunction[A](val format: InputFormat[A,
InputSplit], val typeInfo: TypeInformation[A]) extends
RichParallelSourceFunction[A] {

  @transient private var provider: InputSplitProvider = _
  @transient private var serializer: TypeSerializer[A] = _
  @transient private var splitIterator: Iterator[InputSplit] = _
  private var isRunning: Boolean = _

  override def open(parameters: Configuration): Unit = {
val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
if(format.isInstanceOf[RichInputFormat[_,_]]) {
  format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
}
format.configure(parameters)

provider = context.getInputSplitProvider
serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
splitIterator = getInputSplits()
isRunning = splitIterator.hasNext
  }

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
  format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
}

var nextElement: A = serializer.createInstance()
try {
  while (isRunning) {
format.open(splitIterator.next())
while (isRunning && !format.reachedEnd()) {
  nextElement = format.nextRecord(nextElement)
  if (nextElement != null) {
sourceContext.collect(nextElement)
  } else {
break
  }
  format.close()
  if (isRunning) {
isRunning = splitIterator.hasNext
  }
}
  }
} finally {

  format.close()
  if (format.isInstanceOf[RichInputFormat[_,_]]) {
format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
  }
  isRunning = false
}
  }

  override def cancel(): Unit = {
isRunning = false
  }

  override def close(): Unit = {
format.close()
if(format.isInstanceOf[RichInputFormat[_,_]]) {
  format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
}
  }

  private def getInputSplits(): Iterator[InputSplit] = {
new Iterator[InputSplit] {
  private var nextSplit: InputSplit = _
  private var exhausted: Boolean = _

  override def hasNext: Boolean = {
if(exhausted) { return false }
if(nextSplit != null) { return true }
var split: InputSplit = null

try {
  split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
} catch {
  case e: InputSplitProviderException =>
throw new RuntimeException("No InputSplit Provider", e)
}

if(split != null) {
 

Re: Flink Task Allocation on Nodes

2018-10-26 Thread Kien Truong

Hi,

There are couple of reasons:

- Easier resource allocation and isolation: one faulty job doesn't 
affect another.


- Mix and match of Flink versions: you can let the old stable jobs run 
with the old Flink version, and use the latest version of Flink for new 
jobs.


- Faster metrics collection: Flink generates a lot of metrics; by 
keeping each cluster small, our Prometheus instance can scrape their 
metrics a lot faster.



Regards,

Kien


On 10/26/2018 2:50 PM, Sayat Satybaldiyev wrote:
Thanks for the advice, Kien. Could you please share more details on why 
it's best to allocate a separate cluster for each job?


On Wed, Oct 24, 2018 at 3:23 PM Kien Truong wrote:


Hi,

You can have multiple Flink clusters on the same set of physical
machines. In our experience, it's best to deploy a separate Flink
cluster for each job and adjust the resource accordingly.

Best regards,
Kien


On Oct 24, 2018 at 20:17, <saya...@gmail.com> wrote:

The Flink cluster is standalone with an HA configuration. It has 6 task
managers and each has 8 slots. Overall, 48 slots for the cluster.

>> If your cluster only has one task manager with one slot in each
node, then the job should be spread evenly.
Agree, this would solve the issue. However, the cluster is running
other jobs, and in that case it won't have hardware resources for
them.

On Wed, Oct 24, 2018 at 2:20 PM Kien Truong
<duckientru...@gmail.com> wrote:

Hi,

How are your task managers deployed?

If your cluster only has one task manager with one slot in
each node,
then the job should be spread evenly.

Regards,

Kien

On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
> Is there any way to indicate flink not to allocate all
parallel tasks
> on one node?  We have a stateless flink job that reading
from 10
> partition topic and have a parallelism of 6. Flink job manager
> allocates all 6 parallel operators to one machine, causing
all traffic
> from Kafka allocated to only one machine. We have a cluster
of 6 nodes
> and ideal to spread one parallel operator to one machine.
Is there a
> way to do than in Flink?



HA jobmanagers redirect to ip address of leader instead of hostname

2018-10-26 Thread Jeroen Steggink | knowsy

Hi,

I'm having some troubles with Flink jobmanagers in a HA setup within 
OpenShift.


I have three jobmanagers, a Zookeeper cluster and a loadbalancer 
(Openshift/Kubernetes Route) for the web ui / rest server on the 
jobmanagers. Everything works fine, as long as the loadbalancer connects 
to the leader. However, when the leader changes and the loadbalancer 
connects to a non-leader, the jobmanager redirects to a leader using the 
ip address of the host. Since the routing in our network is done using 
hostnames, it doesn't know how to find the node using the ip address and 
results in a timeout.


So I have a few questions:
1. Why is Flink using the IP addresses instead of the hostnames that are 
configured in the config? At other times it does use the hostname, as in the 
info sent to Zookeeper.
2. Is there another way of coping with connections to non-leaders 
instead of redirects? Maybe proxying through a non-leader to the leader?


Cheers,
Jeroen



Re: Accumulating a batch

2018-10-26 Thread Austin Cawley-Edwards
Hi Hequn,

Thank you for the info! Much appreciated! Do you know if a GroupBy Window
could be used to buffer until an arbitrary record came through?

Best,
Austin

On Thu, Oct 25, 2018, 9:17 PM Hequn Cheng  wrote:

> Hi Austin,
>
> You can use GroupBy Window[1], such as TUMBLE Window. The size of the
> window either as time or row-count interval. You can also define your
> own User-Defined Aggregate Functions[2] to be used in window.
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#tumble-tumbling-windows
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions
>
> On Fri, Oct 26, 2018 at 5:08 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi there,
>>
>> Is it possible to use an AggregationFunction to accumulate n values in a
>> buffer until a threshold is met, then stream down all records in the batch?
>>
>> Thank you!
>> Austin Cawley-Edwards
>>
>
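The buffer-until-threshold behavior Austin asks about can be sketched independently of Flink as a plain-Java accumulator (the class and method names below are illustrative, not Flink API; inside Flink this logic would live in a user-defined aggregate or process function):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBuffer {
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();

    public BatchBuffer(int threshold) {
        this.threshold = threshold;
    }

    /** Buffers one record; returns the whole batch once the threshold is met, else null. */
    public List<String> add(String record) {
        buffer.add(record);
        if (buffer.size() >= threshold) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();  // reset for the next batch
            return batch;
        }
        return null;  // still accumulating
    }

    public static void main(String[] args) {
        BatchBuffer b = new BatchBuffer(3);
        System.out.println(b.add("a"));  // null: still accumulating
        System.out.println(b.add("b"));  // null: still accumulating
        System.out.println(b.add("c"));  // [a, b, c]: threshold reached
    }
}
```

When the threshold is a fixed record count, a row-count TUMBLE window gives the same effect declaratively; a custom trigger would be needed for the "until an arbitrary record comes through" variant.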


RichInputFormat working differently in eclipse and in flink cluster

2018-10-26 Thread Teena Kappen // BPRISE
Hi all,

I have implemented a RichInputFormat for reading the results of aggregation 
queries in Elasticsearch. There are around 10 buckets, which are of type JSON 
array. Note: this is a one-time response.

My idea here is to iterate these arrays in parallel. Here is the pseudo code.

public void configure(Configuration parameters) {
System.out.println("configure");
}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
}

public ResponseInputSplit[] createInputSplits(int minNumSplits){
System.out.println("createInputSplits");

//read from elastic
// add buckets to array
}

public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] 
inputSplits) {
//this is default
System.out.println("getInputSplitAssigner");
return new DefaultInputSplitAssigner(inputSplits);
}

public void open(ResponseInputSplit split) {
//read buckets
}

public boolean reachedEnd(){
System.out.println("reachedEnd");
}

public Bounce nextRecord(Bounce reuse) {
}

public void close(){
}

// my main method,
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());

When running in Eclipse, it executes createInputSplits and the results look 
fine. Logs are given below.
Output is -->
configure
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] 
with leader session id...
configure
createInputSplits

When submitting the job to the Flink cluster, it doesn't execute the 
'configure' and 'createInputSplits' methods. Instead it goes directly to the 
nextRecord function. Logs are given below.
Output is -->
Starting execution of program
configure
Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job 
completion.
Connected to JobManager at Actor[akka.tcp://flink@:xxx 
/user/jobmanager#1219973491] with leader session id...
10/26/2018 15:05:57 Job execution switched to status RUNNING.
10/26/2018 15:05:57 DataSource (at 
createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
10/26/2018 15:05:57 DataSource (at 
createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
10/26/2018 15:06:00 DataSource (at 
createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
10/26/2018 15:06:00 DataSource (at 
createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
java.lang.NullPointerException
   at com.xxx.test.MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)

Regards,
Teena



Re: RocksDB checkpointing dir per TM

2018-10-26 Thread Taher Koitawala
Thanks!

On Fri 26 Oct, 2018, 3:31 PM Andrey Zagrebin, 
wrote:

> Hi Taher,
>
> TMs keep state locally while running; in this case the RocksDB files already
> belong to the TM.
> You can point it to the same NVMe disk location on each node; the relevant
> Flink options here are:
> - io.tmp.dirs
> - taskmanager.state.local.root-dirs
> This data is transient and temporary in nature. It does not survive a job
> failure.
>
> The checkpoint is a logical snapshot of the operator state for all
> involved TMs,
> so it belongs to the job and is usually uploaded to a distributed file system
> available on all TMs.
> The location is set in the Flink option 'state.checkpoints.dir'.
> This way the job can restore from it with a different set of TMs.
>
> Best,
> Andrey
>
> > On 26 Oct 2018, at 08:29, Taher Koitawala 
> wrote:
> >
> > Hi All,
> >   Our current cluster configuration uses one HDD, which is mainly
> for root, and another NVMe disk per node. [1] We want to make sure all TMs
> write their own RocksDB files to the NVMe disk only; how do we do that?
> >
> > [2] Is it also possible to specify multiple directories per TM so that
> we have an even spread when the RocksDB files are written?
> >
> > Thanks,
> > Taher Koitawala
>
>


Re: Flink Task Allocation on Nodes

2018-10-26 Thread Marvin777
Hi all,

In YARN mode, a node may contain more than one container; is
there a scheme for assigning tasks to different nodes?

The version is 1.4.2.

Thanks for your assistance.

Sayat Satybaldiyev  于2018年10月26日周五 下午3:50写道:

> Thanks for the advice, Kien. Could you please share more details on why it's
> best to allocate a separate cluster for each job?
>
> On Wed, Oct 24, 2018 at 3:23 PM Kien Truong 
> wrote:
>
>> Hi,
>>
>> You can have multiple Flink clusters on the same set of physical
>> machines. In our experience, it's best to deploy a separate Flink
> cluster for each job and adjust the resources accordingly.
>>
>> Best regards,
>> Kien
>>
>> On Oct 24, 2018 at 20:17, >
>> wrote:
>>
>> Flink Cluster in standalone with HA configuration. It has 6 Task managers
>> and each has 8 slots. Overall, 48 slots for the cluster.
>>
> >>If your cluster only has one task manager with one slot on each node,
>> then the job should be spread evenly.
>> Agree, this will solve the issue. However, the cluster is running other
>> jobs and in this case it won't have hardware resource for other jobs.
>>
>> On Wed, Oct 24, 2018 at 2:20 PM Kien Truong 
>> wrote:
>>
>>> Hi,
>>>
>>> How are your task managers deployed?
>>>
>>> If your cluster only has one task manager with one slot on each node,
>>> then the job should be spread evenly.
>>>
>>> Regards,
>>>
>>> Kien
>>>
>>> On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
> >>> > Is there any way to indicate to Flink not to allocate all parallel tasks
> >>> > on one node?  We have a stateless Flink job that reads from a
> >>> > 10-partition topic and has a parallelism of 6. The Flink job manager
> >>> > allocates all 6 parallel operators to one machine, causing all traffic
> >>> > from Kafka to be allocated to only one machine. We have a cluster of 6
> >>> > nodes and ideally we would spread one parallel operator per machine. Is
> >>> > there a way to do that in Flink?
>>>
>>


Re: KafkaException or ExecutionStateChange failure on job startup

2018-10-26 Thread Mark Harris
Hi Dominik

Setting that bit of configuration seems to have done the trick for the
MXBean exception.

Many thanks for your help.

Best regards,

Mark

On Tue, 23 Oct 2018 at 14:41, Dominik Wosiński  wrote:

> Hey Mark,
>
> Do you use more than one Kafka consumer for your jobs? I think this relates
> to the known issue in Kafka:
> https://issues.apache.org/jira/browse/KAFKA-3992.
> The problem is that if you don't provide a client ID for your
> *KafkaConsumer*, Kafka assigns one, but this is done in an unsynchronized
> way, so it can end up assigning the same ID to multiple
> different Consumer instances. Probably this is what happens when multiple
> jobs are resumed at the same time.
>
> What you could try to do is to assign the *consumer.id* using properties
> passed to each consumer. This should help in solving this issue.
>
> Best Regards,
> Dom.
>
>
>
>
> wt., 23 paź 2018 o 13:21 Mark Harris 
> napisał(a):
>
>> Hi,
>> We regularly see the following two exceptions in a number of jobs shortly
>> after they have been resumed during our Flink cluster startup:
>>
>> org.apache.kafka.common.KafkaException: Error registering mbean
>> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
>> at
>> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
>> at
>> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
>> at
>> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:436)
>> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
>> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
>> at
>> org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:749)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:327)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:188)
>> at
>> org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:283)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1344)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:77)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:473)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: javax.management.InstanceAlreadyExistsException:
>> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
>> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>> at
>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>> at
>> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
>> ... 21 more
>> java.lang.Exception: Failed to send ExecutionStateChange notification to
>> JobManager
>> at
>> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage$3$$anonfun$apply$2.apply(TaskManager.scala:439)
>> at
>> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage$3$$anonfun$apply$2.apply(TaskManager.scala:423)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at
>> akka.
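A sketch of the workaround Dominik describes: giving each job's consumer an explicit, distinct client ID so that concurrently resumed jobs don't race for the auto-assigned "consumer-1" id and collide on the JMX mbean name. The property values and helper below are made-up examples; in the new Kafka consumer used by the Flink connector the relevant setting is `client.id`, and the resulting properties object would be passed to the FlinkKafkaConsumer constructor.

```java
import java.util.Properties;

public class ConsumerProps {
    /** Builds consumer properties with an explicit, per-job client id (illustrative). */
    public static Properties buildProps(String jobName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // example address
        props.setProperty("group.id", jobName);
        // Explicit id so concurrent jobs don't receive the same
        // auto-assigned client id (see KAFKA-3992).
        props.setProperty("client.id", jobName + "-consumer");
        return props;
    }

    public static void main(String[] args) {
        Properties p = buildProps("job-a");
        System.out.println(p.getProperty("client.id"));
        // The properties would then be passed on, e.g.:
        // new FlinkKafkaConsumer011<>("topic", schema, p);
    }
}
```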

Re: RocksDB checkpointing dir per TM

2018-10-26 Thread Andrey Zagrebin
Hi Taher,

TMs keep state locally while running; in this case the RocksDB files already 
belong to the TM.
You can point it to the same NVMe disk location on each node; the relevant 
Flink options here are:
- io.tmp.dirs
- taskmanager.state.local.root-dirs
This data is transient and temporary in nature. It does not survive a job 
failure.

The checkpoint is a logical snapshot of the operator state for all involved 
TMs, 
so it belongs to the job and is usually uploaded to a distributed file system 
available on all TMs.
The location is set in the Flink option 'state.checkpoints.dir'.
This way the job can restore from it with a different set of TMs.

Best,
Andrey

> On 26 Oct 2018, at 08:29, Taher Koitawala  wrote:
> 
> Hi All,
>   Our current cluster configuration uses one HDD, which is mainly for 
> root, and another NVMe disk per node. [1] We want to make sure all TMs write 
> their own RocksDB files to the NVMe disk only; how do we do that? 
> 
> [2] Is it also possible to specify multiple directories per TM so that we 
> have an even spread when the RocksDB files are written?
> 
> Thanks,
> Taher Koitawala 
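A minimal flink-conf.yaml sketch of the options Andrey lists (the NVMe mount paths are assumptions for illustration; io.tmp.dirs also accepts several paths, which addresses question [2] about spreading files over multiple directories):

```yaml
# Transient local working state (RocksDB files while the job runs) on the NVMe disk.
# Multiple directories can be listed to spread the files.
io.tmp.dirs: /mnt/nvme1/flink/tmp,/mnt/nvme1/flink/tmp2
taskmanager.state.local.root-dirs: /mnt/nvme1/flink/local-state

# Durable checkpoints: a distributed file system reachable from every TM.
state.checkpoints.dir: hdfs:///flink/checkpoints
```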



Re: Flink yarn -kill

2018-10-26 Thread Andrey Zagrebin
Hi Mike,

'yarn -kill' is out of Flink's control. Currently, there is no hook provided by 
Flink for this.
Maybe YARN has some feature for this kind of cleanup; then a savepoint could be 
taken there.

Best,
Andrey

> On 25 Oct 2018, at 15:44, Mikhail Pryakhin  wrote:
> 
> Hi Flink community,
> Could you please help me clarify the following question:
> When a streaming job running in YARN gets manually killed via the yarn -kill 
> command, is there any way to make a savepoint or perform other clean-up 
> actions before the job manager is killed? 
> 
> Kind Regards,
> Mike Pryakhin
> 



Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

2018-10-26 Thread Andrey Zagrebin
Hi Mike,

What was the full job life cycle? 
Did you start it with Flink 1.6.1, or cancel a job running with 1.6.0? 
Was there a failover of the Job Master while running, before the cancellation?
What version of Zookeeper do you use?

Flink creates child nodes to create a lock for the job in ZooKeeper.
The lock is removed by removing the (ephemeral) child node.
A persistent node can be a problem because if the job dies and does not remove 
it, the persistent node will not time out and disappear as an ephemeral one 
would, and the next job instance will not delete it because the job is supposed 
to be locked by the previous one.

There was a recent fix in 1.6.1 where the job data was not properly deleted 
from ZooKeeper [1].
In general, this should not be the case, and all job-related data should be 
cleaned from ZooKeeper upon cancellation.

Best,
Andrey

[1] https://issues.apache.org/jira/browse/FLINK-10011

> On 25 Oct 2018, at 15:30, Mikhail Pryakhin  wrote:
> 
> Hi Flink experts!
> 
> When a streaming job with ZooKeeper HA enabled gets cancelled, the 
> job-related ZooKeeper nodes are not removed. Is there a reason behind that? 
> I noticed that the ZooKeeper paths are created of type "Container Node" (an 
> ephemeral node that can have nested nodes) and fall back to the persistent 
> node type in case ZooKeeper doesn't support this sort of node. 
> But anyway, it is worth removing the job's ZooKeeper node when a job is 
> cancelled, isn't it?
> 
> Thank you in advance!
> 
> Kind Regards,
> Mike Pryakhin
> 



Unbalanced Kafka consumer consumption

2018-10-26 Thread gerardg
Hi,

We are experiencing issues scaling our Flink application, and we have observed
that it may be because Kafka message consumption is not balanced across
partitions. The attached image (lag per partition) shows how only one
partition consumes messages (the blue one in the back) and it wasn't until
it finished that the other ones started to consume at a good rate (actually
the total throughput multiplied by 4 when these started). Also, when those
ones started to consume, one partition just stopped and accumulated messages
again until they finished.

We don't see any resource (CPU, network, disk..) struggling in our cluster
so we are not sure what could be causing this behavior. I can only assume
that somehow Flink or the Kafka consumer is artificially slowing down the
other partitions. Maybe due to how back pressure is handled?


 

Gerard





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Task Allocation on Nodes

2018-10-26 Thread Sayat Satybaldiyev
Thanks for the advice, Kien. Could you please share more details on why it's
best to allocate a separate cluster for each job?

On Wed, Oct 24, 2018 at 3:23 PM Kien Truong  wrote:

> Hi,
>
> You can have multiple Flink clusters on the same set of physical
> machines. In our experience, it's best to deploy a separate Flink cluster
> for each job and adjust the resources accordingly.
>
> Best regards,
> Kien
>
> On Oct 24, 2018 at 20:17, > wrote:
>
> Flink Cluster in standalone with HA configuration. It has 6 Task managers
> and each has 8 slots. Overall, 48 slots for the cluster.
>
> >>If your cluster only has one task manager with one slot on each node,
> then the job should be spread evenly.
> Agree, this will solve the issue. However, the cluster is running other
> jobs and in this case it won't have hardware resource for other jobs.
>
> On Wed, Oct 24, 2018 at 2:20 PM Kien Truong 
> wrote:
>
>> Hi,
>>
>> How are your task managers deployed?
>>
>> If your cluster only has one task manager with one slot on each node,
>> then the job should be spread evenly.
>>
>> Regards,
>>
>> Kien
>>
>> On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
> >> > Is there any way to indicate to Flink not to allocate all parallel tasks
> >> > on one node?  We have a stateless Flink job that reads from a
> >> > 10-partition topic and has a parallelism of 6. The Flink job manager
> >> > allocates all 6 parallel operators to one machine, causing all traffic
> >> > from Kafka to be allocated to only one machine. We have a cluster of 6
> >> > nodes and ideally we would spread one parallel operator per machine. Is
> >> > there a way to do that in Flink?
>>
>
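The one-slot-per-task-manager layout Kien describes can be sketched in flink-conf.yaml (values are illustrative; with one single-slot TM per node, a parallelism-6 job necessarily lands on six different nodes):

```yaml
# One slot per TaskManager: each parallel subtask occupies its own TM,
# so subtasks spread across nodes instead of piling onto one machine.
taskmanager.numberOfTaskSlots: 1
parallelism.default: 6
```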