Re: Exception: Insufficient number of network buffers: required 120, but only 2 of 2048 available

2015-02-18 Thread Fabian Hueske
Hi Yiannis, if you scale Flink to larger setups you need to adapt the number of network buffers. The background section of the configuration reference explains the details on that [1]. Let us know if that helped to solve the problem. Best, Fabian [1]
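The setting in question lives in `flink-conf.yaml`. A minimal sketch, with an illustrative value (2048 matches the default suggested by the exception message above; the right number depends on slots and TaskManagers):

```yaml
# flink-conf.yaml -- illustrative value, scale up for larger setups
taskmanager.network.numberOfBuffers: 4096
```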

Re: Exception in Simple Job: Thread 'SortMerger spilling thread' terminated due to an exception: The user-defined combiner failed in its 'open()' method

2015-02-17 Thread Fabian Hueske
Hi, you are doing everything correct. This is a bug in the Flink runtime. I created a JIRA (https://issues.apache.org/jira/browse/FLINK-1574) and will push a fix later this evening once all tests have passed. Thanks for reporting the issue! Cheers, Fabian 2015-02-17 18:00 GMT+01:00 Yiannis

Re: OutOfMemory during serialization

2015-02-20 Thread Fabian Hueske
Have you tried to increase the heap size by shrinking the TM-managed memory? Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM memory (taskmanager.memory.size) in the flink-conf.yaml [1]. Cheers, Fabian [1] http://flink.apache.org/docs/0.8/config.html On 20 Feb
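The two knobs mentioned can be set like this in `flink-conf.yaml` (values are illustrative, not from the thread; `taskmanager.heap.mb` is assumed as the heap setting of that era):

```yaml
# flink-conf.yaml -- illustrative values
taskmanager.heap.mb: 2048          # total TaskManager JVM heap in MB
taskmanager.memory.fraction: 0.5   # shrink managed memory to leave more heap for user code
# alternatively, fix an absolute amount of managed memory:
# taskmanager.memory.size: 512     # in MB
```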

Re: Does Flink use all of the available memory?

2015-02-14 Thread Fabian Hueske
Hi, Flink uses only the memory which is configured to the JobManager and TaskManager JVMs. By default this is 256MB for the JM and 512MB for the TM (see [1] for details). The TM memory is split into equally large chunks for each configured slot of the TM. You should definitely configure the TM

Re: [Exception]Key expressions are only supported on POJO types and Tuples

2015-02-11 Thread Fabian Hueske
In case you want to contribute or follow the discussion, here's the JIRA: https://issues.apache.org/jira/browse/FLINK-1511 Again, thanks for reporting! 2015-02-11 9:59 GMT+01:00 Fabian Hueske fhue...@gmail.com: Hi, you are right, there is a problem. I reproduced the problem and it seems

Re: Some questions about Join

2015-02-21 Thread Fabian Hueske
Hi, non-equi joins are only supported by building the cross product. This is essentially the nested-loop join strategy that a conventional database system would choose. However, such joins are prohibitively expensive when applied to large data sets. If you have one small and another large data

Re: ApacheCon 2015 is coming to Austin, Texas, USA

2015-03-26 Thread Fabian Hueske
. - Henry On Wed, Mar 25, 2015 at 3:12 AM, Fabian Hueske fhue...@gmail.com wrote: Thanks Henry for sharing! I will be in Austin and give a talk on Flink [1]. Just ping me if you'd like to meet and chat :-) Cheers, Fabian [1] http://sched.co/2P9s 2015-03-25 1:11 GMT+01:00

Re: HBase TableOutputFormat

2015-03-23 Thread Fabian Hueske
Creating a JIRA issue never hurts. Have you tried to add your code snippet to the HadoopOutputFormatBase.configure() method? Seems to me the right place for it. Do you want to open a PR for that? 2015-03-23 16:01 GMT+01:00 Flavio Pompermaier pomperma...@okkam.it: Any news about this? Could

Re: Community vote for Hadoop Summit result

2015-01-29 Thread Fabian Hueske
Congrats Marton and everybody who is working on Flink Streaming! Looking forward to the blog post :-) 2015-01-30 1:06 GMT+01:00 Márton Balassi mbala...@apache.org: Hi everyone, Thanks for your support for the Flink talks at the community choice for the next Hadoop Summit Europe. The results

Re: Tuples serialization

2015-04-23 Thread Fabian Hueske
Have you tried the TypeSerializerOutputFormat? This will serialize data using Flink's own serializers and write it to binary files. The data can be read back using the TypeSerializerInputFormat. Cheers, Fabian 2015-04-23 11:14 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: Hi to all, in

Re: Distinct lines in a file

2015-04-30 Thread Fabian Hueske
Hi Flavio, I agree, distinct() is a bit limited right now and in fact, there is no good reason for that except that nobody found time to improve it. You can use distinct(KeySelector k) to work directly on DataSet<String> but that's not very convenient either: DataSet<String> strings =
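The semantics of distinct(KeySelector) -- keep one element per extracted key -- can be illustrated without any Flink dependencies. A plain-Java sketch (names hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DistinctByKey {
    // Keeps the first element seen for each key produced by the selector,
    // mirroring what distinct(KeySelector) does within a partition.
    static <T, K> List<T> distinctBy(List<T> input, Function<T, K> keySelector) {
        Map<K, T> seen = new LinkedHashMap<>();
        for (T element : input) {
            seen.putIfAbsent(keySelector.apply(element), element);
        }
        return new ArrayList<>(seen.values());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "a", "c", "b");
        System.out.println(distinctBy(lines, Function.identity())); // [a, b, c]
    }
}
```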

Re: Difference between using a global variable and broadcasting a variable

2015-04-27 Thread Fabian Hueske
You should also be aware that the value of a static variable is only accessible within the same JVM. Flink is a distributed system and runs in multiple JVMs. So if you set a value in one JVM it is not visible in another JVM (on a different node). In general, I would avoid using static variables

Re: filter().project() vs flatMap()

2015-05-04 Thread Fabian Hueske
to know that the number of output tuples is <= the number of input tuples? Is there any optimization that Flink can apply to the pipeline? On Mon, May 4, 2015 at 2:49 PM, Fabian Hueske fhue...@gmail.com wrote: It should not make a difference. I think it's just personal taste. If your filter condition

Re: EOF reading file from HDFS

2015-05-08 Thread Fabian Hueske
:02 PM, Fabian Hueske fhue...@gmail.com wrote: The value of the parameter is not important for correctness but it must be the same when writing and reading. Try setting it to 64 MB. 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: How can I retrieve the right one...? I

Re: KryoException with joda Datetime

2015-05-15 Thread Fabian Hueske
Is there a chance that the version of JodaTime changed? 2015-05-15 10:22 GMT+02:00 Robert Metzger rmetz...@apache.org: Can you share the Flink program? Or at least the definition of the Tuple? I'll look into this issue in a few minutes. On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier

Re: Parallelism question

2015-04-16 Thread Fabian Hueske
Hi Giacomo, as Max said, you can sort the data within a partition. However, data across partitions is not sorted. It is either random partitioned or hash-partitioned (all records that share some property are in the same partition). Producing fully ordered output, where the first partition has

Re: Left outer join

2015-04-17 Thread Fabian Hueske
If you know that the group cardinality of one input is always 1 (or 0) you can make that input the one to cache in memory and stream the other input with potentially more group elements. 2015-04-17 4:09 GMT-05:00 Flavio Pompermaier pomperma...@okkam.it: That would be very helpful... Thanks

Re: Informing the runtime about data already repartitioned using output contracts

2015-05-19 Thread Fabian Hueske
to either hack the code or add the missing functionality in order to realize the above described goal. Suggestions are welcome! Regards, Alex 2015-05-18 17:42 GMT+02:00 Fabian Hueske fhue...@gmail.com: Hi Mustafa, I'm afraid, this is not possible. Although you can annotate DataSources

Re: Reading from HBase problem

2015-06-09 Thread Fabian Hueske
million rows. Best Regards, Hilmi On 09.06.2015 at 10:47, Fabian Hueske wrote: OK, so the problem seems to be with the HBase InputFormat. I guess this issue needs a bit of debugging. We need to check if records are emitted twice (or more often) and if that is the case which records

Re: Reading from HBase problem

2015-06-09 Thread Fabian Hueske
. Furthermore, the HBase contains nearly 100 million rows but the result is 102 million. This means that the HBaseInput reads more rows than the HBase contains. Best Regards, Hilmi On 08.06.2015 at 23:29, Fabian Hueske wrote: Hi Hilmi, I see two possible reasons: 1) The data source

Re: Load balancing

2015-06-09 Thread Fabian Hueske
Hi Sebastian, I agree, shuffling only specific elements would be a very useful feature, but unfortunately it's not supported (yet). Would you like to open a JIRA for that? Cheers, Fabian 2015-06-09 17:22 GMT+02:00 Kruse, Sebastian sebastian.kr...@hpi.de: Hi folks, I would like to do some

Re: Apache Flink transactions

2015-06-10 Thread Fabian Hueske
Actually, I want to know more info about Flink SQL and Flink performance Here is the Spark benchmark. Maybe you already saw it before. https://amplab.cs.berkeley.edu/benchmark/ Thanks. Best regards Hawin On Fri, Jun 5, 2015 at 1:35 AM, Fabian Hueske fhue...@gmail.com

Re: coGroup Iterator NoSuchElement

2015-06-04 Thread Fabian Hueske
datasets on the common key, so it will be normal to have many tuples on one side which do not exist on the other side... How to fix that?! On Thu, Jun 4, 2015 at 11:00 AM, Fabian Hueske fhue...@gmail.com wrote: Hi, one of the iterables of a CoGroup function can be empty. Calling

Re: Monitoring memory usage of a Flink Job

2015-06-15 Thread Fabian Hueske
Hi Tamara, what kind of information do you need? Something like, size and usage of in-memory sort buffers or hash tables? Some information might be written in DEBUG logs, but I'm not sure about that. Besides logs, I doubt that Flink monitors memory usage. Cheers, Fabian 2015-06-15 14:34 GMT+02:00

Re: sorting groups

2015-06-16 Thread Fabian Hueske
Hi, the error is related to the way you specify the grouping and the sorting key. The API is currently restricted in the way that you can only use a key selector function for the sorting key if you also used a selector function for the grouping key. In Scala the use of key selector functions is

Re: passing variable to filter function

2015-06-16 Thread Fabian Hueske
Hi, which version of Flink are you working with? The master (0.9-SNAPSHOT) has a RichFilterFunction [1]. Best, Fabian [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java 2015-06-16 23:52 GMT+02:00 Vinh June

Re: Reading separate files in parallel tasks as input

2015-06-14 Thread Fabian Hueske
Hi, reading local files in a distributed setting is a tricky thing because Flink assumes that all InputSplits can be read from all TaskManagers. This is obviously not possible if files are located on the local file systems of different physical machines. Hence, you cannot use one of the provided

Re: sorting groups

2015-06-18 Thread Fabian Hueske
sortGroup operator to the grouped dataset Is there a way to solve this? I think I don't understand what a keySelector is Thanks! Michele -- *From:* Fabian Hueske fhue...@gmail.com *Sent:* Tuesday, June 16, 2015 23:43 *To:* user@flink.apache.org *Subject:* Re

Re: Job Statistics

2015-06-18 Thread Fabian Hueske
Hi Jean, what kind of job execution stats are you interested in? Cheers, Fabian 2015-06-18 9:01 GMT+02:00 Matthias J. Sax mj...@informatik.hu-berlin.de: Hi, the CLI cannot show any job statistics. However, you can use the JobManager web interface that is accessible at port 8081 from a

Re: Execution graph

2015-06-30 Thread Fabian Hueske
As an addition, some operators can only be run with a parallelism of 1. For example data sources based on collections and (un-grouped) all reduces. In some cases, the parallelism of the following operators will be set to 1 as well to avoid a network shuffle. If you do:

Re: Datasets union CompilerException

2015-06-30 Thread Fabian Hueske
Also, can you open a JIRA for the issue? Otherwise it might get lost on the mailing list. Thank you! 2015-06-30 10:56 GMT+02:00 Fabian Hueske fhue...@gmail.com: Hi, is it possible to get a smaller version of that program that reproduces the bug or give a few more details about the structure

Re: Datasets union CompilerException

2015-06-30 Thread Fabian Hueske
Hi, is it possible to get a smaller version of that program that reproduces the bug or give a few more details about the structure of the job? Without any hints, it is very hard to reproduce and fix the bug. 2015-06-24 18:23 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: Unfortunately not

Re: POJO coCroup on null value

2015-07-06 Thread Fabian Hueske
In fact you can implement your own composite data types (like Tuple, Pojo) that can deal with nullable fields as keys but you need custom serializers and comparators for that. These types won't be as efficient as types that cannot handle null fields. Cheers, Fabian 2015-07-02 20:17 GMT+02:00 Flavio

Re: how differences between hadoop and apache flink?

2015-05-22 Thread Fabian Hueske
I'm quoting Chiwan Park's answer to this question: Hadoop is a framework for reliable, scalable, distributed computing. So, there are many components for this purpose such as HDFS, YARN and Hadoop MapReduce. Flink is an alternative to the Hadoop MapReduce component. It has also some tools to make

Re: k means - waiting for dataset

2015-05-22 Thread Fabian Hueske
There are two ways to do that: 1) You use a GroupReduceFunction, which gives you an iterator over all points similar to Hadoop's ReduceFunction. 2) You use the ReduceFunction to compute the sum and the count at the same time (e.g., in two fields of a Tuple2) and use a MapFunction to do the final
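Option 2 -- carrying (sum, count) through the reduce and dividing in a final map -- can be sketched in plain Java, independent of Flink (names illustrative; in Flink the pair would be a Tuple2 and the two steps a ReduceFunction plus a MapFunction):

```java
import java.util.Arrays;
import java.util.List;

public class AverageViaReduce {
    // Map each value to a (sum, count) pair, combine pairs pairwise
    // (the "reduce"), then divide in a final step (the "map").
    static double average(List<Double> values) {
        double[] acc = values.stream()
                .map(v -> new double[]{v, 1.0})                            // (sum, count) pair
                .reduce(new double[]{0.0, 0.0},
                        (a, b) -> new double[]{a[0] + b[0], a[1] + b[1]}); // pairwise reduce
        return acc[0] / acc[1];                                            // final map
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(1.0, 2.0, 3.0, 6.0))); // 3.0
    }
}
```

The key property that makes this work as a ReduceFunction is that combining (sum, count) pairs is associative, so partial sums can be pre-aggregated on each node.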

Re: Write File on specific machine

2015-05-22 Thread Fabian Hueske
A workaround might be to set the parallelism to 1 as Till suggested and write into a network-shared directory. Nonetheless, it sounds like a good feature to support strict assignments of data sink tasks to machines. Fabian 2015-05-22 16:10 GMT+02:00 Hilmi Yildirim hilmi.yildi...@neofonie.de:

Re: Visibility of FileInputFormat constants

2015-05-26 Thread Fabian Hueske
Definitely! Much better than using the String value. 2015-05-26 16:55 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: Hi to all, in my program I need to set recursive.file.enumeration to true and I discovered that there's a constant for that variable in FileInputFormat

Re: Visibility of FileInputFormat constants

2015-05-26 Thread Fabian Hueske
Done 2015-05-26 16:59 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: Could you do that so I can avoid to make a PR just for that, please? On Tue, May 26, 2015 at 4:58 PM, Fabian Hueske fhue...@gmail.com wrote: Definitely! Much better than using the String value. 2015-05-26 16:55 GMT

Re: 答复: How to understand slot?

2015-08-18 Thread Fabian Hueske
A TM reserves a certain amount of memory for each slot, but CPU and IO can be shared across slots. Hence, there might be some imbalance among TMs, but this imbalance is limited by the concept of slots which gives an upper bound of the number of tasks that a TM can process. Also random assignment

Re: Gelly vertex ID type requirements?

2015-07-30 Thread Fabian Hueske
Thanks for reporting this issue. The "Wrong field type" error looks like a bug to me. This happens because PojoType is neither a TupleType nor an AtomicType. To me it looks like the TupleTypeInfoBase condition should be generalized to CompositeType. I will look into this. Cheers, Fabian

Re: thread model issue in TaskManager

2015-07-30 Thread Fabian Hueske
Hi, it is currently not possible to isolate tasks that consume a lot of JVM heap memory and schedule them to a specific slot (or TaskManager). If you operate in a YARN setup, you can isolate different jobs from each other by starting a new YARN session for each job, but tasks within the same job

Re: Invalid argument reading a file containing a Kryo object

2015-08-09 Thread Fabian Hueske
; // read only one object return yourObject; } } 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: Sorry Fabian but I don't understand what I should do :( Could you provide me a simple snippet of code to achieve this? On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske fhue

Re: Invalid argument reading a file containing a Kryo object

2015-08-07 Thread Fabian Hueske
If you create your file by just sequentially writing all objects to the file using Kryo, you can only read it with a parallelism of 1. Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink). 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier

Re: Invalid argument reading a file containing a Kryo object

2015-08-07 Thread Fabian Hueske
the same error :( Moreover, in this example I put exactly one object per file so it should be able to deserialize it, right? On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske fhue...@gmail.com wrote: If you create your file by just sequentially writing all objects to the file using Kryo, you can

Re: Invalid argument reading a file containing a Kryo object

2015-08-07 Thread Fabian Hueske
the serialized object is 7 bytes On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske fhue...@gmail.com wrote: This might be an issue with the blockSize parameter of the BinaryInputFormat. How large is the file with the single object? 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it

Re: Invalid argument reading a file containing a Kryo object

2015-08-07 Thread Fabian Hueske
12:08 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: Should this be the case just reading recursively an entire directory containing one object per file? On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske fhue...@gmail.com wrote: You could implement your own InputFormat based

Re: Invalid argument reading a file containing a Kryo object

2015-08-07 Thread Fabian Hueske
You could implement your own InputFormat based on FileInputFormat and overwrite the createInputSplits method to just create a single split per file. 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: So what should I do? On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske fhue

Re: Udf Performance and Object Creation

2015-08-14 Thread Fabian Hueske
I think Timo answered both questions (quoting Michael: Hey Timo, yes that is what I needed to know. Thanks). Maybe one more comment. The motivation of the examples is not the best performance but to showcase Flink's APIs and concepts. Best, Fabian 2015-08-14 17:43 GMT+02:00 Flavio Pompermaier

Re: Udf Performance and Object Creation

2015-08-14 Thread Fabian Hueske
O sorry, Flavio! I didn't see Hawins questions :-( Thanks Stephan for picking up! 2015-08-14 17:43 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: Any insight about these 2 questions..? On 12 Aug 2015 17:38, Flavio Pompermaier pomperma...@okkam.it wrote: This is something I've never

Re: Invalid argument reading a file containing a Kryo object

2015-08-10 Thread Fabian Hueske
Thanks a lot! 2015-08-10 12:20 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it: Done through https://issues.apache.org/jira/browse/FLINK-2503 Thanks again, Flavio On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske fhue...@gmail.com wrote: Congrats that you got your InputFormat working

Re: Yarn configuration

2015-07-27 Thread Fabian Hueske
Hi Michele, the 10506 MB refer to the size of Flink's managed memory whereas the 20992 MB refer to the total amount of TM memory. At start-up, the TM allocates a fraction of the JVM memory as byte arrays and manages this portion by itself. The remaining memory is used as regular JVM heap for TM

Re: HDFS directory rename

2015-07-22 Thread Fabian Hueske
listStatus() should return an empty array On Jul 22, 2015 13:11, Flavio Pompermaier pomperma...@okkam.it wrote: I can detect if it's a dir but how can I detect if it's empty? On Wed, Jul 22, 2015 at 12:49 PM, Fabian Hueske fhue...@gmail.com wrote: How about FileStatus[] FileSystem.listStatus

Re: loop break operation

2015-07-20 Thread Fabian Hueske
Use a broadcast set to distribute the old centers to a map which has the new centers as regular input. Put the old centers in a hashmap in open() and check the distance to the new centers in map(). On Jul 20, 2015 12:55 PM, Pa Rö paul.roewer1...@googlemail.com wrote: okay, i have found it. how to
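The convergence test this enables -- comparing the broadcast old centers against the new ones -- reduces to a distance check like the following plain-Java sketch (2-D centers and the threshold `eps` are assumptions for illustration; in Flink this logic would sit inside map()):

```java
public class ConvergenceCheck {
    // With the old centers at hand (in Flink they would arrive via a
    // broadcast set), compute how far each new center moved and stop
    // iterating once the largest movement falls below a threshold.
    static boolean converged(double[][] oldCenters, double[][] newCenters, double eps) {
        double maxMove = 0.0;
        for (int i = 0; i < oldCenters.length; i++) {
            double dx = oldCenters[i][0] - newCenters[i][0];
            double dy = oldCenters[i][1] - newCenters[i][1];
            maxMove = Math.max(maxMove, Math.sqrt(dx * dx + dy * dy));
        }
        return maxMove < eps;
    }

    public static void main(String[] args) {
        double[][] oldC = {{0, 0}, {5, 5}};
        double[][] newC = {{0.001, 0}, {5, 5.001}};
        System.out.println(converged(oldC, newC, 0.01)); // true
    }
}
```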

Re: Order groups by their keys

2015-07-15 Thread Fabian Hueske
Yes, going to parallelism 1 is another option but you don't have to use a fake-reduce to enforce sorting. You can simply do: DataSet<Tuple3<Integer, String, String>> result = ... result .sortPartition(1, Order.ASCENDING).setParallelism(1) // sort on first String field .output(...); Fabian

Re: Reading multiple datasets with one read operation

2015-10-22 Thread Fabian Hueske
It might even be materialized (to disk) if both derived data sets are joined. 2015-10-22 12:01 GMT+02:00 Till Rohrmann : > I fear that the filter operations are not chained because there are at > least two of them which have the same DataSet as input. However, it's true >

Re: Reading multiple datasets with one read operation

2015-10-22 Thread Fabian Hueske
proach would still be faster than > reading the entire input multiple times (we are talking 100GB+ on max 32 > workers) but I would have to run some experiments to confirm that. > > > > 2015-10-22 12:06 GMT+02:00 Fabian Hueske <fhue...@gmail.com>: > >> It might even

Re: How best to deal with wide, structured tuples?

2015-10-29 Thread Fabian Hueske
Hi Johann, I see three options for your use case. 1) Generate Pojo code at planning time, i.e., when the program is composed. This does not work when the program is already running. The benefit is that you can use key expressions, have typed fields, and type specific serializers and comparators.

Re: Flink on EC2

2015-10-29 Thread Fabian Hueske
Hi Thomas, until recently, Flink provided its own implementation of an S3FileSystem which wasn't fully tested and was buggy. We removed that implementation and are now using (in 0.10-SNAPSHOT) Hadoop's S3 implementation by default. If you want to continue using 0.9.1 you can configure Flink to use

Re: Implementing samza table/stream join

2015-11-10 Thread Fabian Hueske
Hi Nick, I think you can do this with Flink quite similar to how it is explained in the Samza documentation by using a stateful CoFlatMapFunction [1], [2]. Please have a look at this snippet [3]. This code implements an updateable stream filter. The first stream is filtered by words from the
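The stateful CoFlatMapFunction pattern referenced here can be sketched without any Flink dependencies: a plain class standing in for flatMap1/flatMap2, with the word set as its state. This is only an illustration of the idea (a block-list reading is assumed; the actual snippet [3] from the mail is not reproduced here):

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class UpdatableStreamFilter {
    // Shared state between the two "streams", as in a CoFlatMapFunction.
    private final Set<String> blockedWords = new HashSet<>();

    // Corresponds to flatMap2: the control stream updating the state.
    void addBlockedWord(String word) {
        blockedWords.add(word);
    }

    // Corresponds to flatMap1: the data stream filtered against the state.
    Optional<String> filter(String line) {
        for (String w : blockedWords) {
            if (line.contains(w)) {
                return Optional.empty(); // drop lines containing a blocked word
            }
        }
        return Optional.of(line);
    }

    public static void main(String[] args) {
        UpdatableStreamFilter f = new UpdatableStreamFilter();
        System.out.println(f.filter("hello world").isPresent()); // true
        f.addBlockedWord("world");
        System.out.println(f.filter("hello world").isPresent()); // false
    }
}
```

In a real Flink job the state would live inside the function instance (or, in later versions, in managed operator state), and both inputs would arrive interleaved on the same connected stream.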

Apache Flink 0.10.0 released

2015-11-16 Thread Fabian Hueske
Hi everybody, The Flink community is excited to announce that Apache Flink 0.10.0 has been released. Please find the release announcement here: --> http://flink.apache.org/news/2015/11/16/release-0.10.0.html Best, Fabian

Re: Mixing POJO and Tuples

2015-11-10 Thread Fabian Hueske
Hi Flavio, this will not work out of the box. If you extend a Flink tuple and add additional fields, the type will be recognized as tuple and the TupleSerializer will be used to serialize and deserialize the record. Since the TupleSerializer is not aware of your additional fields it will not

Re: Rich vs normal functions

2015-11-09 Thread Fabian Hueske
The reason is that the non-rich function interfaces are SAM (single abstract method) interfaces. In Java 8, SAM interfaces can be specified as concise lambda functions. Cheers, Fabian 2015-11-09 10:45 GMT+01:00 Flavio Pompermaier : > Hi flinkers, > I have a simple

Re: Long ids from String

2015-11-03 Thread Fabian Hueske
Converting String ids into Long ids can be quite expensive, so you should make sure it pays off. The safe way to do it is to get all unique String ids (project, distinct), do zipWithUniqueId, and join all DataSets that have the String id with the new long id. So it is a full sort for the unique
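The described pipeline (distinct String ids, then zipWithUniqueId, then join back) boils down to the id-assignment step below, emulated here in plain Java without Flink (names illustrative):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StringToLongIds {
    // Emulates the zipWithUniqueId step on the distinct String ids.
    // In Flink, the resulting (String, long) pairs would then be joined
    // against every DataSet that still carries the String id.
    static Map<String, Long> assignIds(List<String> ids) {
        Map<String, Long> mapping = new LinkedHashMap<>();
        long next = 0;
        for (String id : ids) {
            if (!mapping.containsKey(id)) {
                mapping.put(id, next++); // first occurrence gets the next long id
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        System.out.println(assignIds(Arrays.asList("x", "y", "x", "z"))); // {x=0, y=1, z=2}
    }
}
```

Note that this sequential emulation assigns dense ids; Flink's zipWithUniqueId only guarantees unique (not consecutive) ids, which is all the join-back step needs.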

Re: Create triggers

2015-10-30 Thread Fabian Hueske
You refer to the DataSet (batch) API, right? In that case you can evaluate your condition in the program and fetch a DataSet back to the client using List myData = DataSet.collect(); Based on the result of the collect() call you can define and execute a new program. Note: collect() will

Re: Create triggers

2015-11-02 Thread Fabian Hueske
and execute operations on C dataset. > > Some pseudocode from your solution: > DataSet A = env.readFile(...); > DataSet C = env.readFile(...); > > A.groupBy().reduce().filter(*Check conditions here and in case start > processing C*); > > > Thanks, > Giacomo > > >

Re: Powered by Flink

2015-10-19 Thread Fabian Hueske
Thanks for starting this Kostas. I think the list is quite hidden in the wiki. Should we link from flink.apache.org to that page? Cheers, Fabian 2015-10-19 14:50 GMT+02:00 Kostas Tzoumas : > Hi everyone, > > I started a "Powered by Flink" wiki page, listing some of the >

Re: Powered by Flink

2015-10-19 Thread Fabian Hueske
Sounds good +1 2015-10-19 14:57 GMT+02:00 Márton Balassi <balassi.mar...@gmail.com>: > Thanks for starting and big +1 for making it more prominent. > > On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske <fhue...@gmail.com> wrote: > >> Thanks for starting this Kostas. &

Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

2015-10-19 Thread Fabian Hueske
Hi Philip, here are a few additions to what Max said: - ORDER BY: As Max said, Flink's sortPartition() does only sort within a partition and does not produce a total order. You can either set the parallelism to 1 as Max suggested or use a custom partitioner to range partition the data. - SORT BY: From

Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

2015-10-19 Thread Fabian Hueske
e combination with [Distribute > By] + [Sort By]. Therefore, according to your suggestion, should it be > partitionByHash() + sortGroup() instead of sortPartition() ? > > Or probably I did not still get much difference between Partition and > scope within a reduce. > > Regards, &

Re: Powered by Flink

2015-10-19 Thread Fabian Hueske
s a question difficult to answer to > interested users. > > > On 19.10.2015 15:08, Suneel Marthi wrote: > > +1 to this. > > On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske <fhue...@gmail.com> wrote: > >> Sounds good +1 >> >> 2015-10-19 14:57 GMT+02:0

Re: ExecutionEnvironment setConfiguration API

2015-10-19 Thread Fabian Hueske
I think it's not a nice solution to check for the type of the returned execution environment to determine whether it is a local or a remote execution environment. Wouldn't it be better to add a method isLocal() to ExecutionEnvironment? Cheers, Fabian 2015-10-14 19:14 GMT+02:00 Flavio

Re: Debug OutOfMemory

2015-10-08 Thread Fabian Hueske
Hi Konstantin, Flink uses managed memory only for its internal processing (sorting, hash tables, etc.). If you allocate too much memory in your user code, it can still fail with an OOME. This can also happen for large broadcast sets. Can you check how much memory the JVM allocated and how

Re: Parallel file read in LocalEnvironment

2015-10-07 Thread Fabian Hueske
Hi Flavio, it is not possible to split by line count because that would mean to read and parse the file just for splitting. Parallel processing of data sources depends on the input splits created by the InputFormat. Local files can be split just like files in HDFS. Usually, each file corresponds

Re: TeraSort on Flink and Spark

2015-07-10 Thread Fabian Hueske
Hi Dongwon Kim, this blog post describes Flink's memory management, serialization, and sort algorithm and also includes performance numbers of some microbenchmarks. -- http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html The difference between Text and OptimizedText is

Re: Json filter

2015-07-07 Thread Fabian Hueske
Hi Luca, parsing JSON can be tricky if your schema is nested. In case of a flat schema (as yours), you can read the JSON records like this: ExecutionEnvironment env = ... DataSet<String> jsonRaw = env.readFileOfPrimitives(path, "},", String.class); // "}," is a sequence that uniquely delimits your

Re: Fold vs Reduce in DataStream API

2015-11-18 Thread Fabian Hueske
Hi Ron, Have you checked: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#transformations ? Fold is like reduce, except that you define a start element (of a different type than the input type) and the result type is the type of the initial value. In

Re: How to force the parallelism on small streams?

2015-09-03 Thread Fabian Hueske
In case of rebalance(), all sources start the round-robin partitioning at index 0. Since each source emits only very few elements, only the first 15 mappers receive any input. It would be better to let each source start the round-robin partitioning at a different index, something like startIdx =
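The proposed fix is pure index arithmetic, sketched below (names hypothetical; in Flink the subtask index would come from getRuntimeContext().getIndexOfThisSubtask()):

```java
public class OffsetRoundRobin {
    // Each source task starts its round-robin partitioning at its own
    // subtask index instead of 0, so sources that emit only a few
    // elements still spread them across all downstream mappers.
    static int targetChannel(int subtaskIndex, long elementsSent, int numChannels) {
        int startIdx = subtaskIndex % numChannels;
        return (int) ((startIdx + elementsSent) % numChannels);
    }

    public static void main(String[] args) {
        // Two sources, each emitting one element, 4 downstream mappers:
        System.out.println(targetChannel(0, 0, 4)); // 0
        System.out.println(targetChannel(1, 0, 4)); // 1 -- not both hitting mapper 0
    }
}
```

With all sources starting at index 0 (plain rebalance()), every source sends its first element to mapper 0, its second to mapper 1, and so on, which leaves the later mappers idle when each source emits only a handful of elements.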

Re: How to force the parallelism on small streams?

2015-09-03 Thread Fabian Hueske
wever for shuffle() I would expect that each source task uses a > different shuffle pattern... > > -Matthias > > On 09/03/2015 03:28 PM, Fabian Hueske wrote: > > In case of rebalance(), all sources start the round-robin partitioning at > > index 0. Since each source e

Re: what different between join and coGroup in flink

2015-09-03 Thread Fabian Hueske
CoGroup is more generic than Join. You can perform a Join with CoGroup but not do a CoGroup with a Join. However, Join can be executed more efficiently than CoGroup. 2015-09-03 22:28 GMT+02:00 hagersaleh : > what different between join and coGroup in flink > > > > > -- >

Re: Union/append performance question

2015-09-07 Thread Fabian Hueske
.it>: > ok thanks. are there any alternatives to that?may I use accumulators for > that? > On 7 Sep 2015 17:47, "Fabian Hueske" <fhue...@gmail.com> wrote: > >> If the loop count of 3 is fixed (or not significantly larger), union >> should be fine. >> >&g

Re: Union/append performance question

2015-09-07 Thread Fabian Hueske
Hi Flavio, your example does not contain a union. Union itself basically comes for free. However, if you have a lot of small DataSets that you want to union, the plan can become very complex and might cause overhead due to scheduling many small tasks. For example, it is usually better to have one

Re: Union/append performance question

2015-09-07 Thread Fabian Hueske
In that case you should go with union. 2015-09-07 19:06 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: > 3 or 4 usually.. > On 7 Sep 2015 18:39, "Fabian Hueske" <fhue...@gmail.com> wrote: > >> And how many unions would your program use if you would

Re: Union/append performance question

2015-09-07 Thread Fabian Hueske
And how many unions would your program use if you would follow the union-in-loop approach? 2015-09-07 18:31 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: > In the order of 10 GB.. > > On Mon, Sep 7, 2015 at 6:14 PM, Fabian Hueske <fhue...@gmail.com> wrote: > >

Re: output writer

2015-09-08 Thread Fabian Hueske
; the LRU in each node) > > and if I create the LRU in the fileoutputstream, how many of them will be > created? one for each ‘degree of parallelism’ right? > > > thanks > michele > > > Il giorno 08/set/2015, alle ore 16:49, Fabian Hueske <fhue...@gmail.com> > h

Re: How to force the parallelism on small streams?

2015-09-03 Thread Fabian Hueske
Btw, it is working with a parallelism 1 source, because only a single source partitions (round-robin or random) the data. Several sources do not assign work to the same few mappers. 2015-09-03 15:22 GMT+02:00 Matthias J. Sax : > If it would be only 14 elements, you are

Re: output writer

2015-09-08 Thread Fabian Hueske
d in this cases? > > > > > Il giorno 08/set/2015, alle ore 17:55, Fabian Hueske <fhue...@gmail.com> > ha scritto: > > I think you should not extend the FileOutputFormat but implement a > completely new OutputFormat. You can of course copy some of the > FileOutput

Re: output writer

2015-09-08 Thread Fabian Hueske
> Telephone: +49 891588344173 > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, > Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN > Sitz der Gesel

Re: output writer

2015-09-09 Thread Fabian Hueske
they are grouped, only one stream per slot will be open (I always > use degree of parallelism at the highest in this step) and it will be > opened only once (no append) > > > is it right? > > > > thanks a lot > michele > > > > Il giorno 09/set/2015, alle ore 1

Re: output writer

2015-09-09 Thread Fabian Hueske
t; (the output key) and save the order of key3? if it is possible > > > Il giorno 08/set/2015, alle ore 18:39, Fabian Hueske <fhue...@gmail.com> > ha scritto: > > I did not fully understand you last question, but I'll try to answer. > > If you do a > myD

Re: intermediate result reuse

2015-09-14 Thread Fabian Hueske
ing something thus stopping execution or > is it something different? > > > > > > Il giorno 14/set/2015, alle ore 22:18, Fabian Hueske <fhue...@gmail.com> > ha scritto: > > Hi Michele, > > collect on DataSet and collect on a Collector within a Function are t

Re: Issue with parallelism when CoGrouping custom nested data tpye

2015-09-16 Thread Fabian Hueske
This sounds like a problem with your custom type and its (presumably) custom serializers and comparators. I assume it is not an issue of partitioning or sorting because Reduce is working fine, as you reported. CoGroup does also partition and sort data, but compares the elements of two sorted

Re: Issue with parallelism when CoGrouping custom nested data tpye

2015-09-16 Thread Fabian Hueske
ice day! > > - Pieter > > > > 2015-09-16 14:27 GMT+02:00 Fabian Hueske <fhue...@gmail.com>: > >> Sorry, I was thinking too complicated. Forget about the methods I >> mentioned. >> >> If you are implementing WritableComparable types, you need to ove

Re: intermediate result reuse

2015-09-12 Thread Fabian Hueske
Hi Michele, Flink programs can have multiple sinks. In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time. So in this case, there is no need to materialize the intermediate result a. If you call

Re: Distribute DataSet to subset of nodes

2015-09-15 Thread Fabian Hueske
t; > Can you answer my question from above? If the setParallelism-method works > and selects five nodes for the first flatMap and five _other_ nodes for the > second flatMap, then that would be fine for me if there is no other easy > solution. > > Thanks for your help! > Best > Stefan > >

Re: Distribute DataSet to subset of nodes

2015-09-17 Thread Fabian Hueske
, but I guess there is > some heuristic inside which decides how to distribute.In the normal setup > that all 10 nodes are up, connection is good, all nodes have the same > resources available, input data is evenly distributed in HDFS, then the > default case should be to distribute to all

RE: Flink HA mode

2015-09-09 Thread Fabian Hueske
Hi Emmanuel, yes Master HA is currently under development and only available in 0.10 snapshot. AFAIK, it is almost but not completely done yet. Best, Fabian On Sep 10, 2015 01:29, "Emmanuel" wrote: > is this a 0.10 snapshot feature only? I'm using 0.9.1 right now > > >

Re: Distribute DataSet to subset of nodes

2015-09-14 Thread Fabian Hueske
Hi Stefan, I agree with Sachin's approach. That should be the easiest solution and would look like: env.setParallelism(10); // default is 10 DataSet data = env.read(...) // large data set DataSet smallData1 = env.read(...) // read first part of small data DataSet smallData2 = env.read(...) //

Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Fabian Hueske
Hi Pieter, cross is indeed too expensive for this task. If dataset A fits into memory, you can do the following: Use a RichMapPartitionFunction to process dataset B and add dataset A as a broadcastSet. In the open method of mapPartition, you can load the broadcasted set and sort it by

Re: Apache Flink streaming features summary

2015-09-28 Thread Fabian Hueske
Can you share a link to the comparison? Thanks, Fabian 2015-09-28 9:32 GMT+02:00 Flavio Pompermaier : > Hi to all, > > I saw this comparison vs Spark, Storm and Apache Apex. > Since I didn't have time to look at the streaming part of Apache Flink, it > could be interesting

Re: Distribute DataSet to subset of nodes

2015-09-21 Thread Fabian Hueske
ady available on the node and just needs to be exposed > somehow, right? > > Cheers > Stefan > > > > On 17 September 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote: > >> Hi Stefan, >> >> I think I have a solution for your problem :-) &g
