Hi Yiannis,
if you scale Flink to larger setups, you need to adapt the number of network
buffers.
The background section of the configuration reference explains the details
on that [1].
Let us know if that helped to solve the problem.
Best, Fabian
[1]
Hi,
you are doing everything correctly.
This is a bug in the Flink runtime.
I created a JIRA (https://issues.apache.org/jira/browse/FLINK-1574) and
will push a fix later this evening once all tests have passed.
Thanks for reporting the issue!
Cheers, Fabian
2015-02-17 18:00 GMT+01:00 Yiannis
Have you tried to increase the heap size by shrinking the TM-managed memory?
Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM
memory (taskmanager.memory.size) in the flink-config.yaml [1].
Cheers, Fabian
[1] http://flink.apache.org/docs/0.8/config.html
On 20 Feb
Hi,
Flink uses only the memory that is configured for the JobManager and
TaskManager JVMs.
By default this is 256MB for the JM and 512MB for the TM (see [1] for
details).
The TM memory is split into equally large chunks for each configured slot
of the TM.
You should definitely configure the TM
In case you want to contribute or follow the discussion, here's the JIRA:
https://issues.apache.org/jira/browse/FLINK-1511
Again, thanks for reporting!
2015-02-11 9:59 GMT+01:00 Fabian Hueske fhue...@gmail.com:
Hi,
you are right, there is a problem. I reproduced the problem and it seems
Hi,
non-equi joins are only supported by building the cross product.
This is essentially the nested-loop join strategy that a conventional
database system would choose. However, such joins are prohibitively
expensive when applied to large data sets.
If you have one small and another large data
- Henry
On Wed, Mar 25, 2015 at 3:12 AM, Fabian Hueske fhue...@gmail.com
wrote:
Thanks Henry for sharing!
I will be in Austin and give a talk on Flink [1].
Just ping me if you'd like to meet and chat :-)
Cheers, Fabian
[1] http://sched.co/2P9s
2015-03-25 1:11 GMT+01:00
Creating a JIRA issue never hurts.
Have you tried to add your code snippet to the
HadoopOutputFormatBase.configure() method? Seems to me the right place for
it.
Do you want to open a PR for that?
2015-03-23 16:01 GMT+01:00 Flavio Pompermaier pomperma...@okkam.it:
Any news about this? Could
Congrats Marton and everybody who is working on Flink Streaming!
Looking forward to the blog post :-)
2015-01-30 1:06 GMT+01:00 Márton Balassi mbala...@apache.org:
Hi everyone,
Thanks for your support for the Flink talks at the community choice for
the next Hadoop Summit Europe. The results
Have you tried the TypeSerializerOutputFormat?
This will serialize data using Flink's own serializers and write it to
binary files.
The data can be read back using the TypeSerializerInputFormat.
Cheers, Fabian
2015-04-23 11:14 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
Hi to all,
in
Hi Flavio,
I agree, distinct() is a bit limited right now and, in fact, there is no
good reason for that except that nobody found time to improve it.
You can use distinct(KeySelector k) to work directly on DataSet<String> but
that's not very convenient either:
DataSet<String> strings =
You should also be aware that the value of a static variable is only
accessible within the same JVM.
Flink is a distributed system and runs in multiple JVMs. So if you set a
value in one JVM it is not visible in another JVM (on a different node).
In general, I would avoid using static variables
to know that the number of output
tuples is <= the number of input tuples?
Is there any optimization that Flink can apply to the pipeline?
On Mon, May 4, 2015 at 2:49 PM, Fabian Hueske fhue...@gmail.com wrote:
It should not make a difference. I think it's just personal taste.
If your filter condition
:02 PM, Fabian Hueske fhue...@gmail.com wrote:
The value of the parameter is not important for correctness but it must
be the same when writing and reading.
Try setting it to 64 MB.
2015-05-08 17:52 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
How can I retrieve the right one..? I
Is there a chance that the version of JodaTime changed?
2015-05-15 10:22 GMT+02:00 Robert Metzger rmetz...@apache.org:
Can you share the Flink program?
Or at least the definition of the Tuple?
I'll look into this issue in a few minutes.
On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier
Hi Giacomo,
as Max said, you can sort the data within a partition.
However, data across partitions is not sorted. It is either random
partitioned or hash-partitioned (all records that share some property are
in the same partition). Producing fully ordered output, where the first
partition has
If you know that the group cardinality of one input is always 1 (or 0) you
can make that input the one to cache in memory and stream the other input
with potentially more group elements.
2015-04-17 4:09 GMT-05:00 Flavio Pompermaier pomperma...@okkam.it:
That would be very helpful...
Thanks
to either hack the code or add the missing
functionality in order to realize the above described goal.
Suggestions are welcome!
Regards,
Alex
2015-05-18 17:42 GMT+02:00 Fabian Hueske fhue...@gmail.com:
Hi Mustafa,
I'm afraid this is not possible.
Although you can annotate DataSources
million rows.
Best Regards,
Hilmi
On 09.06.2015 at 10:47, Fabian Hueske wrote:
OK, so the problem seems to be with the HBase InputFormat.
I guess this issue needs a bit of debugging.
We need to check if records are emitted twice (or more often) and if that
is the case which records
Furthermore, the HBase table contains nearly 100 million rows but the result
is 102 million. This means that the HBase input reads more rows than the
table contains.
Best Regards,
Hilmi
On 08.06.2015 at 23:29, Fabian Hueske wrote:
Hi Hilmi,
I see two possible reasons:
1) The data source
Hi Sebastian,
I agree, shuffling only specific elements would be a very useful feature,
but unfortunately it's not supported (yet).
Would you like to open a JIRA for that?
Cheers, Fabian
2015-06-09 17:22 GMT+02:00 Kruse, Sebastian sebastian.kr...@hpi.de:
Hi folks,
I would like to do some
Actually, I want to know more info about Flink SQL and Flink
performance
Here is the Spark benchmark. Maybe you already saw it before.
https://amplab.cs.berkeley.edu/benchmark/
Thanks.
Best regards
Hawin
On Fri, Jun 5, 2015 at 1:35 AM, Fabian Hueske fhue...@gmail.com
datasets on the common key, so it will be normal to have many tuples on one
side which do not exist on the other side.
How to fix that?!
On Thu, Jun 4, 2015 at 11:00 AM, Fabian Hueske fhue...@gmail.com wrote:
Hi,
one of the iterables of a CoGroup function can be empty. Calling
Hi Tamara,
what kind of information do you need? Something like, size and usage of
in-memory sort buffers or hash tables?
Some information might be written in DEBUG logs, but I'm not sure about that.
Besides logs, I doubt that Flink monitors memory usage.
Cheers, Fabian
2015-06-15 14:34 GMT+02:00
Hi,
the error is related to the way you specify the grouping and the sorting
key.
The API is currently restricted in that you can only use a key
selector function for the sorting key if you also used a selector function
for the grouping key.
In Scala the use of key selector functions is
Hi,
which version of Flink are you working with?
The master (0.9-SNAPSHOT) has a RichFilterFunction [1].
Best, Fabian
[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
2015-06-16 23:52 GMT+02:00 Vinh June
Hi,
reading local files in a distributed setting is a tricky thing because
Flink assumes that all InputSplits can be read from all TaskManagers. This
is obviously not possible if files are located on the local file systems of
different physical machines. Hence, you cannot use one of the provided
sortGroup operator to the grouped dataset.
Is there a way to solve this?
I think I don't understand what a keySelector is
Thanks!
Michele
--
*From:* Fabian Hueske fhue...@gmail.com
*Sent:* Tuesday, June 16, 2015, 23:43:03
*To:* user@flink.apache.org
*Subject:* Re
Hi Jean,
what kind of job execution stats are you interested in?
Cheers, Fabian
2015-06-18 9:01 GMT+02:00 Matthias J. Sax mj...@informatik.hu-berlin.de:
Hi,
the CLI cannot show any job statistics. However, you can use the
JobManager web interface that is accessible at port 8081 from a
As an addition, some operators can only be run with a parallelism of 1. For
example data sources based on collections and (un-grouped) all reduces. In
some cases, the parallelism of the following operators will be set to 1 as
well to avoid a network shuffle.
If you do:
Also, can you open a JIRA for the issue? Otherwise it might get lost on the
mailing list.
Thank you!
2015-06-30 10:56 GMT+02:00 Fabian Hueske fhue...@gmail.com:
Hi, is it possible to get a smaller version of that program that
reproduces the bug or give a few more details about the structure
Hi, is it possible to get a smaller version of that program that reproduces
the bug or give a few more details about the structure of the job?
Without any hints, it is very hard to reproduce and fix the bug.
2015-06-24 18:23 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
Unfortunately not
In fact you can implement your own composite data types (like Tuple, Pojo) that
can deal with nullable fields as keys but you need custom serializers and
comparators for that. These types won't be as efficient as types that
cannot handle null fields.
Cheers, Fabian
2015-07-02 20:17 GMT+02:00 Flavio
I'm quoting Chiwan Park's answer to this question:
Hadoop is a framework for reliable, scalable, distributed computing. So,
there are many components for this purpose such as HDFS, YARN and Hadoop
MapReduce. Flink is an alternative to the Hadoop MapReduce component. It has
also some tools to make
There are two ways to do that:
1) You use a GroupReduceFunction, which gives you an iterator over all
points similar to Hadoop's ReduceFunction.
2) You use the ReduceFunction to compute the sum and the count at the same
time (e.g., in two fields of a Tuple2) and use a MapFunction to do the
final
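The logic of option 2 can be sketched in plain Java outside of Flink (all names here are hypothetical, not the Flink API): one function plays the role of the ReduceFunction combining (sum, count) pairs, another the final MapFunction computing the average.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of option 2 (hypothetical names, not the Flink API):
// reduce (sum, count) pairs, then a final "map" divides sum by count.
public class AverageSketch {

    // What the ReduceFunction would do: combine two (sum, count) pairs.
    static long[] combine(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // What the final MapFunction would do: sum / count.
    static double toAverage(long[] acc) {
        return (double) acc[0] / acc[1];
    }

    public static double average(List<Long> values) {
        long[] acc = {0L, 0L};
        for (long v : values) {
            acc = combine(acc, new long[] {v, 1L});
        }
        return toAverage(acc);
    }
}
```

In Flink, the (sum, count) pair would typically be the two fields of a Tuple2, which keeps the reduce combinable and avoids materializing the whole group.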
A workaround might be to set the parallelism to 1, as Till suggested, and
write into a network-shared directory.
Nonetheless, it sounds like a good feature to support strict assignments of
data sink tasks to machines.
Fabian
2015-05-22 16:10 GMT+02:00 Hilmi Yildirim hilmi.yildi...@neofonie.de:
Definitely! Much better than using the String value.
2015-05-26 16:55 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
Hi to all,
in my program I need to set recursive.file.enumeration to true and I
discovered that there's a constant for that variable in FileInputFormat
Done
2015-05-26 16:59 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
Could you do that so I can avoid making a PR just for that, please?
On Tue, May 26, 2015 at 4:58 PM, Fabian Hueske fhue...@gmail.com wrote:
Definitely! Much better than using the String value.
2015-05-26 16:55 GMT
A TM reserves a certain amount of memory for each slot, but CPU and IO can
be shared across slots. Hence, there might be some imbalance among TMs, but
this imbalance is limited by the concept of slots which gives an upper
bound of the number of tasks that a TM can process.
Also random assignment
Thanks for reporting this issue.
The Wrong field type error looks like a bug to me.
This happens because PojoType is neither a TupleType nor an AtomicType. To
me it looks like the TupleTypeInfoBase condition should be generalized to
CompositeType.
I will look into this.
Cheers, Fabian
Hi,
it is currently not possible to isolate tasks that consume a lot of JVM
heap memory and schedule them to a specific slot (or TaskManager).
If you operate in a YARN setup, you can isolate different jobs from each
other by starting a new YARN session for each job, but tasks within the
same job
; // read only one object
return yourObject;
}
}
2015-08-07 14:40 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
Sorry Fabian but I don't understand what I should do :(
Could you provide me a simple snippet of code to achieve this?
On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske fhue
If you create your file by just sequentially writing all objects to the
file using Kryo, you can only read it with a parallelism of 1.
Writing binary files in a way that they can be read in parallel is a bit
tricky (and not specific to Flink).
2015-08-07 11:28 GMT+02:00 Flavio Pompermaier
the same error :(
Moreover, in this example I put exactly one object per file so it should
be able to deserialize it, right?
On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske fhue...@gmail.com wrote:
If you create your file by just sequentially writing all objects to the
file using Kryo, you can
the serialized object is 7 bytes
On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske fhue...@gmail.com wrote:
This might be an issue with the blockSize parameter of the
BinaryInputFormat.
How large is the file with the single object?
2015-08-07 11:37 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it
12:08 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
Should this be the case just reading recursively an entire directory
containing one object per file?
On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske fhue...@gmail.com wrote:
You could implement your own InputFormat based
You could implement your own InputFormat based on FileInputFormat and
overwrite the createInputSplits method to just create a single split per
file.
2015-08-07 12:02 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
So what should I do?
On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske fhue
I think Timo answered both questions (quoting Michael: "Hey Timo, yes that
is what I needed to know. Thanks.").
Maybe one more comment. The motivation of the examples is not the best
performance but to showcase Flink's APIs and concepts.
Best, Fabian
2015-08-14 17:43 GMT+02:00 Flavio Pompermaier
Oh sorry, Flavio!
I didn't see Hawin's questions :-(
Thanks Stephan for picking up!
2015-08-14 17:43 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
Any insight about these 2 questions..?
On 12 Aug 2015 17:38, Flavio Pompermaier pomperma...@okkam.it wrote:
This is something I've never
Thanks a lot!
2015-08-10 12:20 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:
Done through https://issues.apache.org/jira/browse/FLINK-2503
Thanks again,
Flavio
On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske fhue...@gmail.com wrote:
Congrats that you got your InputFormat working
Hi Michele,
the 10506 MB refer to the size of Flink's managed memory whereas the 20992
MB refer to the total amount of TM memory. At start-up, the TM allocates a
fraction of the JVM memory as byte arrays and manages this portion by
itself. The remaining memory is used as regular JVM heap for TM
listStatus() should return an empty array
On Jul 22, 2015 13:11, Flavio Pompermaier pomperma...@okkam.it wrote:
I can detect if it's a dir but how can I detect if it's empty?
On Wed, Jul 22, 2015 at 12:49 PM, Fabian Hueske fhue...@gmail.com wrote:
How about FileStatus[] FileSystem.listStatus
Use a broadcast set to distribute the old centers to a map which has the new
centers as regular input. Put the old centers in a hashmap in open() and
check the distance to the new centers in map().
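The map-side logic can be sketched in plain Java (hypothetical names; in a real Flink RichMapFunction, the centers map would be filled from the broadcast set in open()):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (hypothetical names): find the nearest old center for a
// point. In a Flink RichMapFunction, 'centers' would be built in open()
// from the broadcast set, and this logic would live in map().
public class NearestCenterSketch {

    static int nearestCenter(double[] point, Map<Integer, double[]> centers) {
        int bestId = -1;
        double bestDist = Double.POSITIVE_INFINITY;
        for (Map.Entry<Integer, double[]> e : centers.entrySet()) {
            double dx = point[0] - e.getValue()[0];
            double dy = point[1] - e.getValue()[1];
            double dist = dx * dx + dy * dy; // squared Euclidean distance
            if (dist < bestDist) {
                bestDist = dist;
                bestId = e.getKey();
            }
        }
        return bestId;
    }
}
```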
On Jul 20, 2015 12:55 PM, Pa Rö paul.roewer1...@googlemail.com wrote:
Okay, I have found it. How to
Yes, going to parallelism 1 is another option but you don't have to use a
fake-reduce to enforce sorting.
You can simply do:
DataSet<Tuple3<Integer, String, String>> result = ...
result
.sortPartition(1, Order.ASCENDING).setParallelism(1) // sort on first String field
.output(...);
Fabian
It might even be materialized (to disk) if both derived data sets are
joined.
2015-10-22 12:01 GMT+02:00 Till Rohrmann :
> I fear that the filter operations are not chained because there are at
> least two of them which have the same DataSet as input. However, it's true
>
proach would still be faster than
> reading the entire input multiple times (we are talking 100GB+ on max 32
> workers) but I would have to run some experiments to confirm that.
>
>
>
> 2015-10-22 12:06 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> It might even
Hi Johann,
I see three options for your use case.
1) Generate Pojo code at planning time, i.e., when the program is composed.
This does not work when the program is already running. The benefit is that
you can use key expressions, have typed fields, and type specific
serializers and comparators.
Hi Thomas,
until recently, Flink provided its own implementation of an S3FileSystem,
which wasn't fully tested and was buggy.
We removed that implementation and now use (in 0.10-SNAPSHOT)
Hadoop's S3 implementation by default.
If you want to continue using 0.9.1 you can configure Flink to use
Hi Nick,
I think you can do this with Flink quite similar to how it is explained in
the Samza documentation by using a stateful CoFlatMapFunction [1], [2].
Please have a look at this snippet [3].
This code implements an updateable stream filter. The first stream is
filtered by words from the
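The stateful CoFlatMap idea can be sketched in plain Java (a hypothetical class, not the actual snippet referenced above): one input carries data records, the other carries updates to the filter state shared between the two methods.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch of an updateable stream filter (hypothetical names).
// In Flink, onRecord/onFilterUpdate would be flatMap1/flatMap2 of a
// stateful CoFlatMapFunction: one stream carries data, the other updates.
public class UpdateableFilterSketch {
    private final Set<String> blockedWords = new HashSet<>();
    private final List<String> emitted = new ArrayList<>();

    // flatMap1: the data stream, filtered against the current word set.
    void onRecord(String value) {
        if (!blockedWords.contains(value)) {
            emitted.add(value);
        }
    }

    // flatMap2: the control stream that updates the filter state.
    void onFilterUpdate(String word) {
        blockedWords.add(word);
    }

    List<String> output() {
        return emitted;
    }
}
```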
Hi everybody,
The Flink community is excited to announce that Apache Flink 0.10.0 has
been released.
Please find the release announcement here:
--> http://flink.apache.org/news/2015/11/16/release-0.10.0.html
Best,
Fabian
Hi Flavio,
this will not work out of the box. If you extend a Flink tuple and add
additional fields, the type will be recognized as tuple and the
TupleSerializer will be used to serialize and deserialize the record. Since
the TupleSerializer is not aware of your additional fields it will not
The reason is that the non-rich function interfaces are SAM (single
abstract method) interfaces.
In Java 8, SAM interfaces can be specified as concise lambda functions.
Cheers, Fabian
2015-11-09 10:45 GMT+01:00 Flavio Pompermaier :
> Hi flinkers,
> I have a simple
Converting String ids into Long ids can be quite expensive, so you should
make sure it pays off.
The safe way to do it is to get all unique String ids (project, distinct),
do zipWithUniqueId, and join all DataSets that have the String id with the
new long id. So it is a full sort for the unique
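The id-assignment step can be sketched sequentially in plain Java (hypothetical names; in Flink, project + distinct + zipWithUniqueId would produce this mapping in parallel, followed by joins against the original DataSets):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (hypothetical names): assign a unique long id to each
// distinct String id. In Flink this would be project + distinct +
// zipWithUniqueId, then a join to replace String ids with long ids.
public class IdMappingSketch {

    static Map<String, Long> assignIds(List<String> stringIds) {
        Map<String, Long> mapping = new LinkedHashMap<>();
        long nextId = 0L;
        for (String id : stringIds) {
            if (!mapping.containsKey(id)) {
                mapping.put(id, nextId++);
            }
        }
        return mapping;
    }
}
```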
You refer to the DataSet (batch) API, right?
In that case you can evaluate your condition in the program and fetch a
DataSet back to the client using List myData = dataSet.collect();
Based on the result of the collect() call you can define and execute a new
program.
Note: collect() will
and execute operations on C dataset.
>
> Some pseudocode from your solution:
> DataSet A = env.readFile(...);
> DataSet C = env.readFile(...);
>
> A.groupBy().reduce().filter(*Check conditions here and in case start
> processing C*);
>
>
> Thanks,
> Giacomo
>
>
>
Thanks for starting this Kostas.
I think the list is quite hidden in the wiki. Should we link from
flink.apache.org to that page?
Cheers, Fabian
2015-10-19 14:50 GMT+02:00 Kostas Tzoumas :
> Hi everyone,
>
> I started a "Powered by Flink" wiki page, listing some of the
>
Sounds good +1
2015-10-19 14:57 GMT+02:00 Márton Balassi <balassi.mar...@gmail.com>:
> Thanks for starting and big +1 for making it more prominent.
>
> On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Thanks for starting this Kostas.
Hi Philip,
here a few additions to what Max said:
- ORDER BY: As Max said, Flink's sortPartition() only sorts within a
partition and does not produce a total order. You can either set the
parallelism to 1 as Max suggested or use a custom partitioner to range
partition the data.
- SORT BY: From
e combination with [Distribute
> By] + [Sort By]. Therefore, according to your suggestion, should it be
> partitionByHash() + sortGroup() instead of sortPartition() ?
>
> Or probably I did not still get much difference between Partition and
> scope within a reduce.
>
> Regards,
s a question difficult to answer to
> interested users.
>
>
> On 19.10.2015 15:08, Suneel Marthi wrote:
>
> +1 to this.
>
> On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Sounds good +1
>>
>> 2015-10-19 14:57 GMT+02:0
I think it's not a nice solution to check for the type of the returned
execution environment to determine whether it is a local or a remote
execution environment.
Wouldn't it be better to add a method isLocal() to ExecutionEnvironment?
Cheers, Fabian
2015-10-14 19:14 GMT+02:00 Flavio
Hi Konstantin,
Flink uses managed memory only for its internal processing (sorting, hash
tables, etc.).
If you allocate too much memory in your user code, it can still fail with
an OOME. This can also happen for large broadcast sets.
Can you check how much memory the JVM allocated and how
Hi Flavio,
it is not possible to split by line count because that would mean reading
and parsing the file just for splitting.
Parallel processing of data sources depends on the input splits created by
the InputFormat. Local files can be split just like files in HDFS. Usually,
each file corresponds
Hi Dongwon Kim,
this blog post describes Flink's memory management, serialization, and sort
algorithm and also includes performance numbers of some microbenchmarks.
--
http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
The difference between Text and OptimizedText is
Hi Luca,
parsing JSON can be tricky if your schema is nested.
In case of a flat schema (as yours), you can read the JSON records like
this:
ExecutionEnvironment env = ...
DataSet<String> jsonRaw = env.readFileOfPrimitives(path, "},",
String.class); // "}," is a sequence that uniquely delimits your
Hi Ron,
Have you checked:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#transformations
?
Fold is like reduce, except that you define a start element (of a different
type than the input type) and the result type is the type of the initial
value. In
In case of rebalance(), all sources start the round-robin partitioning at
index 0. Since each source emits only very few elements, only the first 15
mappers receive any input.
It would be better to let each source start the round-robin partitioning at
a different index, something like startIdx =
wever for shuffle() I would expect that each source task uses a
> different shuffle pattern...
>
> -Matthias
>
> On 09/03/2015 03:28 PM, Fabian Hueske wrote:
> > In case of rebalance(), all sources start the round-robin partitioning at
> > index 0. Since each source e
CoGroup is more generic than Join. You can perform a Join with CoGroup but
not do a CoGroup with a Join.
However, Join can be executed more efficiently than CoGroup.
2015-09-03 22:28 GMT+02:00 hagersaleh :
> what is the difference between join and coGroup in Flink
>
>
>
>
> --
>
.it>:
> ok thanks. are there any alternatives to that?may I use accumulators for
> that?
> On 7 Sep 2015 17:47, "Fabian Hueske" <fhue...@gmail.com> wrote:
>
>> If the loop count of 3 is fixed (or not significantly larger), union
>> should be fine.
>>
Hi Flavio,
your example does not contain a union.
Union itself basically comes for free. However, if you have a lot of small
DataSets that you want to union, the plan can become very complex and might
cause overhead due to scheduling many small tasks. For example, it is
usually better to have one
In that case you should go with union.
2015-09-07 19:06 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> 3 or 4 usually..
> On 7 Sep 2015 18:39, "Fabian Hueske" <fhue...@gmail.com> wrote:
>
>> And how many unions would your program use if you would
And how many unions would your program use if you would follow the
union-in-loop approach?
2015-09-07 18:31 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> In the order of 10 GB..
>
> On Mon, Sep 7, 2015 at 6:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>
; the LRU in each node)
>
> and if I create the LRU in the fileoutputstream, how many of them will be
> created? one for each ‘degree of parallelism’ right?
>
>
> thanks
> michele
>
>
> On 08 Sep 2015, at 16:49, Fabian Hueske <fhue...@gmail.com>
> wrote:
Btw, it is working with a parallelism 1 source, because only a single
source partitions (round-robin or random) the data.
Several sources do not assign work to the same few mappers.
2015-09-03 15:22 GMT+02:00 Matthias J. Sax :
> If it would be only 14 elements, you are
d in this cases?
>
>
>
>
> On 08 Sep 2015, at 17:55, Fabian Hueske <fhue...@gmail.com>
> wrote:
>
> I think you should not extend the FileOutputFormat but implement a
> completely new OutputFormat. You can of course copy some of the
> FileOutput
they are grouped, only one stream per slot will be open (I always
> use degree of parallelism at the highest in this step) and it will be
> opened only once (no append)
>
>
> is it right?
>
>
>
> thanks a lot
> michele
>
>
>
> On 09 Sep 2015, at 1
t; (the output key) and save the order of key3? if it is possible
>
>
> On 08 Sep 2015, at 18:39, Fabian Hueske <fhue...@gmail.com>
> wrote:
>
> I did not fully understand your last question, but I'll try to answer.
>
> If you do a
> myD
ing something thus stopping execution or
> is it something different?
>
>
>
>
>
> On 14 Sep 2015, at 22:18, Fabian Hueske <fhue...@gmail.com>
> wrote:
>
> Hi Michele,
>
> collect on DataSet and collect on a Collector within a Function are t
This sounds like a problem with your custom type and its (presumably) custom
serializers and comparators.
I assume it is not an issue of partitioning or sorting because Reduce is
working fine, as you reported.
CoGroup does also partition and sort data, but compares the elements of two
sorted
ice day!
>
> - Pieter
>
>
>
> 2015-09-16 14:27 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Sorry, I was thinking too complicated. Forget about the methods I
>> mentioned.
>>
>> If you are implementing WritableComparable types, you need to ove
Hi Michele,
Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters
(b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.
If you call
> Can you answer my question from above? If the setParallelism-method works
> and selects five nodes for the first flatMap and five _other_ nodes for the
> second flatMap, then that would be fine for me if there is no other easy
> solution.
>
> Thanks for your help!
> Best
> Stefan
>
>
, but I guess there is
> some heuristic inside which decides how to distribute.In the normal setup
> that all 10 nodes are up, connection is good, all nodes have the same
> resources available, input data is evenly distributed in HDFS, then the
> default case should be to distribute to all
Hi Emmanuel,
yes, Master HA is currently under development and only available in the 0.10
snapshot. AFAIK, it is almost but not completely done yet.
Best, Fabian
On Sep 10, 2015 01:29, "Emmanuel" wrote:
> is this a 0.10 snapshot feature only? I'm using 0.9.1 right now
>
>
>
Hi Stefan,
I agree with Sachin's approach. That should be the easiest solution and
would look like:
env.setParallelism(10); // default is 10
DataSet data = env.read(...) // large data set
DataSet smallData1 = env.read(...) // read first part of small data
DataSet smallData2 = env.read(...) //
Hi Pieter,
cross is indeed too expensive for this task.
If dataset A fits into memory, you can do the following: Use a
RichMapPartitionFunction to process dataset B and add dataset A as a
broadcastSet. In the open method of mapPartition, you can load the
broadcasted set and sort it by
Can you share a link to the comparison?
Thanks, Fabian
2015-09-28 9:32 GMT+02:00 Flavio Pompermaier :
> Hi to all,
>
> I saw this comparison vs Spark, Storm and Apache Apex.
> Since I didn't have time to look at the streaming part of Apache Flink, it
> could be interesting
ady available on the node and just needs to be exposed
> somehow, right?
>
> Cheers
> Stefan
>
>
>
> On 17 September 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Stefan,
>>
>> I think I have a solution for your problem :-)
&g