Re: Deep Copy in FLINK, Kryo Copy is used in the different operator

2018-02-14 Thread Gábor Gévay
Hello, You might also be able to make Flink use a better serializer than Kryo. Flink falls back to Kryo when it can't use its own serializers, see here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/types_serialization.html For example, it might help to make your type a POJO.
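
For reference, a minimal sketch of a type that satisfies Flink's POJO rules (public class, public no-argument constructor, fields either public or accessible via getters/setters); the class and field names here are made up:

// Hypothetical event type. Because it follows the POJO rules, Flink can
// use its own POJO serializer instead of falling back to Kryo.
public class SensorReading {
    public long sensorId;   // public field
    public double value;    // public field

    public SensorReading() {}  // required public no-arg constructor

    public SensorReading(long sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}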

Re: At end of complex parallel flow, how to force end step with parallel=1?

2017-10-03 Thread Gábor Gévay
Hi Garrett, You can call .setParallelism(1) on just this operator: ds.reduceGroup(new GroupReduceFunction...).setParallelism(1) Best, Gabor On Mon, Oct 2, 2017 at 3:46 PM, Garrett Barton wrote: > I have a complex alg implemented using the DataSet api and by default

Re: Rule expression for CEP library

2017-09-25 Thread Gábor Gévay
Hello Shailesh, There is a Flink Improvement Proposal for Integration of SQL and CEP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-20:+Integration+of+SQL+and+CEP Best, Gábor On Mon, Sep 25, 2017 at 3:21 PM, Shailesh Jain wrote: > Hi, > > Apart from the

Re: Dot notation not working for accessing case classes nested fields

2017-09-15 Thread Gábor Gévay
Hi Federico, Sorry, nested field expressions are not supported in these methods at the moment. I have created a JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-7629 I think this should be easy to fix, as all the infrastructure for supporting this is already in place. I'll try to

Re: Bulk Iteration

2017-09-14 Thread Gábor Gévay
Hello Alieh, If you set the logging to a more verbose level, then Flink prints a log message at every iteration. If you need the current iteration number inside your code, then you should create your UDF as an AbstractRichFunction, where you can call getIterationRuntimeContext(), which has
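
A minimal sketch of that second option, assuming a bulk iteration over Tuple1<Long> elements (the class name is made up):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;

// Rich UDFs used inside an iteration can query the current superstep.
public static class StepAwareMapper extends RichMapFunction<Tuple1<Long>, Tuple1<Long>> {
    @Override
    public Tuple1<Long> map(Tuple1<Long> value) {
        int step = getIterationRuntimeContext().getSuperstepNumber();
        // ... use the iteration number, e.g. for step-dependent logic ...
        return value;
    }
}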

Re: DataSet: CombineHint heuristics

2017-09-05 Thread Gábor Gévay
Hi Urs, Yes, the 1/10th ratio is just a very loose rule of thumb. I would suggest to try both the SORT and HASH strategies with a workload that is as similar as possible to your production workload (similar data, similar parallelism, etc.), and see which one is faster for your specific use case.
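
For reference, the hint is set on the reduce operator itself; a sketch, with the input dataset and reduce function as placeholders:

import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;

// Benchmark both variants on realistic data; only the hint differs.
ds.groupBy(0)
  .reduce(new MyReduceFunction())      // placeholder reduce function
  .setCombineHint(CombineHint.HASH);   // or CombineHint.SORT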

Re: termination of stream#iterate on finite streams

2017-09-05 Thread Gábor Gévay
Hello, There is a Flink Improvement Proposal to redesign the iterations: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132 This will address the termination issue. Best, Gábor On Mon, Sep 4, 2017 at 11:00 AM, Xingcan Cui wrote: > Hi Peter, > >

Re: Flink Vs Google Cloud Dataflow?

2017-08-04 Thread Gábor Gévay
Hello, Have you seen these two blog posts? They explain the relationship between Apache Flink, Apache Beam, and Google Cloud Dataflow. https://data-artisans.com/blog/why-apache-beam https://cloud.google.com/blog/big-data/2016/05/why-apache-beam-a-google-perspective Best, Gábor On Mon, Jul

Re: Beginner question - sum multiple edges

2017-04-23 Thread Gábor Gévay
> Marc > > > On 17.04.2017 at 21:47, Kaepke, Marc <marc.kae...@haw-hamburg.de> wrote: > > Hi Gábor, > > thanks a lot > > Best, > Marc > > On 17.04.2017 at 20:32, Gábor Gévay <gga...@gmail.com> wrote: > > Hello Marc, > > You can gr

Re: Join with Default-Value

2017-02-10 Thread Gábor Gévay
I'm not sure what exactly is the problem, but could you check this FAQ item? http://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception- Best, Gábor 2017-02-10 14:16 GMT+01:00 Sebastian Neef : > Hi, > > thanks! That's exactly what I needed. > >

Re: Join with Default-Value

2017-02-10 Thread Gábor Gévay
Hello Sebastian, You can use DataSet.leftOuterJoin for this. Best, Gábor 2017-02-10 12:58 GMT+01:00 Sebastian Neef : > Hi, > > is it possible to assign a "default" value to elements that didn't match? > > For example I have the following two datasets: > >
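
A sketch of filling in the default, assuming both inputs are DataSet<Tuple2<String, Integer>> joined on the first field, with -1 as a made-up default value:

// With leftOuterJoin, the right argument of the JoinFunction is null
// for left elements that found no match; substitute the default there.
DataSet<Tuple2<String, Integer>> result = left
    .leftOuterJoin(right)
    .where(0)
    .equalTo(0)
    .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> join(Tuple2<String, Integer> l, Tuple2<String, Integer> r) {
            return new Tuple2<>(l.f0, r == null ? -1 : r.f1);
        }
    });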

Re: Cyclic ConnectedStream

2017-01-31 Thread Gábor Gévay
the stream >> (predictionStream depends on statsStream, but it depends on >> predictionStream in the first place). >> >> I hope it is clear now. >> >> Matt >> >> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote: >>

Re: How to get top N elements in a DataSet?

2017-01-24 Thread Gábor Gévay
Hello, Btw. there is a Jira about this: https://issues.apache.org/jira/browse/FLINK-2549 Note that the discussion there suggests a more efficient approach, which doesn't involve sorting the entire partitions. And if I remember correctly, this question comes up from time to time on the mailing
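
A sketch of that more efficient shape (a bounded heap per partition, then one task merging the candidates), assuming a DataSet<Integer> input and the n largest elements wanted; all names are made up:

import java.util.PriorityQueue;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

// Keeps only the n largest elements of its partition in a min-heap.
public static class TopN implements MapPartitionFunction<Integer, Integer> {
    private final int n;
    public TopN(int n) { this.n = n; }

    @Override
    public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (Integer v : values) {
            heap.add(v);
            if (heap.size() > n) heap.poll(); // evict the current minimum
        }
        for (Integer v : heap) out.collect(v);
    }
}

// Local top n per partition, then a single task merges the candidates,
// so no partition is ever fully sorted.
DataSet<Integer> topN = data
    .mapPartition(new TopN(n))
    .mapPartition(new TopN(n)).setParallelism(1);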

Re: Equivalent of Rx combineLatest() on a join?

2016-12-13 Thread Gábor Gévay
Dear Denis, I think you can do it with a simple CoFlatMapFunction (without windows): To use a CoFlatMapFunction, you need to first connect [1] your streams, which results in a ConnectedStreams. Then you can call flatMap on this, and give a CoFlatMapFunction to it (where two different callbacks
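
A sketch of the idea, assuming DataStream<Long> and DataStream<String> inputs named longs and strings; note the "latest" fields are plain function state here, so for a real job you would key both streams (or use parallelism 1) and checkpointed state:

// Emits the latest (Long, String) pair whenever either input updates,
// i.e. Rx combineLatest-like behavior.
DataStream<Tuple2<Long, String>> combined = longs
    .connect(strings)
    .flatMap(new CoFlatMapFunction<Long, String, Tuple2<Long, String>>() {
        private Long latestLong;
        private String latestString;

        @Override
        public void flatMap1(Long value, Collector<Tuple2<Long, String>> out) {
            latestLong = value;
            if (latestString != null) out.collect(new Tuple2<>(latestLong, latestString));
        }

        @Override
        public void flatMap2(String value, Collector<Tuple2<Long, String>> out) {
            latestString = value;
            if (latestLong != null) out.collect(new Tuple2<>(latestLong, latestString));
        }
    });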

Re: spark vs flink batch performance

2016-11-18 Thread Gábor Gévay
ries has some performance improvements because in those kind of >> tests, spark and flink was either on par or flink 10-15% faster than spark >> in the past. Aside from that are any configuration parameters you may >> propose to fine tune flink? >> >> Best, >

Re: spark vs flink batch performance

2016-11-18 Thread Gábor Gévay
Hello, Your program looks mostly fine, but there are a few minor things that might help a bit: Parallelism: In your attached flink-conf.yaml, you have 2 task slots per task manager, and if you have 1 task manager, then your total number of task slots is also 2. However, your default parallelism

Re: Retrieving values from a dataset of datasets

2016-11-16 Thread Gábor Gévay
The short answer is that DataSet is not serializable. I think the main underlying problem is that Flink needs to see all DataSet operations before launching the job. However, if you have a DataSet<DataSet<T>>, then operations on the inner DataSets will end up being specified inside the UDFs

Re: Retrieving values from a dataset of datasets

2016-11-15 Thread Gábor Gévay
Hello, How exactly do you represent the DataSet of DataSets? I'm asking because something like a DataSet<DataSet<T>> unfortunately doesn't work in Flink. Best, Gábor 2016-11-14 20:44 GMT+01:00 otherwise777 : > Hey There, > > I'm trying to calculate the

Re: Looping over a DataSet and accesing another DataSet

2016-10-30 Thread Gábor Gévay
Hello, In Flink, one often used way to access data from multiple DataSets at the same time is to perform a join (Flink actually calls equi-joins [1] just "join"), just as in the database world. For example, in the algorithm that you linked, you access A[u] for every edge (u,v). I assume that you
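
Sketched for that example, assuming edges as Tuple2<Long, Long> holding (u, v) and a dataset a of Tuple2<Long, Double> holding (vertex id, A[u]); the names are made up:

// For each edge (u, v), attach A[u] by an equi-join on the source id.
DataSet<Tuple3<Long, Long, Double>> edgesWithA = edges
    .join(a)
    .where(0)    // u of the edge
    .equalTo(0)  // vertex id in A
    .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Double>, Tuple3<Long, Long, Double>>() {
        @Override
        public Tuple3<Long, Long, Double> join(Tuple2<Long, Long> edge, Tuple2<Long, Double> av) {
            return new Tuple3<>(edge.f0, edge.f1, av.f1); // (u, v, A[u])
        }
    });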

Re: Flink strange stream join behavior

2016-10-16 Thread Gábor Gévay
Hello, For your first question: > the number of tuples are same in both cases I guess you mean the total number of tuples here, right? So this means that you have fewer, but larger windows. Suppose that you have W windows, each with S tuples. Then your total input has W * S tuples, and your

Re: Nested iterations

2016-09-01 Thread Gábor Gévay
rate jobs. > > Is there any plans to support nested loops in the future? > > Thanks, > Supun.. > > On Thu, Sep 1, 2016 at 12:28 PM, Gábor Gévay <gga...@gmail.com> wrote: >> >> Hello Supun, >> >> Unfortunately, nesting of Flink's iteration constructs are not

Re: Nested iterations

2016-09-01 Thread Gábor Gévay
Hello Supun, Unfortunately, nesting of Flink's iteration constructs is not supported at the moment. There are some workarounds though: 1. You can start a Flink job for each step of the iteration. Starting a Flink job has some overhead, so this only works if there is a sufficient amount of work
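
A sketch of workaround 1, driving the outer loop from the client program; the paths, the step count, and doOneStep are all placeholders:

// One Flink job per outer step; intermediate results go through storage.
for (int step = 0; step < maxSteps; step++) {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> current = env.readTextFile("hdfs:///tmp/iter-" + step);
    DataSet<String> next = doOneStep(current);  // hypothetical step logic
    next.writeAsText("hdfs:///tmp/iter-" + (step + 1));
    env.execute("iteration step " + step);      // pays job-submission overhead each time
}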

Re: Performance issues with GroupBy?

2016-07-26 Thread Gábor Gévay
Hello Robert, > Is there something I might could do to optimize the grouping? You can try to make your `RichGroupReduceFunction` implement the `GroupCombineFunction` interface, so that Flink can do combining before the shuffle, which might significantly reduce the network load. (How much the
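
A sketch of that, assuming Tuple2<String, Long> records summed per key (the class name is made up):

// Implementing GroupCombineFunction as well lets Flink pre-aggregate
// on the sending side, before the shuffle.
public static class SumPerKey
        extends RichGroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>
        implements GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) {
        String key = null;
        long sum = 0;
        for (Tuple2<String, Long> v : values) { key = v.f0; sum += v.f1; }
        out.collect(new Tuple2<>(key, sum));
    }

    @Override
    public void combine(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) {
        reduce(values, out); // for a sum, combining is the same as reducing
    }
}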

Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Gábor Gévay
mOperator > > I could not do another sum call on that. Would tell me how did you manage to > do > > stream.sum().sum() > > Regards, > -Rami > > On 7 Jun 2016, at 16:13, Gábor Gévay <gga...@gmail.com> wrote: > > Hello, > > In the case of "sum"

Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Gábor Gévay
Hello, In the case of "sum", you can just specify them one after the other, like: stream.sum(1).sum(2) This works because the sums of the two fields are independent of each other. However, in the case of "keyBy", the information from both fields is needed at the same time to produce the key. Best, Gábor

Re: time spent for iteration

2016-03-09 Thread Gábor Gévay
to allow for easier retrieval of this > information. > Wouldn't it make sense to expose this to the web UI for example? > We actually had a discussion about this some time ago [1]. > > -Vasia. > > [1]: https://issues.apache.org/jira/browse/FLINK-1759 > > On 9 March 2016 at

Re: time spent for iteration

2016-03-09 Thread Gábor Gévay
Hello, If you increase the log level, you can see each step of the iteration separately in the log, with timestamps. Best, Gábor 2016-03-09 14:04 GMT+01:00 Riccardo Diomedi : > Is it possible to add timer for the time spent for iteration when iterate >

Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-24 Thread Gábor Gévay
Hello, > // For each "filename" in list do... > DataSet featureList = fileList > .flatMap(new ReadDataSetFromFile()) // flatMap because there > might multiple DataSets in a file What happens if you just insert .rebalance() before the flatMap? > This kind of DataSource will only
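
That is, something like the following sketch, reusing the names from the quoted code (the Feature element type is invented):

// rebalance() round-robins the (few) filenames across all subtasks,
// so every parallel instance of the flatMap gets files to read.
DataSet<Feature> featureList = fileList
    .rebalance()
    .flatMap(new ReadDataSetFromFile());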

Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Gábor Gévay
Hello, > I think that there is actually a fundamental latency issue with > "exactly once sinks", no matter how you implement them in any systems: > You can only commit once you are sure that everything went well, > to a specific point where you are sure no replay will ever be needed. What if the

Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Gábor Gévay
The way I imagine this is that the sink would have its "own checkpoints" separately from the rest of the system, and with much smaller interval, and writes to Kafka (with "transactional cooperation", as Stephan mentioned) during making these checkpoints. And then when a replay happens from a

Re: Left join with unbalanced dataset

2016-02-02 Thread Gábor Gévay
Hello Arnaud, > Flink does not start the reduce operation until all lines have > been created (memory bottleneck is during the collection > of all lines) ; but theorically it is possible. The problem that `S.groupBy(...).reduce(...)` needs to fully materialize S comes from the fact that the

Re: Local collection data sink for the streaming API

2016-01-05 Thread Gábor Gévay
tream, rather than > org.apache.flink.streaming.api.scala.DataStream. Any suggestion on how > to handle this, other than creating my own scala implementation of > DataStreamUtils.collect()? > > Thanks, > > Filipe > > On Tue, Jan 5, 2016 at 3:33 PM, Gábor Gévay <gga...@gm

Re: Reading multiple datasets with one read operation

2015-10-22 Thread Gábor Gévay
Hello! > I have thought about a workaround where the InputFormat would return > Tuple2s and the first field is the name of the dataset to which a record > belongs. This would however require me to filter the read data once for > each dataset or to do a groupReduce which is some overhead i'm >

Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Gábor Gévay
Hello, Alternatively, if dataset B fits in memory, but dataset A doesn't, then you can do it with broadcasting B to a RichMapPartitionFunction on A: In the open method of mapPartition, you sort B. Then, for each element of A, you do a binary search in B, and look at the index found by the binary
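
A sketch with both datasets as DataSet<Long> named a and b, emitting each element of A together with its binary-search index in the sorted B (the broadcast name and the output shape are made up):

// Every parallel instance gets B as a broadcast variable, sorts its copy
// once in open(), then binary-searches it for each element of A.
DataSet<Tuple2<Long, Integer>> result = a
    .mapPartition(new RichMapPartitionFunction<Long, Tuple2<Long, Integer>>() {
        private List<Long> sortedB;

        @Override
        public void open(Configuration parameters) {
            sortedB = new ArrayList<>(getRuntimeContext().<Long>getBroadcastVariable("B"));
            Collections.sort(sortedB);
        }

        @Override
        public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long, Integer>> out) {
            for (Long v : values) {
                int idx = Collections.binarySearch(sortedB, v); // negative value encodes the insertion point
                out.collect(new Tuple2<>(v, idx));
            }
        }
    })
    .withBroadcastSet(b, "B");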

Re: Gelly vertex ID type requirements?

2015-07-31 Thread Gábor Gévay
2015-07-31 0:29 GMT+02:00 Fabian Hueske fhue...@gmail.com: Hi, I opened a JIRA (FLINK-2442) and submitted a PR (#963) for the Wrong field type problem. Is the other problem addressed in FLINK-2437? Cheers, Fabian 2015-07-30 16:29 GMT+02:00 Gábor Gévay gga...@gmail.com: Thanks

Gelly vertex ID type requirements?

2015-07-30 Thread Gábor Gévay
Hello, I am having some trouble building a graph where the vertex ID type is a POJO. Specifically, it would be a class that has two fields: a long field, and a field which is of another class that has four byte fields. (Both classes implement the Comparable interface, as the Gelly guide
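
For reference, the ID type described above would look roughly like this, each class in its own file (field names invented; both classes implement Comparable, as the Gelly guide asks):

// The composite vertex ID: a long plus a nested class of four bytes.
public class VertexId implements Comparable<VertexId> {
    public long id;
    public SubId sub;

    public VertexId() {} // POJO rule: public no-arg constructor

    @Override
    public int compareTo(VertexId o) {
        int c = Long.compare(id, o.id);
        return c != 0 ? c : sub.compareTo(o.sub);
    }
}

public class SubId implements Comparable<SubId> {
    public byte b0, b1, b2, b3;

    public SubId() {}

    @Override
    public int compareTo(SubId o) {
        int c = Byte.compare(b0, o.b0);
        if (c != 0) return c;
        c = Byte.compare(b1, o.b1);
        if (c != 0) return c;
        c = Byte.compare(b2, o.b2);
        return c != 0 ? c : Byte.compare(b3, o.b3);
    }
}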

Re: Gelly vertex ID type requirements?

2015-07-30 Thread Gábor Gévay
. This happens, because PojoType is neither a TupleType nor an AtomicType. To me it looks like the TupleTypeInfoBase condition should be generalized to CompositeType. I will look into this. Cheers, Fabian 2015-07-30 14:18 GMT+02:00 Gábor Gévay gga...@gmail.com: Hello, I am having some trouble