Re: Window apply problem
Thanks Jun,

Very useful. I was confusing the parameters because my input is tuples, which I might not need in the end...

I now have what I wanted (non-parallel and not so efficient, I guess; any suggestion to improve it is welcome), and I have to modify the trigger so that it will FIRE_AND_PURGE when it reaches N, the maximum number of items per window; otherwise it will count the whole data every time...

So my example looks like this now:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<String> transactions = env.fromElements(
        "1 2 4 3 4",
        "3 4 5 4 6",
        "7 3 3 6 1",
        "1 3 2 4 6"
    );

    DataStream<Hashtable<String, Integer>> counts = transactions
        .flatMap(new LineSplitter())  // because I am expecting one transaction per line
        .windowAll(GlobalWindows.create())
        .trigger(MyCountTrigger.of(5))
        .apply(new MyWindowFunction());

    counts.print();
    env.execute("ItemsCount");

    public static class MyWindowFunction
            implements AllWindowFunction<Tuple2<String, Integer>, Hashtable<String, Integer>, GlobalWindow> {

        // running counts, kept across window firings
        public Hashtable<String, Integer> itemsMap = new Hashtable<String, Integer>();

        @Override
        public void apply(GlobalWindow window, Iterable<Tuple2<String, Integer>> tuples,
                          Collector<Hashtable<String, Integer>> out) throws Exception {
            for (Tuple2<String, Integer> tuple : tuples) {
                if (itemsMap.containsKey(tuple.f0)) {
                    itemsMap.put(tuple.f0, itemsMap.get(tuple.f0) + 1);
                } else {
                    itemsMap.put(tuple.f0, 1);
                }
            }
            out.collect(itemsMap);
        }
    }

Regards,
Marcela.

On 08.03.2016 09:34, Wang Yangjun wrote:

Hello Marcela,

I am not sure what the “parameters mismatch” is here. From the example you have shown, it seems that you just want to do a window word count, right? Could you try this code and see whether it is what you want?
Best,
Jun

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
    List<Integer> list = Arrays.asList(array);

    DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(5))
        .apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<Integer> tuples,
                              Collector<Tuple2<Integer, Integer>> out) throws Exception {
                // count the occurrences of each item handed to this firing
                HashMap<Integer, Integer> map = new HashMap<>();
                for (Integer tuple : tuples) {
                    Integer value = 0;
                    if (map.containsKey(tuple)) {
                        value = map.get(tuple);
                    }
                    map.put(tuple, value + 1);
                }
                // emit one (item, count) tuple per distinct item
                for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                    out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                }
            }
        });

    counts.print();
    env.execute("Stream WordCount");

On 08/03/16 02:57, "Marcela Charfuelan" <charfuelanol...@tu-berlin.de> wrote:

Hello,

I want to write a function for counting items (per type) in windows of size N. For example, for N=5 and the stream:

    1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6

I would like to generate the tuples:

    w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
    w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
    w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
    w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)

I am trying to apply my own function with "Window apply", something like:

    items
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(5))
        .apply(new MyWindowfunction())

but in this case there is a parameters mismatch between apply and WindowFunction, so I am not sure whether this is possible here. Any suggestion?
Looking at the streaming Java examples, the (commented-out) apply example shown in GroupedProcessingTimeWindowExample(), which is applied to a timeWindow, does not work either:

    .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    .apply(new SummingWindowFunction())

So what am I missing here? Any help is appreciated.

Regards,
Marcela.

--
Dr. Marcela Charfuelan, S
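A note on the FIRE_AND_PURGE point raised in this thread: as far as I understand it, a trigger that only fires leaves the elements in the global window, so the next firing hands all of them to the window function again. The sketch below simulates that in plain Java, without Flink, to show why a window function that keeps its own running map double-counts unless the window is purged. The class and method names (FireVersusPurge, run) are made up for illustration and are not Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FireVersusPurge {

    // Simulates a global window whose trigger fires after every n elements.
    // With purge == true the buffered elements are dropped after each firing
    // (FIRE_AND_PURGE); with purge == false they are kept (plain FIRE).
    static Map<Integer, Integer> run(int[] stream, int n, boolean purge) {
        List<Integer> buffer = new ArrayList<>();                // window contents
        Map<Integer, Integer> itemsMap = new LinkedHashMap<>();  // counts kept across firings
        int sinceLastFire = 0;
        for (int item : stream) {
            buffer.add(item);
            sinceLastFire++;
            if (sinceLastFire == n) {
                // the "window function": add everything in the buffer to the running counts
                for (int buffered : buffer) {
                    itemsMap.merge(buffered, 1, Integer::sum);
                }
                if (purge) {
                    buffer.clear();
                }
                sinceLastFire = 0;
            }
        }
        return itemsMap;
    }

    public static void main(String[] args) {
        int[] stream = {1, 2, 4, 3, 4, 3, 4, 5, 4, 6};
        // without purging, the first five items are counted twice
        System.out.println(run(stream, 5, false)); // {1=2, 2=2, 4=6, 3=3, 5=1, 6=1}
        // with purging, the counts match the expected second window
        System.out.println(run(stream, 5, true));  // {1=1, 2=1, 4=4, 3=2, 5=1, 6=1}
    }
}
```

Flink also ships a PurgingTrigger wrapper (PurgingTrigger.of(CountTrigger.of(5))), which might make a custom trigger unnecessary here; it is worth checking against the Flink version in use.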
Window apply problem
Hello,

I want to write a function for counting items (per type) in windows of size N. For example, for N=5 and the stream:

    1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6

I would like to generate the tuples:

    w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
    w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
    w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
    w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)

I am trying to apply my own function with "Window apply", something like:

    items
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(5))
        .apply(new MyWindowfunction())

but in this case there is a parameters mismatch between apply and WindowFunction, so I am not sure whether this is possible here. Any suggestion?

Looking at the streaming Java examples, the (commented-out) apply example shown in GroupedProcessingTimeWindowExample(), which is applied to a timeWindow, does not work either:

    .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    .apply(new SummingWindowFunction())

So what am I missing here? Any help is appreciated.

Regards,
Marcela.
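For reference, the expected tuples above are cumulative: each window of N items adds to the counts of all earlier windows. A minimal plain-Java sketch of that behaviour (no Flink involved; the class and method names CumulativeWindowCount and countPerWindow are made up for illustration) could look like this:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CumulativeWindowCount {

    // Returns one snapshot of the running item counts after every full window of n items.
    // A LinkedHashMap keeps items in first-seen order, matching the order in the example.
    static List<Map<Integer, Integer>> countPerWindow(int[] stream, int n) {
        Map<Integer, Integer> running = new LinkedHashMap<>();
        List<Map<Integer, Integer>> snapshots = new ArrayList<>();
        for (int i = 0; i < stream.length; i++) {
            running.merge(stream[i], 1, Integer::sum);
            if ((i + 1) % n == 0) {
                // copy, so that later updates do not change earlier snapshots
                snapshots.add(new LinkedHashMap<>(running));
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        int[] stream = {1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
        for (Map<Integer, Integer> snapshot : countPerWindow(stream, 5)) {
            System.out.println(snapshot);
        }
        // prints:
        // {1=1, 2=1, 4=2, 3=1}
        // {1=1, 2=1, 4=4, 3=2, 5=1, 6=1}
        // {1=2, 2=1, 4=4, 3=4, 5=1, 6=2, 7=1}
        // {1=3, 2=2, 4=5, 3=5, 5=1, 6=3, 7=1}
    }
}
```

This only pins down the semantics being asked for; in a streaming job the running map would have to live somewhere that survives between window firings.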
Re: Iterations problem in command line
Hi,

the iteration looks like:

    DataSet gmms = getInitialGMMDataSet(env);
    IterativeDataSet loop = gmms.iterate(50);
    DataSet newGMMs = features
        .map(new Estep_ExpectationMaximisation()).withBroadcastSet(loop, "gmms")
        .reduceGroup(new Mstep_ExpectationMaximisation()).withBroadcastSet(loop, "gmms");
    DataSet finalGMMs = loop.closeWith(newGMMs);

In every iteration the gmms parameters should be updated. I have noticed that the first iteration is OK, but afterwards the results start to go wrong... at least on the command line (for example, gmms.coeff for the three gmms here should not sum to more than 1).

I have put the code here in case it helps: https://github.com/marcelach1/EmExercise

Regards,
Marcela.

On 01.03.2016 10:58, Fabian Hueske wrote:

Yes, env.setParallelism(1) fixes the parallelism of all operators to 1 (unless an operator overrides this setting). Can you identify at which position in the data flow the results start to diverge?

Best,
Fabian

2016-02-29 17:57 GMT+01:00 Marcela Charfuelan <charfuelanol...@tu-berlin.de>:

Thanks Fabian,

I am using the default options in both cases. Since I am not testing on a cluster yet, just locally on Ubuntu, I am not specifying any parallelism. Just to test, I set env.setParallelism(1) in the program and ran with -p 1 (which I guess I would not need), but I am still getting the same issue.

Regards,
Marcela.

On 29.02.2016 16:44, Fabian Hueske wrote:

Hi Marcela,

do you run the algorithm in both setups with the same parallelism?
Best,
Fabian

2016-02-26 16:52 GMT+01:00 Marcela Charfuelan <charfuelanol...@tu-berlin.de>:

Hello,

I implemented an algorithm that includes iterations (the EM algorithm), and I am getting different results when running it in Eclipse (Luna Release (4.4.0)) and when running it on the command line using flink run. The program does not crash; it is just that after the first iteration the results are different (wrong on the command line).

The solution I am getting in Eclipse, for each iteration, is the same one I would get when running the algorithm in Octave, for example, so I am sure that solution is correct. I have also tried using java plus the command-line arguments of Eclipse on the command line, and that works OK too (locally on Ubuntu).

Has anybody experienced something similar? Any idea why this could happen? How can I fix this?

Regards,
Marcela.

--
Dr. Marcela Charfuelan, Senior Researcher
TU Berlin, School of Electrical Engineering and Computer Sciences
Database Systems and Information Management (DIMA)
EN7, Einsteinufer 17, D-10587 Berlin
Room: EN 725
Phone: +49 30-314-23556
URL: http://www.user.tu-berlin.de/charfuelan
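On the remark in this thread that the coefficients of the three gmms should not sum to more than 1: in the standard EM update for a Gaussian mixture, each coefficient is the average responsibility of its component over all points, so the coefficients sum to 1 whenever every point's responsibilities do. The sketch below is a minimal plain-Java version of that update and could serve as a sanity check after each iteration to find where the results start to diverge. It is not taken from the linked repository; the names MixtureWeightCheck, updateCoefficients and sum, and the sample responsibilities, are made up for illustration.

```java
class MixtureWeightCheck {

    // M-step update for the mixing coefficients:
    // coeff[k] = (sum over all points i of resp[i][k]) / numberOfPoints
    static double[] updateCoefficients(double[][] resp) {
        int numPoints = resp.length;
        int numComponents = resp[0].length;
        double[] coeff = new double[numComponents];
        for (double[] row : resp) {
            for (int k = 0; k < numComponents; k++) {
                coeff[k] += row[k] / numPoints;
            }
        }
        return coeff;
    }

    static double sum(double[] values) {
        double total = 0.0;
        for (double v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // hypothetical responsibilities of 4 points for 3 components; each row sums to 1
        double[][] resp = {
            {0.7, 0.2, 0.1},
            {0.1, 0.8, 0.1},
            {0.25, 0.25, 0.5},
            {0.6, 0.3, 0.1}
        };
        double[] coeff = updateCoefficients(resp);
        double total = sum(coeff);
        // a sum noticeably different from 1 points to a problem upstream of the M-step,
        // for example in the responsibilities or in the number of points seen
        System.out.println("sum of coefficients = " + total);
        System.out.println("within tolerance: " + (Math.abs(total - 1.0) < 1e-9));
    }
}
```

If the sum drifts away from 1 only on the command line, comparing the inputs of the M-step (number of points and responsibilities) between the two setups may help narrow down where the data flow diverges.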
Iterations problem in command line
Hello,

I implemented an algorithm that includes iterations (the EM algorithm), and I am getting different results when running it in Eclipse (Luna Release (4.4.0)) and when running it on the command line using flink run. The program does not crash; it is just that after the first iteration the results are different (wrong on the command line).

The solution I am getting in Eclipse, for each iteration, is the same one I would get when running the algorithm in Octave, for example, so I am sure that solution is correct. I have also tried using java plus the command-line arguments of Eclipse on the command line, and that works OK too (locally on Ubuntu).

Has anybody experienced something similar? Any idea why this could happen? How can I fix this?

Regards,
Marcela.