Re: Flink on YARN - tmp directory

2017-07-28 Thread Chris Hebert
I should also note that the above steps did get the Flink JobManager and
TaskManagers to save their tmp web dashboard files to /my/tmp/ and to show
in the Dashboard that the taskmanager.tmp.dirs property had been properly
set to /my/tmp/, but the tmp files created by my jobs still ended up in
/tmp/ anyway.

On Fri, Jul 28, 2017 at 4:55 PM, Chris Hebert <
chris.hebert-...@digitalreasoning.com> wrote:

> Hi,
>
> My jobs create tmp files like so:
>
> java.nio.file.Path tmpFilePath = java.nio.file.Files.createTempFile("tmpFile",
> "txt");
>
> They currently appear in /tmp/, but I want them somewhere else, say
> /my/tmp/.
>
> The Flink on YARN docs say:
>
> Flink on YARN will overwrite the following configuration parameters
> jobmanager.rpc.address (because the JobManager is always allocated at
> different machines), taskmanager.tmp.dirs (we are using the tmp
> directories given by YARN) and parallelism.default if the number of slots
> has been specified.
>
> How would I specify a different tmp directory for a job without modifying
> my YARN tmp directories?
>
> I tried the taskmanager.tmp.dirs property in conf/flink-conf.yaml anyway,
> that failed.
>
> I appended -Djava.io.tmpdir=/my/tmp/ to JVM_ARGS and all three variations
> of DEFAULT_ENV_JAVA_OPTS in bin/config.sh, that failed.
>
> I passed -Djava.io.tmpdir=/my/tmp/ and variations as arguments to
> ./bin/yarn-session.sh and ./bin/flink run et cetera, that failed.
>
> Odd observation:
> The hadoop.tmp.dir property is set in my core-site.xml to /some/other/tmp/,
> yet Flink writes to /tmp/. My yarn-site.xml specifies no tmp.
>
> Side note:
> My Flink job is a Beam pipeline. I doubt that's relevant, but let me know
> if it is.
>
> Thanks,
> Chris
>


Flink on YARN - tmp directory

2017-07-28 Thread Chris Hebert
Hi,

My jobs create tmp files like so:

java.nio.file.Path tmpFilePath =
java.nio.file.Files.createTempFile("tmpFile", "txt");

They currently appear in /tmp/, but I want them somewhere else, say /my/tmp/.
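
For reference, one way to sidestep java.io.tmpdir entirely is to pass the target
directory to createTempFile explicitly (a minimal sketch, assuming /my/tmp/
exists and is writable on every TaskManager host):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch only: write the temp file into an explicit directory instead of relying on java.io.tmpdir.
Path tmpFilePath = Files.createTempFile(Paths.get("/my/tmp"), "tmpFile", ".txt");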

The Flink on YARN docs say:

Flink on YARN will overwrite the following configuration parameters
jobmanager.rpc.address (because the JobManager is always allocated at
different machines), taskmanager.tmp.dirs (we are using the tmp directories
given by YARN) and parallelism.default if the number of slots has been
specified.

How would I specify a different tmp directory for a job without modifying
my YARN tmp directories?

I tried the taskmanager.tmp.dirs property in conf/flink-conf.yaml anyway,
that failed.

I appended -Djava.io.tmpdir=/my/tmp/ to JVM_ARGS and all three variations
of DEFAULT_ENV_JAVA_OPTS in bin/config.sh, that failed.

I passed -Djava.io.tmpdir=/my/tmp/ and variations as arguments to
./bin/yarn-session.sh and ./bin/flink run et cetera, that failed.

Odd observation:
The hadoop.tmp.dir property is set in my core-site.xml to /some/other/tmp/,
yet Flink writes to /tmp/. My yarn-site.xml specifies no tmp.

Side note:
My Flink job is a Beam pipeline. I doubt that's relevant, but let me know
if it is.

Thanks,
Chris


Re: Fink: KafkaProducer Data Loss

2017-07-28 Thread ninad
Hi Gordon, I was able to reproduce the data loss on a standalone Flink cluster
as well. I have attached a stripped-down version of our code here:

Environment:
Flink standalone 1.3.0
Kafka 0.9

*What the code is doing:*
-consume messages from kafka topic ('event.filter.topic' property in
application.properties)
-group them by key
-analyze the events in a window and filter some messages.
-send remaining messages to Kafka topic ('sep.http.topic' property in
application.properties)
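
For readers without the attachment, a rough sketch of the pipeline shape described
above (key extraction, session gap, and the pass-through "filter" below are
placeholders, not the actual attached code):

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class EventFilterSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // checkpointing enabled, as the Kafka guarantees rely on it

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // 'kafka.brokers'
    kafkaProps.setProperty("group.id", "event-filter");

    DataStream<String> events = env.addSource(
        new FlinkKafkaConsumer09<>("event.filter.topic", new SimpleStringSchema(), kafkaProps));

    events
        .keyBy(new KeySelector<String, String>() {
          @Override
          public String getKey(String event) {
            return event.split(",")[0]; // placeholder key extraction
          }
        })
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) // 'window.session.interval.sec'
        .apply(new WindowFunction<String, String, String, TimeWindow>() {
          @Override
          public void apply(String key, TimeWindow window, Iterable<String> input,
              Collector<String> out) {
            for (String event : input) {
              out.collect(event); // placeholder for the real filtering logic
            }
          }
        })
        .addSink(new FlinkKafkaProducer09<>("sep.http.topic", new SimpleStringSchema(), kafkaProps));

    env.execute("event-filter");
  }
}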

To build: 
./gradlew clean assemble

The jar needs path to 'application.properties' file to run

Important properties in application.properties:
window.session.interval.sec
kafka.brokers
event.filter.topic --> source topic
sep.http.topic --> destination topic

To test:
-Use 'EventGenerator' class to publish messages to source kafka topic
The data published won't be filtered by the logic. If you publish 10
messages to the source topic, 
those 10 messages should be sent to the destination topic.

-Once we see that Flink has received all the messages, bring down all Kafka
brokers.

-Let the Flink job fail 2-3 times.

-Restart kafka brokers. 

Note: Data loss isn't observed every time; it happens roughly 1 in 4 runs or so.

Thanks for all your help.

eventFilter.zip

  











Re: Operations dependencies between values with different key in a ConnectedStreams

2017-07-28 Thread Chao Wang

Hi Gabriele,

I think CEP may be able to deal with this kind of expression in 
general, although I am not sure about how to deal with different time 
windows (5s and 3s, in your case). Take a look at the available patterns 
in the CEP library doc: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html#the-pattern-api


Chao


On 07/28/2017 10:08 AM, Gabriele Di Bernardo wrote:

Hi guys,

I have a question for you. I have an application with two keyed data streams: 
one for control and the other one for the data. Each control message represents 
an operation to be performed on the data values marked with a certain 
identifier. I connected the two streams and I process the data with a 
CoProcessFunction.

The operations I do are really simple like collecting the MAX or the MEAN value 
of the last n seconds. Now, I would like to create more complex operations 
where the result value of a key might depend on the result value of another 
key. To be more clear, I would like to evaluate expressions like: if {ALL the 
values of data marked with id 22 in the last 5s} are BIGGER THAN {The MEAN 
value of data marked with id 17 in the last 3s}. In this example, I should 
defer the evaluation of the expression until I have the MEAN value of the right 
part of the expression and check it against ALL the data keyed with key 22 from 
the last 5 seconds. I’d like to ask you if something like this in Flink is 
doable and what is the best way to do that in your opinion. I am also checking 
how the CEP library works (can it be helpful?).

Thank you so much in advance.

Best,


Gabriele




Re: CEP condition expression and its event consuming strategy

2017-07-28 Thread Chao Wang

Hi Dawid,

Thank you.

Ad. 1 I noticed that the method getEventsForPattern() returns an 
Iterable and we need to further invoke .iterator().next() to get 
access to the event value.


Ad. 2 Here is a bit about a use case we have that calls for such 
discarding semantics. In the event processing project I am currently 
working on, input event streams are sensor data, and we join streams and 
do Kalman filtering, FFT, etc. We therefore choose to discard the 
accepted events once the data they carry have been processed; otherwise, 
it may cause duplicated processing as well as incorrect join semantics.


We came up with this question while doing an empirical comparison of 
Flink and our system (implemented with the TAO real-time event service). 
We implemented in our system such semantics, by removing input events 
once CEP emits the corresponding output events.


Could you provide some use cases where the discarding semantics are not 
needed? I guess I am wired into processing sensor data and thus cannot 
think of a case where reusing accepted events would be of interest. 
Also, could you share some pointers to streaming applications in use? We 
are seeking to make our research work more relevant to current practice.


Thank you very much,

Chao

On 07/27/2017 02:17 AM, Dawid Wysakowicz wrote:

Hi Chao,

Ad. 1 You could implement it with IterativeCondition. Something like this:

Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new
SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
   return value.equals("A") || value.equals("B");
}
}).followedBy("second").where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
   // compare against the event already accepted for "first"
   return (value.equals("A") || value.equals("B")) &&
!value.equals(ctx.getEventsForPattern("first").iterator().next());
}
});

Ad. 2 Unfortunately right now as you said Pattern restarts each other event and 
it is not possible to change that strategy. There is ongoing work to introduce 
AfterMatchSkipStrategy[1], but at best it will be merged in 1.4.0. I did not 
give it much thought, but I would try implementing some discarding logic.

Regards,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-7169


On 26 Jul 2017, at 22:45, Chao Wang  wrote:

Hi,

I have two questions regarding the use of the Flink CEP library 
(flink-cep_2.11:1.3.1), as follows:

1. I'd like to know how to use the API to express "emit event C in the presence of events A and B, with 
no restriction on the arriving order of A and B"? I've tried by creating two patterns, one for "A 
and then B" and the other for "B and then A", and consequently using two patternStreams to 
handle each case, which emits C. It worked but to me this approach seems redundant.

2. Given the above objective expression, how to consume the accepted events so 
that they will not be used for future matchings? For example, with the arriving 
sequence {A, B, A}, the CEP should only emit one C (due to the matching of 
{A,B}), not two Cs (due to {A,B} and {B,A}). Similarly, with the arriving 
sequence {B, A, B, A}, the CEP should only emit two Cs, not three.


Thanks,

Chao





Access Sliding window

2017-07-28 Thread Raj Kumar
Hi,

I am using a sliding window to monitor server performance. I need to keep
track of the number of HTTP requests generated and alert the user when the
requests get too high (a sliding window of 6 hours which slides every 15 mins).
The aggregate count of HTTP requests is evaluated in the 15-min sliding window.
I need to keep track of a running average of these aggregate counts over the
different sliding windows, to create an alert when the load is over the
average + 1 standard deviation.

How can we achieve this? How can we keep track of the running average for
all the sliding windows?
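
For context, the 6-hour/15-minute sliding count described above could look
roughly like this (a sketch only; 'requests' and the key extraction are
placeholders, not a definitive implementation):

// Sketch: count HTTP requests per server in a 6h window that slides every 15 min.
DataStream<Tuple2<String, Long>> windowCounts = requests
    .map(r -> Tuple2.of(r.getServerId(), 1L))              // one count per request; getServerId() is hypothetical
    .returns(new TypeHint<Tuple2<String, Long>>() {})      // type hint needed for the lambda
    .keyBy(0)
    .timeWindow(Time.hours(6), Time.minutes(15))
    .sum(1);                                               // per-window request count
// A downstream keyed map/process function with state could then maintain the running
// mean and standard deviation of these per-window counts and emit an alert when a
// count exceeds mean + stddev.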





Operations dependencies between values with different key in a ConnectedStreams

2017-07-28 Thread Gabriele Di Bernardo
Hi guys,

I have a question for you. I have an application with two keyed data streams: 
one for control and the other one for the data. Each control message represents 
an operation to be performed on the data values marked with a certain 
identifier. I connected the two streams and I process the data with a 
CoProcessFunction.

The operations I do are really simple like collecting the MAX or the MEAN value 
of the last n seconds. Now, I would like to create more complex operations 
where the result value of a key might depend on the result value of another 
key. To be more clear, I would like to evaluate expressions like: if {ALL the 
values of data marked with id 22 in the last 5s} are BIGGER THAN {The MEAN 
value of data marked with id 17 in the last 3s}. In this example, I should 
defer the evaluation of the expression until I have the MEAN value of the right 
part of the expression and check it against ALL the data keyed with key 22 from 
the last 5 seconds. I’d like to ask you if something like this in Flink is 
doable and what is the best way to do that in your opinion. I am also checking 
how the CEP library works (can it be helpful?).

Thank you so much in advance.

Best,


Gabriele

Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release

2017-07-28 Thread Ufuk Celebi
On Fri, Jul 28, 2017 at 4:03 PM, Stephan Ewen  wrote:
> Seems like no one raised a concern so far about dropping the savepoint
> format compatibility for 1.1 in 1.4.
>
> Leaving this thread open for some more days, but from the sentiment, it
> seems like we should go ahead?

+1


Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release

2017-07-28 Thread Stephan Ewen
Seems like no one raised a concern so far about dropping the savepoint
format compatibility for 1.1 in 1.4.

Leaving this thread open for some more days, but from the sentiment, it
seems like we should go ahead?

On Wed, Jul 12, 2017 at 4:43 PM, Stephan Ewen  wrote:

> Hi users!
>
> Flink currently maintains backwards compatibility for savepoint formats,
> which means that savepoints taken with Flink version 1.1.x and 1.2.x can be
> resumed in Flink 1.3.x
>
> We are discussing how many versions back to support. The proposition is
> the following:
>
> *   Suggestion: Flink 1.4.0 will be able to resume savepoints taken with
> version 1.3.x and 1.2.x, but not savepoints from version 1.1.x and 1.0.x*
>
>
> The reason for that is that there is a lot of code mapping between the
> completely different legacy format (1.1.x, not re-scalable) and the
> key-group-oriented format (1.2.x onwards, re-scalable). It would greatly
> help the development of state and checkpointing features to drop that old
> code.
>
> Please let us know if you have concerns about that.
>
> Best,
> Stephan
>
>


Flink QueryableState with Sliding Window on RocksDB

2017-07-28 Thread Biplob Biswas
Hi,

We recently moved from Spark Streaming to Flink for our stream processing
requirements in our organization, and we are in the process of reducing the
number of external calls as much as possible. Earlier we were using HBase to
store the incoming data, but we now want to try out stateful operations on
top of Flink.

To that end, we have decided that we need a sliding window of size
180 days with a slide interval of 1 day, such that we keep a state of
180 days at any given time. This state would at most be around 40-50 GB for
the 180 days, so we thought of using RocksDB for state storage.

Now the job flow we are thinking of would be incoming events plus some extra
information:

events.keyBy(eventTuple -> eventTuple.getEventUID()).flatMap(new
UpdatedTxnState());

where UpdatedTxnState is an extension of the RichFlatMapFunction class and it
looks something like this:


public class UpdatedTxnState extends
    RichFlatMapFunction<Tuple3<String, Tuple2<String, String>, EventType>, Tuple2<String, EventType>> {

  private ValueState<Tuple3<EventType, Tuple2<String, String>, String>> txnState;

  @Override
  public void open(Configuration config) throws Exception {
    // State that keeps the latest transaction tuple per key
    ValueStateDescriptor<Tuple3<EventType, Tuple2<String, String>, String>> stateDescriptor =
        new ValueStateDescriptor<>(
            "transaction",
            TypeInformation.of(new TypeHint<Tuple3<EventType, Tuple2<String, String>, String>>() {
            }));

    stateDescriptor.setQueryable("transaction");

    this.txnState = getRuntimeContext().getState(stateDescriptor);
  }

  @Override
  public void flatMap(Tuple3<String, Tuple2<String, String>, EventType> input,
      Collector<Tuple2<String, EventType>> output) throws Exception {

    txnState.update(new Tuple3<>(input._3(), input._2(), input._1()));

    output.collect(new Tuple2<>(input._1(), input._3()));
  }
}


now, I have a couple of questions :
1. How can I create a sliding window on top of this state? I can think of
doing a keyBy on the output of the flatMap, but to me that doesn't really make
much sense, and I didn't really find a way to build state after windowing.
2. Can I query the state with the state name I defined here "transaction"
anywhere in my job?

Thanks,
Biplob





Is watermark used by joining two streams

2017-07-28 Thread xie wei
Hello,

I want to join two streams based on an event-time window. Every stream has its
own watermark: one has a periodic watermark and the other has a punctuated
watermark.
Are the watermarks used to trigger the join? If yes, which one, and how is
it used?

Thank you and best regards
Wei


Re: re: How can I submit a flink job to YARN/Cluster from java code?

2017-07-28 Thread Till Rohrmann
The RemoteEnvironment won’t allow you to spawn a Flink cluster running on
Yarn. But you can start one with yarn-session.sh. The same applies to the
REST endpoint. You first have to start a cluster.

Cheers,
Till
​

On Fri, Jul 28, 2017 at 12:14 PM, 程骥  wrote:

> Hi,
> Thanks for your help.
> This is a good idea.
> Maybe I didn't understand the information about the REST API.
> I think the REST API cannot submit a job to YARN.
> Is that right?
>
>
>
> -- Original message --
> *From:* "z...@zjdex.com";;
> *Sent:* Friday, July 28, 2017, 5:15 PM
> *To:* "程骥"; "user";
> *Subject:* re: How can I submit a flink job to YARN/Cluster from java code?
>
> Hi:
> I think you can use the Flink REST API, e.g. "POST:
> /jars/MyProgram.jar/run?savepointPath=/my-savepoints/
> savepoint-1bae02a80464&allowNonRestoredState=true"
> For detailed information about the REST API, see
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/monitoring/rest_api.html .
>
> --
> z...@zjdex.com
>
>
> *From:* 程骥
> *Sent:* 2017-07-28 15:50
> *To:* user
> *Subject:* How can I submit a flink job to YARN/Cluster from java code?
> Hello,
> I want to submit a Flink job to YARN/Cluster from Java code. Is this
> feasible?
> Has anyone tried to do it before?
>
>
> Thanks
>
>


Reply: re: How can I submit a flink job to YARN/Cluster from java code?

2017-07-28 Thread 程骥
Hi,
Thanks for your help.
This is a good idea.
Maybe I didn't understand the information about the REST API.
I think the REST API cannot submit a job to YARN.
Is that right?






-- Original message --
From: "z...@zjdex.com";;
Sent: Friday, July 28, 2017, 5:15 PM
To: "程骥"; "user";

Subject: re: How can I submit a flink job to YARN/Cluster from java code?



Hi,
I think you can use the Flink REST API, e.g. "POST:
/jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true"
For detailed information about the REST API, see
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html .
 


 z...@zjdex.com


  
 
From: 程骥
Sent: 2017-07-28 15:50
To: user
Subject: How can I submit a flink job to YARN/Cluster from java code?


Hello,
I want to submit a Flink job to YARN/Cluster from Java code. Is this
feasible?
Has anyone tried to do it before?




Thanks

Reply: How can I submit a flink job to YARN/Cluster from java code?

2017-07-28 Thread 程骥
Hi Till,
Actually, I have already implemented submitting a Flink job to a Flink cluster
with ExecutionEnvironment.createRemoteEnvironment.


But I have no idea how to submit a Flink job to Flink on YARN.


I think this API is not supported.


Thanks for your help.
:)


-- Original message --
From: "Till Rohrmann";;
Sent: Friday, July 28, 2017, 5:13 PM
To: "程骥"; 
Cc: "user"; 
Subject: Re: How can I submit a flink job to YARN/Cluster from java code?




Hi,
 
it is possible to submit a job to a running cluster (Standalone/Mesos/Yarn) by 
using the RemoteEnvironment which you can create via 
ExecutionEnvironment.createRemoteEnvironment and then executing your program 
(e.g. running it in the IDE). This assumes that the remote cluster is already 
running.
 
The RemoteEnvironment requires that you give it the hostname and port of the 
running cluster or, if using a HA cluster, a properly configured configuration 
from which the client can retrieve the HA settings.
 
Cheers,
Till
 ​



On Fri, Jul 28, 2017 at 9:50 AM, 程骥  wrote:
Hello,
I want to submit a Flink job to YARN/Cluster from Java code. Is this
feasible?
Has anyone tried to do it before?




Thanks

re: How can I submit a flink job to YARN/Cluster from java code?

2017-07-28 Thread z...@zjdex.com
Hi,
I think you can use the Flink REST API, e.g. "POST:
/jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true"
For detailed information about the REST API, see
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html .
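
For example, assuming web job submission is enabled, the jar was already uploaded
to the JobManager (via the web UI or the /jars/upload endpoint), and the web port
is the default 8081, a rough sketch with curl (host and jar name are placeholders):

# list the jars already uploaded to the JobManager
curl http://jobmanager-host:8081/jars
# run one of them, optionally restoring from a savepoint
curl -X POST "http://jobmanager-host:8081/jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true"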



z...@zjdex.com
 
 
From: 程骥
Sent: 2017-07-28 15:50
To: user
Subject: How can I submit a flink job to YARN/Cluster from java code?
Hello,
I want to submit a Flink job to YARN/Cluster from Java code. Is this
feasible?
Has anyone tried to do it before?


Thanks


Re: How can I submit a flink job to YARN/Cluster from java code?

2017-07-28 Thread Till Rohrmann
Hi,

it is possible to submit a job to a running cluster (Standalone/Mesos/Yarn)
by using the RemoteEnvironment which you can create via
ExecutionEnvironment.createRemoteEnvironment and then executing your
program (e.g. running it in the IDE). This assumes that the remote cluster
is already running.

The RemoteEnvironment requires that you give it the hostname and port of
the running cluster or, if using a HA cluster, a properly configured
configuration from which the client can retrieve the HA settings.
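
For reference, a minimal sketch of that from Java code (host, port, jar path, and
input path below are placeholders; it assumes the standalone cluster is already
running):

import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteSubmitExample {
    public static void main(String[] args) throws Exception {
        // Sketch only: connect to an already-running cluster; the jar contains the job's user code.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123, "/path/to/my-job.jar");

        // Any program defined on this environment is shipped to and executed on that cluster.
        long count = env.readTextFile("hdfs:///path/to/input").count();
        System.out.println("lines: " + count);
    }
}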

Cheers,
Till
​

On Fri, Jul 28, 2017 at 9:50 AM, 程骥  wrote:

> Hello,
> I want to submit a Flink job to YARN/Cluster from Java code.
> Is this feasible?
> Has anyone tried to do it before?
>
>
> Thanks
>


Re: Flink CLI cannot submit job to Flink on Mesos

2017-07-28 Thread Till Rohrmann
Hi Francisco,

have you set the right high-availability configuration options in your
client configuration as described here [1]? If not, then Flink is not able
to find the correct JobManager because it retrieves the address as well as
a fencing token (called leader session id) from the HA store (ZooKeeper).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#high-availability
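
For illustration, the client-side flink-conf.yaml entries would look roughly like
this (the ZooKeeper address and paths are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/ha/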

Cheers,
Till

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea <
francisco.gonza...@piksel.com> wrote:

> Hello,
>
> We're having a lot of issues while trying to submit a job remotely using the
> Flink CLI command line tool. We have tried different configurations but in
> all of them we get errors from AKKA while trying to connect. I will try to
> summarise the configurations we´ve tried.
>
> - Flink 1.3.0 deployed within a docker container on a Mesos cluster (using
> Marathon)
> - This flink has the property jobmanager.rpc.address as a hostname (i.e.
> kind of ip-X.eu.west-1.compute.internal)
> - Use the same version for Flink Client remotely (e.g. in my laptop).
>
> When I try to submit the job using the command flink run -m
> myHostName:myPort (the same in jobmanager.rpc.address and
> jobmanager.rpc.port) after some time waiting I get the trace at the end of
> this email. In the flink side we get this error from AKKA:
>
> Association with remote system [akka.tcp://flink@10.203.23.24:24469] has
> failed, address is now gated for [5000] ms. Reason: [Association failed
> with [akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection
> refused: /10.203.23.24:24469]
>
> After reading a bit, it seems there´re some problems related to akka
> resolving hostnames to ips, so we decided to startup the same flink but
> changing jobmanager.rpc.address to have the direct ip (i.e. kind of
> XX.XXX.XX.XX). In this case I´m getting same trace (at the end of the
> email) from the client side and this one from the Flink server:
>
> Discard message 
> LeaderSessionMessage(----,SubmitJob(JobGraph(jobId:
> b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader
> session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the
> received leader session ID ----.
>
> We have tried some other stuff but without success… any clue that could
> help us?
>
> Thanks in advance!
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: JobManager did not respond within 6 milliseconds
> at org.apache.flink.client.program.ClusterClient.
> runDetached(ClusterClient.java:454)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(
> StandaloneClusterClient.java:99)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:400)
> at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(
> DetachedEnvironment.java:76)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:345)
> at org.apache.flink.client.CliFrontend.executeProgram(
> CliFrontend.java:831)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1073)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(
> HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1548)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobTimeoutException:
> JobManager did not respond within 6 milliseconds
> at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.
> java:426)
> at org.apache.flink.client.program.ClusterClient.
> runDetached(ClusterClient.java:451)
> ... 15 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [6 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(
> BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at scala.concurrent.Await.result(package.scala)
> at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.
> java:423)
> ... 16 more
>
>
>
> This message is private and confidential. If you have received this
> message in error, please notify the sender or serviced...@piksel.com and
> remove it from your system.
>
> Piksel Inc 

Re: Memory Leak - Flink / RocksDB ?

2017-07-28 Thread Stefan Richter
Hi,

I see that matching the RocksDB configuration to fit certain container sizes 
can be very tedious and error prone for users. I have opened a jira to start 
improving the situation: https://issues.apache.org/jira/browse/FLINK-7289.
Please feel free to comment
and share your experiences or ideas; they might be very valuable input.

One consideration: from what you shared I can see that you are using 8 slots 
per TaskManager and a heap size of 35840 MB. This means that there are potentially also 
up to 8 RocksDB instances on one TM. Furthermore, when you are using RocksDB, 
your heavy state will typically live in RocksDB (native memory) and no longer 
on the JVM heap. I think it would make a lot of sense to reduce your maximum 
heap size dramatically, so that more memory from your container budget is 
available as native memory for RocksDB. I hope this can also help with your 
problem.
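
For illustration only, that could look roughly like this in flink-conf.yaml (the
numbers are placeholders and would need tuning for the actual 64 GB container):

# Illustrative only: leave most of the container budget to RocksDB's native memory
taskmanager.heap.mb: 10240
taskmanager.numberOfTaskSlots: 8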

Best,
Stefan

> On 27.07.2017 at 10:49, Shashwat Rastogi 
> wrote:
> 
> Hi Kien,
> 
> Sorry it took me some time to fetch the logs. I am attaching logs of the 
> machine which died due to lack of free memory. 
> 
> 
> 
> I have set only
> `taskmanager.heap.mb: 35840`
> taskmanager.numberOfTaskSlots: 8
> And the rest are just default properties in the flink-conf.yaml
> 
> Thank you in advance.
> 
> Regards
> Shashwat
> 
>> On 26-Jul-2017, at 12:10 PM, Kien Truong > > wrote:
>> 
>> Hi,
>> 
>> What're your task manager memory configuration ? Can you post the 
>> TaskManager's log ?
>> 
>> Regards,
>> 
>> Kien
>> 
>> 
>> On 7/25/2017 8:41 PM, Shashwat Rastogi wrote:
>>> Hi,
>>> 
>>> We have several Flink jobs, all of which reads data from Kafka do some 
>>> aggregations (over sliding windows of (1d, 1h)) and writes data to 
>>> Cassandra. Something like :
>>> 
>>> ```
>>> DataStream lines = env.addSource(new FlinkKafkaConsumer010( … ));
>>> DataStream events = lines.map(line -> parse(line));
>>> DataStream stats = stream
>>> .keyBy(“id”)
>>> .timeWindow(1d, 1h)
>>> .sum(new MyAggregateFunction());
>>> writeToCassandra(stats);
>>> ```
>>> 
>>> We recently made a switch to RocksDbStateBackend, for its suitability for 
>>> large states/long windows. However, after making the switch a memory issue 
>>> has come up: the memory utilisation on the TaskManager gradually increases from 
>>> 50 GB to ~63 GB until the container is killed. We are unable to figure out 
>>> what is causing this behaviour; is there some memory leak in RocksDB?
>>> 
>>> How much memory should we allocate to the Flink TaskManager? Since, RocksDB 
>>> is a native application and it does not use the JVM how much of the memory 
>>> should we allocate/leave for RocksDB (out of 64GB of total memory).
>>> Is there a way to set the maximum amount of memory that will be used by 
>>> RocksDB so that it doesn’t overwhelm the system? Are there some 
>>> recommended optimal settings for RocksDB for larger states (for 1 day 
>>> window average state size is 3GB).
>>> 
>>> Any help would be greatly appreciated. I am using Flink v1.2.1.
>>> Thanks in advance.
>>> 
>>> Best,
>>> Shashwat
>> 
> 



How can I submit a flink job to YARN/Cluster from java code?

2017-07-28 Thread 程骥
Hello,
I want to submit a Flink job to YARN/Cluster from Java code. Is this
feasible?
Has anyone tried to do it before?




Thanks