does the flink sink only support bio?

2017-12-07 Thread Jinhua Luo
Hi, all. The invoke method of a sink seems to have no way to do async I/O, e.g. to return a Future? For example, the Redis connector uses the Jedis lib to execute Redis commands synchronously:
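Since Flink 1.2 the DataStream API has `AsyncFunction` / `AsyncDataStream` for non-blocking calls against external systems. As a minimal plain-Java sketch of the underlying pattern — offloading a blocking client call to an executor and completing a future, the same shape `AsyncFunction#asyncInvoke` uses — where `blockingGet` is a hypothetical stand-in for a Jedis command:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSinkSketch {
    // Hypothetical stand-in for a blocking client call, e.g. jedis.get(key).
    static String blockingGet(String key) {
        return "value-for-" + key;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Offload the blocking call so the calling thread is not held up.
        // In Flink, AsyncFunction#asyncInvoke would complete a ResultFuture
        // this way instead of returning synchronously from invoke().
        CompletableFuture<String> result =
                CompletableFuture.supplyAsync(() -> blockingGet("k1"), pool);
        System.out.println(result.get()); // prints "value-for-k1"
        pool.shutdown();
    }
}
```

The sink's `invoke` itself stays synchronous in older Flink versions; the async operator was designed for enrichment ahead of the sink rather than inside it.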

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Running with these settings: yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120 akka.ask.timeout: 60s containerized.heap-cutoff-ratio: 0.15 taskmanager.memory.fraction: 0.7 taskmanager.memory.off-heap: false taskmanager.memory.preallocate: true env.getConfig().setExecutionMode(ExecutionMode.BATCH)​

Re: specify user name when connecting to hdfs

2017-12-07 Thread Gordon Weakliem
Seems like 3 possibilities: 1. Change the user flink runs as to the user with hdfs rights 2. hdfs chown the directory you're writing to (or hdfs chmod to open up access) 3. I've seen where org.apache.hadoop.security.UserGroupInformation can be used to do something like this:
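For option 3, the usual route is Hadoop's `UserGroupInformation.createRemoteUser(...)` followed by `doAs(...)` (with simple authentication; Kerberos requires a real login instead). `UserGroupInformation` follows the JAAS `Subject.doAs` pattern, which can be sketched with the JDK alone — the user name and the privileged body here are hypothetical:

```java
import java.security.PrivilegedAction;
import javax.security.auth.Subject;

public class DoAsSketch {
    public static void main(String[] args) {
        // With hadoop-common on the classpath this would be (assumption: simple auth):
        //   UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfsuser");
        //   ugi.doAs((PrivilegedExceptionAction<Void>) () -> { /* write to HDFS */ return null; });
        Subject subject = new Subject(); // stands in for the remote-user identity
        String result = Subject.doAs(subject, (PrivilegedAction<String>) () ->
                "ran with subject: " + subject.getPrincipals()); // privileged body
        System.out.println(result);
    }
}
```

Filesystem operations executed inside the `doAs` body are then attributed to the wrapped user rather than the OS user running Flink.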

Re: Data loss in Flink Kafka Pipeline

2017-12-07 Thread Fabian Hueske
Hi Nishu, the data loss might be caused by the fact that processing time triggers do not fire when the program terminates. So, if your program has records stored in a window and program terminates because the input was fully consumed, the window operator won't process the remaining windows but

Re: Data loss in Flink Kafka Pipeline

2017-12-07 Thread Nishu
Hi Fabian, The program runs until I manually stop it. The trigger is also firing as expected, because I read the entire data after the trigger fires to see what data is captured, and pass that data over to GroupByKey as input. It's using a Global window, so I accumulate the entire data each time the

Re: Data loss in Flink Kafka Pipeline

2017-12-07 Thread Nishu
Hi, Thanks for your inputs. I am reading Kafka topics in Global windows and have defined some ProcessingTime triggers, hence there are no late records. The program performs a join between multiple Kafka topics. The transformation sequence is something like: 1. Read

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Greg Hogan
Hi Garrett, In the Web UI, when viewing a job under overview / subtasks, selecting the checkbox "Aggregate task statistics by TaskManager" will reduce the number of displayed rows (though in your case only by half). The following documents profiling a Flink job with Java Flight Recorder:

Re: specify user name when connecting to hdfs

2017-12-07 Thread Edward
I have the same question. I am setting fs.hdfs.hadoopconf to the location of a Hadoop config. However, when I start a job, I get an error message that it's trying to connect to the HDFS directory as user "flink": Caused by:

Re: Data loss in Flink Kafka Pipeline

2017-12-07 Thread Chen Qin
Nishu, you might consider a side output with metrics, at least after the window. I would suggest having that to catch data skew or partition skew in all Flink jobs and amend if needed. Chen On Thu, Dec 7, 2017 at 9:48 AM Fabian Hueske wrote: > Is it possible that the data is

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Fabian Hueske
Ah, no direct memory buffer... Can you try to disable off-heap memory? 2017-12-07 18:35 GMT+01:00 Garrett Barton : > Stacktrace generates every time with the following settings (tried > different memory fractions): > yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120 >

Re: Data loss in Flink Kafka Pipeline

2017-12-07 Thread Fabian Hueske
Is it possible that the data is dropped due to being late, i.e., records with timestamps behind the current watermark? What kind of operations does your program consist of? Best, Fabian 2017-12-07 10:20 GMT+01:00 Sendoh : > I would recommend to also print the count of

Re: Hardware Reference Architecture

2017-12-07 Thread Kostas Kloudas
Hi Hayden, It would be nice if you could share a bit more details about your use case and the load that you expect to have, as this could allow us to have a better view of your needs. As a general set of rules: 1) I would say that the bigger your cluster (in terms of resources, not necessarily

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Stacktrace generates every time with the following settings (tried different memory fractions): yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120 akka.ask.timeout: 60s containerized.heap-cutoff-ratio: 0.15 taskmanager.memory.fraction: 0.7/0.3/0.1 taskmanager.memory.off-heap: true

Re: ClassNotFoundException in custom SourceFunction

2017-12-07 Thread Fabian Hueske
Hi, A ClassNotFoundException should not be expected behavior. Can you post the stacktrace of the exception? We had a few issues in the past where Flink didn't use the correct classloader. So this would not be an unusual bug. Thanks, Fabian 2017-12-07 10:44 GMT+01:00 Tugdual Grall

Re: save points through REST API not supported ?

2017-12-07 Thread Fabian Hueske
We are currently voting on the third release candidate for 1.4.0. Feel free to propose this feature on the dev mailing list [1], but I don't think this will result in cancelling the vote. If we identify a blocking issue for 1.4.0, it could be included as well. But we're already a few weeks behind

Re: How to perform efficient DataSet reuse between iterations

2017-12-07 Thread Fabian Hueske
Hi Miguel, I just had another idea that you could try. The problem seems to be that the plan space that the optimizer enumerates exponentially grows as you add more iterations. This happens when there are multiple valid execution strategies for a given operator (mostly applied to Joins). You

Re: Trace jar file name from jobId, is that possible?

2017-12-07 Thread Hao Sun
Let me check the details; off the top of my head I remember the job id changing, but I might be wrong. On Thu, Dec 7, 2017, 08:48 Fabian Hueske wrote: > AFAIK, a job keeps its ID in case of a recovery. > Did you observe something else? > > 2017-12-07 17:32 GMT+01:00 Hao Sun

Re: Testing CoFlatMap correctness

2017-12-07 Thread Kostas Kloudas
Hi Tovi, What you need is the TwoInputStreamOperatorTestHarness. This will allow you to do something like: TwoInputStreamOperatorTestHarness testHarness = new TwoInputStreamOperatorTestHarness<>(myoperator); testHarness.setup(); testHarness.open();
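The harness from flink-streaming-java's test utilities drives the operator directly: you call `processElement1`/`processElement2` in the arrival order you want to simulate and assert on the emitted output. The order-sensitivity idea can be sketched without Flink as a plain two-input function — the class and behavior below are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

public class CoFlatMapSketch {
    // A CoFlatMap-like operator: stream-A elements set a flag,
    // stream-B elements are joined only if an A element arrived first.
    static class MyCoFlatMap {
        private boolean sawA = false;
        void flatMap1(String a, List<String> out) { sawA = true; }
        void flatMap2(String b, List<String> out) {
            out.add(sawA ? "joined:" + b : "dropped:" + b);
        }
    }

    public static void main(String[] args) {
        MyCoFlatMap op = new MyCoFlatMap();
        List<String> out = new ArrayList<>();
        // Simulate "A arrives before B" purely by call order, like
        // testHarness.processElement1(...) then processElement2(...).
        op.flatMap1("a1", out);
        op.flatMap2("b1", out);
        System.out.println(out); // [joined:b1]
    }
}
```

With the real harness the assertions run against `testHarness.getOutput()` instead of a hand-made list, and element timestamps can be controlled as well.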

Re: save points through REST API not supported ?

2017-12-07 Thread Vishal Santoshi
Could that be put into the 1.4 release? This significantly eases tooling around savepoints, and though a minor change, it is not intrusive and should not cause regressions. Thank you though... On Dec 7, 2017 3:51 AM, "Chesnay Schepler" wrote: > I've finished the implementation. I

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Fabian Hueske
Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace? The managed memory should be divided among all possible consumers. In case of your simple job, this should just be Sorter. In fact, I'd try to reduce the fraction to give more memory to the JVM heap (OOM means there was not

Testing CoFlatMap correctness

2017-12-07 Thread Sofer, Tovi
Hi group, What is the best practice for testing the correctness of a CoFlatMap operator? We have two source functions, each emitting data to a stream, and a connect between them, and I want to make sure that when a streamA element arrives before a streamB element, a certain behavior happens. How can I test this

Re: Trace jar file name from jobId, is that possible?

2017-12-07 Thread Fabian Hueske
AFAIK, a job keeps its ID in case of a recovery. Did you observe something else? 2017-12-07 17:32 GMT+01:00 Hao Sun : > I mean restarted during failure recovery > > On Thu, Dec 7, 2017 at 8:29 AM Fabian Hueske wrote: > >> What do you mean by rescheduled? >>

Re: Trace jar file name from jobId, is that possible?

2017-12-07 Thread Hao Sun
I mean restarted during failure recovery On Thu, Dec 7, 2017 at 8:29 AM Fabian Hueske wrote: > What do you mean by rescheduled? > Started from a savepoint or restarted during failure recovery? > > > 2017-12-07 16:59 GMT+01:00 Hao Sun : > >> Anything I can

Re: Trace jar file name from jobId, is that possible?

2017-12-07 Thread Fabian Hueske
What do you mean by rescheduled? Started from a savepoint or restarted during failure recovery? 2017-12-07 16:59 GMT+01:00 Hao Sun : > Anything I can do for the job reschedule case? Thanks. > Or is there a way to add job lifecycle hooks to trace it? > > On Mon, Dec 4, 2017 at

Re: Flink 1.4.0 RC3 and Avro objects with maps have null values

2017-12-07 Thread Timo Walther
Hi Jörn, thanks for the little example. Maybe Avro changed the behavior about maps from the old version we used in 1.3 to the newest version in Flink 1.4.0. I will investigate this and might open an issue for it. Regards, Timo Am 12/7/17 um 5:07 PM schrieb Joern Kottmann: Hello Timo,

Re: Flink 1.4.0 RC3 and Avro objects with maps have null values

2017-12-07 Thread Joern Kottmann
Hello Timo, thanks for your quick response. I can't share the code of that pipeline here. The Flink version I am using is this one: http://people.apache.org/~aljoscha/flink-1.4.0-rc3/flink-1.4.0-src.tgz That was compiled by me with mvn install -DskipTests (for some reason the tests failed, can

Re: Trace jar file name from jobId, is that possible?

2017-12-07 Thread Hao Sun
Anything I can do for the job reschedule case? Thanks. Or is there a way to add job lifecycle hooks to trace it? On Mon, Dec 4, 2017 at 12:01 PM Hao Sun wrote: > Thanks Fabian, there is one case can not be covered by the REST API. When > a job rescheduled to run, but jobid

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Thanks for the reply again, I'm currently doing runs with: yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120 akka.ask.timeout: 60s containerized.heap-cutoff-ratio: 0.15 taskmanager.memory.fraction: 0.7 taskmanager.memory.off-heap: true taskmanager.memory.preallocate: true When I change the config

Re: Flink 1.4.0 RC3 and Avro objects with maps have null values

2017-12-07 Thread Timo Walther
Could you also check the type of the keys in your map? Avro distinguishes between the String and Utf8 classes. Maybe this is why your key cannot be found. Regards, Timo Am 12/7/17 um 3:54 PM schrieb Timo Walther: Hi Jörn, could you tell us a bit more about your job? Did you import the flink-avro
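The pitfall here: Avro GenericRecord maps often carry `org.apache.avro.util.Utf8` keys, and a `Utf8` is never equal to a `String`, so `map.get("key")` misses even though `toString()` prints the key. A stdlib-only sketch with a minimal `Utf8`-like class (the class below is illustrative, not Avro's):

```java
import java.util.HashMap;
import java.util.Map;

public class Utf8KeySketch {
    // Minimal stand-in for org.apache.avro.util.Utf8: same characters,
    // but a different class, so equals() against a String never succeeds.
    static final class FakeUtf8 {
        private final String s;
        FakeUtf8(String s) { this.s = s; }
        @Override public boolean equals(Object o) {
            return o instanceof FakeUtf8 && ((FakeUtf8) o).s.equals(s);
        }
        @Override public int hashCode() { return s.hashCode(); }
        @Override public String toString() { return s; }
    }

    public static void main(String[] args) {
        Map<Object, String> map = new HashMap<>();
        map.put(new FakeUtf8("color"), "red");
        System.out.println(map);              // {color=red} -- the key *looks* right
        System.out.println(map.get("color")); // null -- a String key never matches
        System.out.println(map.get(new FakeUtf8("color"))); // red
    }
}
```

The usual fixes are looking up with a `Utf8` key, or converting the map's keys to `String` once after deserialization.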

Re: Flink 1.4.0 RC3 and Avro objects with maps have null values

2017-12-07 Thread Timo Walther
Hi Jörn, could you tell us a bit more about your job? Did you import the flink-avro module? How does the Flink TypeInformation for your Avro type look like using println(ds.getType)? It sounds like a very weird error if the toString() method shows the key. Can you reproduce the error in your

Re: How to perform efficient DataSet reuse between iterations

2017-12-07 Thread Miguel Coimbra
Hello Fabian, It really looks like an issue requiring attention. Since I am launching the client application via Maven, I opted to change the maximum memory setting with export MAVEN_OPTS="-Xms256m -Xmx4g". To give an example, for three (3) iterations it worked fine with around 4 GB of

Flink 1.4.0 RC3 and Avro objects with maps have null values

2017-12-07 Thread Joern Kottmann
Hello, after having a version mismatch between Avro in Flink 1.3.2 I decided to see how things work with Flink 1.4.0. The pipeline I am building runs now, deployed as standalone on YARN with Flink 1.3.2 and putting it "FIRST" on the classpath (to use Avro 1.8.2 instead of an 1.7.x version). The

Hardware Reference Architecture

2017-12-07 Thread Marchant, Hayden
Hi, I'm looking for guidelines for Reference architecture for Hardware for a small/medium Flink cluster - we'll be installing on in-house bare-metal servers. I'm looking for guidance for: 1. Number and spec of CPUs 2. RAM 3. Disks 4. Network 5. Proximity of servers to each other (Most

Re: save points through REST API not supported ?

2017-12-07 Thread Lasse Nedergaard
I hope it can be put into 1.4.1. I have one concern about the REST API. We are running 1.3.1 on DC/OS, and if we pass parameters as arguments and our code validates these parameters and throws an exception during startup when something is wrong, we see that all uploaded jars disappear and all running jobs

Re: Taskmanagers are quarantined

2017-12-07 Thread T Obi
Hello, Thank you for all the advice. Sorry for my late response. First, I made a little mistake: I set `env.java.opts.taskmanager` to enable the GC log, and it accidentally cancelled the automatic setting of the `UseG1GC` flag. This means I was watching logs of the Parallel GC. When I enabled both the GC log and `UseG1GC`

Re: save points through REST API not supported ?

2017-12-07 Thread Chesnay Schepler
In retrospect I'm quite frustrated we didn't get around to implementing this for 1.4. The least-effort implementation would have required copying one class, and modifying ~10 lines. Doesn't get any more low-hanging than that. On 07.12.2017 12:51, Chesnay Schepler wrote: I've finished the

Re: save points through REST API not supported ?

2017-12-07 Thread Chesnay Schepler
I've finished the implementation. I created 2 new branches in my repository, for 1.3.2 and 1.4. 1.3.2: https://github.com/zentol/flink/tree/release-1.3.2-custom 1.4: https://github.com/zentol/flink/tree/release-1.4-custom Triggering savepoints works exactly like cancel-with-savepoint, just

Re: Performance of docker-flink

2017-12-07 Thread Jayant Ameta
Thank you Gary. I know that theoretically there shouldn't be any performance issue. I was curious to know whether other users have tried out docker-flink and whether they have faced/reported any performance hit. I would want real-time processing for some of the events, and was looking at existing

Re: Performance of docker-flink

2017-12-07 Thread Gary Yao
Hi Jayant, Running Flink in a Docker container should not have an impact on the performance in itself. Docker does not employ virtualization. To put it simply, Docker containers are processes on the host operating system that are isolated against each other using kernel features. See [1] for a

Re: Maintain heavy hitters in Flink application

2017-12-07 Thread Kostas Kloudas
Hi Max, You are right that Queryable State is not designed to be used as a means for a job to query its own state. In fact, given that you do not know the jobId of your job from within the job itself, I do not think you can use queryable state in your scenario. What you can do is to have a

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Fabian Hueske
That doesn't look like a bad configuration. I have to correct myself regarding the size of the managed memory. The fraction (70%) is applied to the free memory after TM initialization. This means that the memory for network buffers (and other data structures) is subtracted before the managed

Re: ClassNotFoundException in custom SourceFunction

2017-12-07 Thread Tugdual Grall
ok On Thu, Dec 7, 2017 at 10:35 AM, romain.jln wrote: > Hi all, > > I am experiencing some problems with a custom source that I have > implemented. I am getting some ClassNotFoundException randomly during the > execution of the job meanwhile the fat jar submitted to Flink

ClassNotFoundException in custom SourceFunction

2017-12-07 Thread romain.jln
Hi all, I am experiencing some problems with a custom source that I have implemented. I am getting some ClassNotFoundException randomly during the execution of the job meanwhile the fat jar submitted to Flink contains the given classes. After several hours of investigation, I think I have been

Re: Data loss in Flink Kafka Pipeline

2017-12-07 Thread Sendoh
I would recommend also printing the count of the input and output of each operator by using an Accumulator. Cheers, Sendoh -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
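In Flink the usual tool for this is an accumulator, e.g. an `IntCounter` registered in a rich function's `open()` via `getRuntimeContext().addAccumulator(...)` and incremented per record; the job result then exposes the totals. The counting idea in plain Java (names are illustrative, not Flink API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class OperatorCountSketch {
    // Wrap an operator (here: a filter) and count records in and out,
    // the way per-operator Flink accumulators would reveal where data
    // goes missing in a pipeline.
    static <T> List<T> countedFilter(List<T> in, Predicate<T> p,
                                     AtomicLong inCount, AtomicLong outCount) {
        inCount.addAndGet(in.size());
        List<T> out = in.stream().filter(p).collect(Collectors.toList());
        outCount.addAndGet(out.size());
        return out;
    }

    public static void main(String[] args) {
        AtomicLong in = new AtomicLong(), out = new AtomicLong();
        countedFilter(Arrays.asList(1, 2, 3, 4), x -> x % 2 == 0, in, out);
        System.out.println(in.get() + " in, " + out.get() + " out"); // 4 in, 2 out
    }
}
```

Comparing the in/out totals of adjacent operators quickly narrows down which step is dropping records.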

Re: the location of JsonRowDeserializationSchema.java

2017-12-07 Thread Sendoh
exactly! I initially thought this class is in table API. I was building a custom table source and found I have to add Kafka connector dependency for reading JSON encoded data, although my table source doesn't need it. Cheers, Hung -- Sent from:

Data loss in Flink Kafka Pipeline

2017-12-07 Thread Nishu
Hi, I am running a streaming pipeline (written in the Beam framework) with Flink. The *operator sequence* is: read the JSON data, parse the JSON string to an object, and group the objects by a common key. I noticed that the GroupByKey operator throws away some data in between, and hence I don't get all