Flink takes too much memory in the record serializer.

2017-11-14 Thread yunfan123
The class org.apache.flink.runtime.io.network.api.writer.RecordWriter holds as many serializers as numChannels. If my first operator has a parallelism of 500 and the next operator has a parallelism of 1000, and every message in Flink is 2 MB, then the job takes 500 * 1000 * 2 MB, i.e. about 1 TB of memory, in …
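
For reference, a minimal sketch of the arithmetic behind the 1 TB figure, assuming each of the 500 upstream subtasks holds one per-channel serialization buffer with a full 2 MB record for every one of the 1000 downstream channels (a worst-case reading of the post, not a measurement):

    public class SerializerMemoryEstimate {
        public static void main(String[] args) {
            long upstreamParallelism = 500;       // subtasks of the first operator
            long downstreamChannels = 1000;       // channels to the next operator
            long recordBytes = 2L * 1024 * 1024;  // 2 MB per record

            long totalBytes = upstreamParallelism * downstreamChannels * recordBytes;
            // prints 1000000 MB, i.e. roughly 1 TB, matching the estimate above
            System.out.println(totalBytes / (1024 * 1024) + " MB");
        }
    }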

Re: What happens if my parallelism is more than the Kafka partitions?

2017-11-08 Thread yunfan123
The Kafka partition assignment code is as follows: public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) { int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFF) % numParallelSubtasks; // here, the assumption is that …
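
For illustration, a self-contained sketch of the assignment logic quoted above; the toy KafkaTopicPartition class and the return statement are reconstructions for demonstration, not the exact Flink source. It shows that when the parallelism exceeds the partition count, some subtasks simply receive no partition:

    public class PartitionAssignDemo {
        // toy stand-in for Flink's KafkaTopicPartition, just enough for this demo
        static class KafkaTopicPartition {
            private final String topic;
            private final int partition;
            KafkaTopicPartition(String topic, int partition) {
                this.topic = topic;
                this.partition = partition;
            }
            String getTopic() { return topic; }
            int getPartition() { return partition; }
        }

        // same shape as the quoted method; the return line is an assumed reconstruction
        static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
            int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFF) % numParallelSubtasks;
            // partitions of one topic are spread round-robin starting at startIndex
            return (startIndex + partition.getPartition()) % numParallelSubtasks;
        }

        public static void main(String[] args) {
            int numSubtasks = 8;           // parallelism larger than the partition count
            for (int p = 0; p < 3; p++) {  // a topic with only 3 partitions
                int subtask = assign(new KafkaTopicPartition("myTopic", p), numSubtasks);
                System.out.println("partition " + p + " -> subtask " + subtask);
            }
            // only 3 of the 8 subtasks receive a partition; the other 5 stay idle
        }
    }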

What happens if my parallelism is more than the Kafka partitions?

2017-11-08 Thread yunfan123
It seems the same partition's data will be consumed multiple times?

Re: How can I handle backpressure with event time?

2017-11-05 Thread yunfan123
It seems it can't support consuming multiple topics with different deserialization schemas. I use protobuf, and different topics map to different schemas.
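
For what it's worth, a rough sketch of dispatching on the topic name inside a single deserialization schema, assuming the 1.3-era KeyedDeserializationSchema interface; the protobuf classes OrderProto.Order and ClickProto.Click are hypothetical placeholders for your own generated types:

    import java.io.IOException;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    import com.google.protobuf.Message;

    public class PerTopicProtoSchema implements KeyedDeserializationSchema<Message> {

        @Override
        public Message deserialize(byte[] messageKey, byte[] message,
                                   String topic, int partition, long offset) throws IOException {
            // pick the parser based on the topic the record came from
            if ("orders".equals(topic)) {
                return OrderProto.Order.parseFrom(message);  // hypothetical generated class
            } else if ("clicks".equals(topic)) {
                return ClickProto.Click.parseFrom(message);  // hypothetical generated class
            }
            throw new IOException("No schema registered for topic " + topic);
        }

        @Override
        public boolean isEndOfStream(Message nextElement) {
            return false; // keep consuming indefinitely
        }

        @Override
        public TypeInformation<Message> getProducedType() {
            return TypeInformation.of(Message.class);
        }
    }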

Re: How does Flink monitor that a source stream task (Time Trigger) is running?

2017-10-02 Thread yunfan123
Thank you. "If the SourceFunction.run method returns without an exception, Flink assumes that it has cleanly shut down and that there were simply no more elements to collect/create by this task." This sentence solves my confusion.
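
A minimal sketch that illustrates the quoted sentence (the counting source and its bound are illustrative assumptions): returning normally from run() signals a clean shutdown, while throwing an exception marks the task as failed and triggers recovery:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class CountingSource implements SourceFunction<Long> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long counter = 0;
            // leaving this loop and returning == "cleanly shut down, no more elements"
            while (isRunning && counter < 1000) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
                Thread.sleep(10);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }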

Re: How does Flink monitor that a source stream task (Time Trigger) is running?

2017-09-29 Thread yunfan123
By "source stream" I mean a function that implements org.apache.flink.streaming.api.functions.source.SourceFunction. My question is: how does Flink know that all working threads are alive? If a working thread that executes the SourceFunction crashes, how does Flink know this happened?

How does Flink monitor that a source stream task (Time Trigger) is running?

2017-09-28 Thread yunfan123
In my understanding, Flink just uses task heartbeats to monitor whether a TaskManager is running. If a source stream thread (Time Trigger for XXX) crashes, it seems Flink can't recover from this state?

Re: Keyed function type erasure problem.

2017-09-14 Thread yunfan123
Thanks

Keyed function type erasure problem.

2017-09-11 Thread yunfan123
Just a small change from PojoExample (PojoExample.java): public class PojoExample { private SelectorContainer selectorContainer; // a KeySelector class like this can't be used in Flink !!! …

Why doesn't Flink's ListState support update?

2017-08-16 Thread yunfan123
If I want to update the list, I have to do two steps: listState.clear(); then for (Element e : myList) { listState.add(e); } Why can't I update the state with listState.update(myList)?
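
A minimal, self-contained sketch of the two-step workaround described above; the helper method and its name are illustrative, not a Flink API:

    import java.util.List;

    import org.apache.flink.api.common.state.ListState;

    public final class ListStateReplace {

        // replace the entire contents of a ListState with the given elements
        static <T> void replaceAll(ListState<T> listState, List<T> newElements) throws Exception {
            listState.clear();                // drop the old contents
            for (T element : newElements) {   // then re-add element by element
                listState.add(element);
            }
        }
    }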

Using processing time and a time window, Flink acts like batch mode.

2017-07-12 Thread yunfan123
I am using processing time and the data source comes from Kafka. My code is as follows: streams.keyBy(XXX).timeWindow(Time.seconds(30)).apply(myClassObject) The log in myClassObject looks like: 2017-07-12 20:00:00, 2017-07-12 20:00:00, 2017-07-12 20:00:30, …
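
For context, a sketch of the quoted pipeline with concrete types (the key selector, tuple types, and counting logic are illustrative assumptions): a 30-second processing-time tumbling window fires once per window per key, which is why output appears in 30-second batches rather than continuously:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class ThirtySecondCounts {

        public static DataStream<Tuple2<String, Long>> build(DataStream<Tuple2<String, Long>> streams) {
            return streams
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;                 // key by the first field
                    }
                })
                .timeWindow(Time.seconds(30))            // processing-time tumbling window
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window,
                                      Iterable<Tuple2<String, Long>> input,
                                      Collector<Tuple2<String, Long>> out) {
                        long count = 0;
                        for (Tuple2<String, Long> ignored : input) {
                            count++;
                        }
                        // one result per key, emitted when the 30-second window closes
                        out.collect(Tuple2.of(key, count));
                    }
                });
        }
    }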

Re: System properties when submitting a Flink job to a YARN session

2017-07-12 Thread yunfan123
Can I specify the jars that I depend on when I submit my project?

Re: New message processing time after recovery.

2017-07-03 Thread yunfan123
Thanks.

New message processing time after recovery.

2017-06-27 Thread yunfan123
For example, my job failed at timestamp 1 and recovery from the checkpoint takes 600 seconds. So the processing time of new elements entering my streams is 601?

Re: How to perform multiple stream join functionality

2017-06-27 Thread yunfan123
Flink 1.3? I'm using Flink 1.3; how can I implement this?

Re: How to perform multiple stream join functionality

2017-06-27 Thread yunfan123
In Flink release 1.3, can I do this in a simple way?

How can I get the last successful checkpoint id in a sink?

2017-06-18 Thread yunfan123
It seems Flink doesn't support this?

Re: User-provided resource files.

2017-06-15 Thread yunfan123
So your suggestion is that I create an archive of all the files in the resources, then get this file from the distributed cache, extract it to a path, and use that path as my resource path? But when should I clear the temp path?

Re: User-provided resource files.

2017-06-15 Thread yunfan123
Yes. My resource files are Python or other scripts that reference each other by relative paths. What I want is for all the resource files of one job to be placed in one directory, and for resource files of different jobs not to be placed in the same directory. The DistributedCache cannot guarantee this.

Re: Storm topology running in Flink.

2017-06-15 Thread yunfan123
That returns a String specifying the resource path. Any suggestion about this? What I want is to copy the resources to a specific path on the TaskManager and pass that path to my operator.

Re: Storm topology running in Flink.

2017-06-15 Thread yunfan123
It seems OK, but flink-storm doesn't support Storm's codeDir. I'm working on making flink-storm support the codeDir. To support it, I have to add a new function such as getUserResourceDir (which may return null) to Flink's RuntimeContext. I know this may be a big change. What do you think of …

Re: User-provided resource files.

2017-06-13 Thread yunfan123
Or, how can I get the blob store location of my jar file?

Re: Flink streaming Python

2017-06-07 Thread yunfan123
Vote for Python, +1. From the code, I find it can't support a Kafka source.

Storm topology running in Flink.

2017-06-05 Thread yunfan123
From the code, I find it never calls the spout's fail or ack methods.

state.backend.fs.checkpointdir should support multiple paths and failover.

2017-05-23 Thread yunfan123
For now I can only configure it like: state.backend.fs.checkpointdir: hdfs://namenodehost:port/XXX But the machine running the NameNode may go down. This config should support multiple paths in case some become unavailable. Normally, the user should make sure any of them can be used.

Re: Question about starting from a checkpoint.

2017-05-21 Thread yunfan123
I am using the 1.2.0 release, so I read the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/savepoints.html. It says it can recover from a savepoint even if I change the parallelism. How does Flink implement this? For example, using RocksDB + HDFS as the state backend, Flink …

Question about starting from a checkpoint.

2017-05-20 Thread yunfan123
How exactly does this work? For example, I save my state using RocksDB + HDFS. When I change the parallelism of my job, can starting from a checkpoint work?

Why not add flink-connectors to flink-dist?

2017-05-15 Thread yunfan123
So how can I use it? Should every jar file I submit contain the specific connector classes? Can I package them into flink-dist?

Re: static/dynamic lookups in flink streaming

2017-05-14 Thread yunfan123
1.2.0 is released. Can you give an example of the asynchronous operations feature?

Re: Deactivate a job like Storm

2017-05-12 Thread yunfan123
Any issue or plan for this task?

Re: Deactivate a job like Storm

2017-05-10 Thread yunfan123
But why doesn't FlinkKafkaConsumerBase implement the StoppableFunction interface?

Re: ConnectedStream keyby issues

2017-05-10 Thread yunfan123
In the example above, it seems I should clear the state in the onTimer function in order to free resources, like follows: public void onTimer(long l, OnTimerContext onTimerContext, Collector<…> collector) throws Exception { if (state.value() != null) { …
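
A minimal reconstruction of the pattern discussed in this thread, assuming a keyed ProcessFunction with a processing-time timeout; the tuple types, state name, and 60-second timeout are illustrative assumptions. Clearing the state in onTimer is what keeps unmatched entries from living forever:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class TimedMergeFunction
            extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {

        private transient ValueState<Tuple2<String, String>> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>(
                    "pending", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
        }

        @Override
        public void processElement(Tuple2<String, String> value, Context ctx,
                                   Collector<Tuple2<String, String>> out) throws Exception {
            Tuple2<String, String> pending = state.value();
            if (pending == null) {
                // first side arrived: buffer it and set a 60-second processing-time timeout
                state.update(value);
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + 60_000L);
            } else {
                // counterpart arrived in time: emit the merged result and free the state
                out.collect(Tuple2.of(pending.f0, pending.f1 + "|" + value.f1));
                state.clear();
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                            Collector<Tuple2<String, String>> out) throws Exception {
            if (state.value() != null) {
                state.clear(); // timed out without a match: drop it so state does not grow forever
            }
        }
    }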

Re: ConnectedStream keyby issues

2017-05-10 Thread yunfan123
private static class MergeFunction extends RichProcessFunction<…, Tuple2<…>> { private ValueState<…> state; @Override public void open(Configuration parameters) throws Exception { state = …

Deactivate a job like Storm

2017-05-10 Thread yunfan123
How can I deactivate a job, like in Storm?

Stopping the job with ID XXX failed.

2017-05-08 Thread yunfan123
I can't stop the job; every time I get an exception like the following: Retrieving JobManager. Using address /10.6.192.141:6123 to connect to JobManager. The program finished with the following exception: java.lang.Exception: Stopping the job …

Can ValueState use generics?

2017-05-07 Thread yunfan123
My process function is like: private static class MergeFunction extends RichProcessFunction<…, Tuple2<…>> { private ValueState<…> state; @Override @SuppressWarnings("unchecked") public void …
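
A short sketch of one way to declare a ValueState whose value type carries generics, assuming the descriptor is built in open() of a rich keyed function; the state name and the Tuple2 type parameters are illustrative. Building the TypeInformation from a TypeHint preserves the generic parameters despite erasure, so no unchecked cast is needed:

    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class GenericStateDescriptors {
        // the anonymous TypeHint captures Tuple2<String, Long> at compile time;
        // e.g. state = getRuntimeContext().getState(pendingDescriptor()) in open()
        static ValueStateDescriptor<Tuple2<String, Long>> pendingDescriptor() {
            return new ValueStateDescriptor<>(
                    "pending",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));
        }
    }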

Re: ConnectedStream keyby issues

2017-05-07 Thread yunfan123
But what happens if some data can never be merged? Will the state be kept forever? Can I set a timeout?

Why did Flink 1.2.0 delete flink-connector-redis?

2017-04-26 Thread yunfan123
It exists in 1.1.5 but was deleted in 1.2.0.