Re: Disk I/O in Flink

2017-05-18 Thread Robert Schmidtke
threading issues which I do not cover, and maybe for some reason DVS serializes access, which is why my statistics and DVS agree 100%. I'll get more experiments going and report back. Robert On Sat, Apr 29, 2017 at 4:53 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote: > Hey Martin

Re: Disk I/O in Flink

2017-04-29 Thread Robert Schmidtke
com> wrote: > Hi Robert, > > Any updates on the below for the community? > > Thanks, > M > > On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke <ro.schmid...@gmail.com> > wrote: > >> Hi Ufuk, thanks for coming back to me on this. >> >> The rec

Re: Disk I/O in Flink

2017-04-18 Thread Robert Schmidtke
step will not exceed some > limit (default 128). Hope this can help you. > > Best, > Kurt > > On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <ro.schmid...@gmail.com> > wrote: > >> Hi, >> >> I'm currently examining the I/O patterns of Flink, and I'd

Disk I/O in Flink

2017-04-07 Thread Robert Schmidtke
Hi, I'm currently examining the I/O patterns of Flink, and I'd like to know when/how Flink goes to disk. Let me give an introduction to what I have done so far. I am running TeraGen (from the Hadoop examples package) + TeraSort ( https://github.com/robert-schmidtke/terasort) on a 16-node cluster

Re: Terminology: Split, Group and Partition

2017-01-14 Thread Robert Schmidtke
nk that this would be a valuable feature. > > Thanks, Fabian > > 2017-01-13 16:34 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>: > >> Just a side note, I'm guessing there's a bug here: >> https://github.com/apache/flink/blob/master/flink- >&

Re: Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
hat no new sinks have been added after the last execution. So currently it is not possible for me to first get the execution plan and then execute the program. Robert On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote: > Hi Fabian, >

Re: Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
have to be careful with the SplitDataProperties. If you get > them wrong, the optimizer makes false assumptions and the resulting plan > might not compute what you are looking for. > I'd recommend reading the JavaDocs and playing a bit with this feature to see > how it behaves. ExecutionEn
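
For reference, a minimal sketch of the SplitDataProperties hints discussed here, written in Scala against the Java-API DataSource (the input path is made up, and the declared partitioning must actually hold in the data, or the optimizer produces wrong plans as warned above):

    import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path

    val env = JavaEnv.getExecutionEnvironment
    // pretend each split holds one pre-partitioned file under this made-up path
    val source = env.createInput(new TextInputFormat(new Path("/tmp/input")))

    // declare that records are already partitioned across splits
    // ("*" selects the whole record; tuple types can use field indices)
    source.getSplitDataProperties.splitsPartitionedBy("*")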

Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
(say /tmp/input). There can be arbitrarily many input files in each worker's folder. I have written a custom input format that round-robin assigns the files to each of the 16 local input splits ( https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib

Re: Reading worker-local input files

2017-01-04 Thread Robert Schmidtke
Hi Fabian, thanks for your directions! They worked flawlessly. I am aware of the reduced robustness, but then again my input is only available on each worker and not replicated. In case anyone is wondering, here is how I did it: https://github.com/robert-schmidtke/hdfs-statistics-adapter/tree
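
The linked repository has the actual code; as a rough sketch of the underlying trick (class, folder, and host names are made up), an input format can pin one split per worker through LocatableInputSplit, and each task then reads whatever sits in its node-local folder:

    import java.io.File
    import scala.io.Source

    import org.apache.flink.api.common.io.{InputFormat, LocatableInputSplitAssigner}
    import org.apache.flink.api.common.io.statistics.BaseStatistics
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.io.{InputSplitAssigner, LocatableInputSplit}

    class LocalFolderInputFormat(folder: String, hosts: Array[String])
        extends InputFormat[String, LocatableInputSplit] {

      @transient private var lines: Iterator[String] = _

      // one split per worker host; the assigner prefers local placement
      override def createInputSplits(minNumSplits: Int): Array[LocatableInputSplit] =
        hosts.zipWithIndex.map { case (host, i) => new LocatableInputSplit(i, host) }

      override def getInputSplitAssigner(splits: Array[LocatableInputSplit]): InputSplitAssigner =
        new LocatableInputSplitAssigner(splits)

      // each task reads all files found in its node-local folder
      override def open(split: LocatableInputSplit): Unit = {
        val files = Option(new File(folder).listFiles()).getOrElse(Array.empty[File])
        lines = files.iterator.flatMap(f => Source.fromFile(f).getLines())
      }

      override def reachedEnd(): Boolean = !lines.hasNext
      override def nextRecord(reuse: String): String = lines.next()
      override def close(): Unit = ()
      override def configure(parameters: Configuration): Unit = ()
      override def getStatistics(cached: BaseStatistics): BaseStatistics = cached
    }

A DataSet[String] then comes out of env.createInput(new LocalFolderInputFormat("/local", workerHosts)), with the reduced robustness mentioned above, since the splits are not replicated.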

Reading worker-local input files

2016-12-27 Thread Robert Schmidtke
Hi everyone, I'm using Flink and/or Hadoop on my cluster, and I'm having them generate log data in each worker node's /local folder (regular mount point). Now I would like to process these files using Flink, but I'm not quite sure how I could tell Flink to use each worker node's /local folder as

Re: How to choose the 'parallelism.default' value

2016-05-05 Thread Robert Schmidtke
The TMs request the buffers in batches, so your 384 were requested, but only 200 were left in the pool. This means your overall pool size is too small. Here is the relevant section from the documentation:
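
The sizing rule of thumb from the documentation of that era was taskmanager.network.numberOfBuffers = slots-per-TM^2 x #TMs x 4. A back-of-envelope check, with made-up numbers:

    // illustrative numbers only
    val slotsPerTm = 8
    val numTms = 6
    val neededBuffers = slotsPerTm * slotsPerTm * numTms * 4 // = 1536
    // taskmanager.network.numberOfBuffers should be at least this large;
    // each buffer occupies one memory segment (32 KiB by default)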

Re: Map from Tuple to Case Class

2016-05-04 Thread Robert Schmidtke
and case classes (but > please take this with a grain of salt). > > [1]: > https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html#accept-partial-functions > [2]: I didn't test them, so caution is advisable ;) > > On Wed, May 4, 2016 at 2:00 PM, Ro

Map from Tuple to Case Class

2016-05-04 Thread Robert Schmidtke
Hi everyone, first up, I'm new to Scala, so please bear with me, but I could not find any solution on the web or the Flink documentation. I'm having trouble converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case class. I got it to work, however in a way that I feel is too
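
A minimal sketch of such a conversion (the case class and input values are made up; Hadoop Writable type support is assumed to be on the classpath):

    import org.apache.flink.api.scala._
    import org.apache.hadoop.io.{LongWritable, Text}

    case class Line(offset: Long, content: String)

    object TupleToCaseClass {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val input: DataSet[(LongWritable, Text)] =
          env.fromElements((new LongWritable(0L), new Text("hello")))

        // unwrap the mutable Writables into plain Scala values while mapping
        val lines: DataSet[Line] = input.map(t => Line(t._1.get(), t._2.toString))

        lines.print()
      }
    }

The acceptPartialFunctions extension linked in the reply above additionally permits mapWith { case (offset, text) => ... } for nicer destructuring.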

Re: Measuring latency in a DataStream

2016-05-03 Thread Robert Schmidtke
After fixing the clock issue on the application level, the latency is as expected. Thanks again! Robert On Tue, May 3, 2016 at 9:54 AM, Robert Schmidtke <ro.schmid...@gmail.com> wrote: > Hi Igor, thanks for your reply. > > As for your first point I'm not sure I understand

Re: Measuring latency in a DataStream

2016-05-03 Thread Robert Schmidtke
not to wrap your data in a tuple2 with > additional info of creation ts? > > 2. are you sure that consumer/producer machines' clocks are in sync? > you can use ntp for this. > > On 2 May 2016 at 20:02, Robert Schmidtke <ro.schmid...@gmail.com> wrote: > >> Hi everyo

Measuring latency in a DataStream

2016-05-02 Thread Robert Schmidtke
/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68 On the receiving end I again take the currentTimeMillis in my fold function, expecting the resulting value to be larger (most of the time
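
In outline, the creation-timestamp approach from this thread (a standalone sketch; the socket source stands in for the real benchmark pipeline, and producer/consumer clocks must be in sync):

    import org.apache.flink.streaming.api.scala._

    object LatencySketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // attach the creation timestamp where records enter the pipeline
        val stamped: DataStream[(String, Long)] = env
          .socketTextStream("localhost", 9999)
          .map(e => (e, System.currentTimeMillis()))

        // ... transport, e.g. through Kafka, would happen here ...

        // subtract at the consuming end to get end-to-end latency
        stamped.map(t => System.currentTimeMillis() - t._2).print()

        env.execute("latency sketch")
      }
    }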

Configuring a RichFunction on a DataStream

2016-04-28 Thread Robert Schmidtke
Hi everyone, I noticed that in the DataSet API, there is the .withParameters function that allows passing values to a RichFunction's open method. I was wondering whether a similar approach can be used to do the same thing in a DataStream. Right now I'm getting the parameters via getRuntimeContext,
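
The DataStream API has no .withParameters(...); the usual workarounds are passing values through the function's constructor (they are serialized with the closure) or global job parameters read via the runtime context. A constructor-based sketch with made-up names:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.streaming.api.scala._

    // values handed over at construction time travel with the serialized
    // function instance and are available on the workers in open()/map()
    class ScaleFunction(factor: Int) extends RichMapFunction[Int, Int] {
      override def map(value: Int): Int = value * factor
    }

    object ParametersSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(1, 2, 3).map(new ScaleFunction(10)).print()
        env.execute("parameters sketch")
      }
    }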

Re: Gracefully stop long running streaming job

2016-04-18 Thread Robert Schmidtke
ng right now. > > -Matthias > > > On 04/18/2016 10:50 PM, Robert Schmidtke wrote: > > Hi everyone, > > > > I am running a streaming benchmark which involves a potentially > > infinitely running Flink Streaming Job. I run it blocking on YARN using > > ./bin/

Gracefully stop long running streaming job

2016-04-18 Thread Robert Schmidtke
Hi everyone, I am running a streaming benchmark which involves a potentially infinitely running Flink Streaming Job. I run it blocking on YARN using ./bin/flink run ... and then send the command to the background, remembering its PID to kill it later on. While this gets the work done, the job
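
A job can only shut down cleanly if its sources cooperate. A sketch of a cancellable source that reacts to bin/flink cancel <jobID> instead of requiring a PID kill:

    import org.apache.flink.streaming.api.functions.source.SourceFunction

    class CountingSource extends SourceFunction[Long] {
      @volatile private var running = true

      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        var i = 0L
        while (running) {
          ctx.collect(i)
          i += 1
        }
      }

      // called on cancellation; the run() loop then exits cleanly
      override def cancel(): Unit = running = false
    }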

Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Schmidtke
Turns out that when I remove the explicit dependency on kafka_2.10 v. 0.8.1, the dependencies are properly included. Guess there was a conflict somehow? I'll need to figure out if the rest of the code is fine with kafka_2.10 v. 0.8.2.0 as well. On Mon, Apr 18, 2016 at 4:32 PM, Robert Schmidtke

Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Schmidtke
Hi Robert, thanks for your offer. After playing around a bit I would like to take you up on it, if you have the time: https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/pom.xml I would guess the POM is similar to the one in the sample project, yet when building

Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Schmidtke
etz...@apache.org> wrote: > Hi, > the problem with the posted project is that it doesn't have the Flink > kafka connector as a dependency. > > On Mon, Apr 18, 2016 at 3:56 PM, Robert Schmidtke <ro.schmid...@gmail.com> > wrote: > >> Hi Robert, >> >&

Re: Flink 0.10.2 and Kafka 0.8.1

2016-04-18 Thread Robert Schmidtke
Hi Robert, thanks for your hints. I was not sure whether I was building a proper fat jar, as I have not used the Flink Archetype for my project. However, I have set up a sample project at https://github.com/robert-schmidtke/flink-test/ which is nothing more than the Quickstart Archetype plus

Flink 0.10.2 and Kafka 0.8.1

2016-04-17 Thread Robert Schmidtke
Hi everyone, I have a Kafka cluster running on version 0.8.1, hence I'm using the FlinkKafkaConsumer081. When running my program, I saw a NoClassDefFoundError for org.apache.kafka.common.Node. So I packaged my binaries according to

Re: Flink performance pre-packaged vs. self-compiled

2016-04-14 Thread Robert Schmidtke
configuration, dataset size. > > Best, > Ovidiu > > On 14 Apr 2016, at 17:14, Robert Schmidtke <ro.schmid...@gmail.com> wrote: > > I have tried multiple Maven and Scala Versions, but to no avail. I can't > seem to achieve performance of the downloaded archive. I am stumped b

Re: Flink performance pre-packaged vs. self-compiled

2016-04-14 Thread Robert Schmidtke
I have tried multiple Maven and Scala versions, but to no avail. I can't seem to achieve the performance of the downloaded archive. I am stumped by this and will need to do more experiments when I have more time. Robert On Thu, Apr 14, 2016 at 1:13 PM, Robert Schmidtke <ro.schmid...@gmail.com>

Re: Flink performance pre-packaged vs. self-compiled

2016-04-14 Thread Robert Schmidtke
h file in the source tree. There > you can see how we are building the release binaries. > It would be quite interesting to find out what caused the performance > difference. > > On Wed, Apr 13, 2016 at 5:03 PM, Robert Schmidtke <ro.schmid...@gmail.com> > wrote: > >&g

Flink performance pre-packaged vs. self-compiled

2016-04-13 Thread Robert Schmidtke
Hi everyone, I'm using Flink 0.10.2 for some benchmarks and had to make some small changes to Flink, which led me to compiling and running it myself. This is when I noticed a performance difference compared to the pre-packaged Flink version that I downloaded from the web (

Flink Job History Dump

2016-04-05 Thread Robert Schmidtke
Hi everyone, I'm using Flink 0.10.2 to run some benchmarks on my cluster and I would like to compare it to Spark 1.6.0. Spark has an eventLog property that I can use to have the history written to HDFS, and then later view it offline on the History Server. Does Flink have a similar feature,

Re: writeAsCsv

2015-10-07 Thread Robert Schmidtke
Hi, as far as I know only collect, print and execute actually trigger the execution. What you're missing is env.execute() after the writeAsCsv call. Hope this helps. On Wed, Oct 7, 2015 at 9:35 PM, Lydia Ickler wrote: > Hi, > > stupid question: Why is this not saved to
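
A minimal complete example of the point above (output path made up):

    import org.apache.flink.api.scala._

    object WriteCsv {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val data = env.fromElements((1, "one"), (2, "two"))
        data.writeAsCsv("/tmp/pairs.csv") // only declares the sink
        env.execute("write csv")          // nothing is written without this
      }
    }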

Re: JM/TM startup time

2015-10-02 Thread Robert Schmidtke
ch > makes it quite slow. > > If the JVM starts with a large heap, it should actually not take as long > as in your case... > > On Fri, Oct 2, 2015 at 5:26 PM, Robert Schmidtke <ro.schmid...@gmail.com> > wrote: > >> Hi everyone, >> >> I'm wond

JM/TM startup time

2015-10-02 Thread Robert Schmidtke
Hi everyone, I'm wondering about the startup times of the TMs: ...
17:03:33,255 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor
17:03:33,262 INFO org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig [server address:

Re: JM/TM startup time

2015-10-02 Thread Robert Schmidtke
Looking into the logs of each TM it only took about 5 seconds per TM to go from "Trying to register" to "Successful registration". On Fri, Oct 2, 2015 at 5:50 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote: > I recently switched from running Flink on YARN to run

Re: JM/TM startup time

2015-10-02 Thread Robert Schmidtke
ke too much time). What > configuration did you use for the task managers? Do you really have > that much memory or is your system swapping? > > I think the JobManager just appears to take a long time because the > TaskManagers register late. > > Cheers, > Max > > On

Re: JM/TM startup time

2015-10-02 Thread Robert Schmidtke
servation that it takes so long, or has it always taken so > long? > > On Fri, Oct 2, 2015 at 5:40 PM, Robert Schmidtke <ro.schmid...@gmail.com> > wrote: > >> I figured the JM would be waiting for the TMs. Each of my nodes has 64G >> of memory available. >> >>

Re: All but one TMs connect when JM has more than 16G of memory

2015-10-01 Thread Robert Schmidtke
So for anyone who is interested, here are some code references for getting started with Flink on Slurm. I added basic start and stop scripts for Flink on Slurm in my fork: https://github.com/robert-schmidtke/flink/tree/flink-slurm/flink-dist/src/main/flink-bin/bin And I also created an example

Re: All but one TMs connect when JM has more than 16G of memory

2015-10-01 Thread Robert Schmidtke
ink services > in your cluster. YARN is designed to be an abstraction between the cluster > and the application, that's why it's a bit difficult to schedule the > containers to specific machines. > > Robert > > > > On Thu, Oct 1, 2015 at 11:24 AM, Robert Schmidtke <ro.schmi

Re: All but one TMs connect when JM has more than 16G of memory

2015-10-01 Thread Robert Schmidtke
ession/job does not fit onto the cluster. > This "endless loop" exists because in many production environments Flink > can just wait for resources to become available, for example when other > containers are finishing. > > > Robert > > On Wed, Sep 30, 2015 at 6:

Fwd: OutOfMemoryError in netty local transport

2015-09-30 Thread Robert Schmidtke
Hi everyone, I'm constantly running into OutOfMemoryErrors and for the life of me I cannot figure out what's wrong. Let me describe my setup. I'm running the current master branch of Flink on YARN (Hadoop 2.7.0). My job is an unfinished implementation of TPC-H Q2 ( https://github.com/robert

Re: All but one TMs connect when JM has more than 16G of memory

2015-09-30 Thread Robert Schmidtke
nager logs? Maybe there is a log message > which explains why the container request of Flink's AM is not fulfilled. > > > [1] > http://search-hadoop.com/m/AsBtCilK5r1pKLjf1=Re+QUESTION+Allocating+a+full+YARN+cluster > > On Wed, Sep 30, 2015 at 5:02 PM, Robert Schmidtke <ro.schm

All but one TMs connect when JM has more than 16G of memory

2015-09-30 Thread Robert Schmidtke
It's me again. This is a strange issue, I hope I managed to find the right keywords. I got 8 machines, 1 for the JM, the other 7 are TMs with 64G of memory each. When running my job like so: $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 40960 -yn 7 ... The job completes without

Re: All but one TMs connect when JM has more than 16G of memory

2015-09-30 Thread Robert Schmidtke
I should say I'm running the current Flink master branch. On Wed, Sep 30, 2015 at 5:02 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote: > It's me again. This is a strange issue, I hope I managed to find the right > keywords. I got 8 machines, 1 for the JM, the other 7 are

Re: DelimitedInputFormat reads entire buffer when splitLength is 0

2015-07-13 Thread Robert Schmidtke
a double meaning, but it goes haywire in this case. Let me try to come up with a fix for this... Greetings, Stephan On Fri, Jul 10, 2015 at 6:05 PM, Robert Schmidtke ro.schmid...@gmail.com wrote: Hey everyone, I just noticed that when processing input splits from a DelimitedInputFormat

DelimitedInputFormat reads entire buffer when splitLength is 0

2015-07-10 Thread Robert Schmidtke
Hey everyone, I just noticed that when processing input splits from a DelimitedInputFormat (specifically, I have a text file with words in it), if the splitLength is 0, the entire read buffer is filled (see

Hostname resolution error impacting data local computing

2015-07-09 Thread Robert Schmidtke
Hi everyone, I'm currently testing data-local computing of Flink on XtreemFS (I'm one of the developers). We have implemented our adapter using the Hadoop FileSystem interface and all works well. However, upon closer inspection, I found that only remote splits are assigned, which is strange, as

Re: Hostname resolution error impacting data local computing

2015-07-09 Thread Robert Schmidtke
Hi, I dug deeply into the Java source code, and it comes down to a native call to getHostByAddr, for which I only found C implementations for Windows and Solaris. Frankly, I don't know what's going on on our Linux machines here; deep down there will be a call to getnameinfo, I presume. I could not yet