Re: Serialization problem for Guava's TreeMultimap

2016-09-20 Thread Yukun Guo
Thank you for quickly fixing it!


On 20 September 2016 at 17:17, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yukun,
>
> I debugged this issue and found that this is a bug in the serialization of
> the StateDescriptor.
> I have created FLINK-4640 [1] to resolve the issue.
>
> Thanks for reporting the issue.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-4640
>
> 2016-09-20 10:35 GMT+02:00 Yukun Guo <gyk@gmail.com>:
>
>> One more detail: when the FoldFunction runs directly on a KeyedStream,
>> everything works fine, so the problem must be related to the way
>> WindowedStream handles type extraction.
>>
>> In case any Flink experts would like to reproduce it, I have created a
>> repo on Github: github.com/gyk/flink-multimap
>>
>> On 20 September 2016 at 10:33, Yukun Guo <gyk@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The same error occurs after changing the code, unfortunately.
>>>
>>> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T
>>> serializer` where T extends Serializer & Serializable, so I pass a
>>> custom GenericJavaSerializer, but I guess this doesn't matter much.
>>>
>>>
>>> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> Can you use "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class,
>>>> JavaSerializer.class)" ?
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk@gmail.com> wrote:
>>>>
>>>>> Here is the code snippet:
>>>>>
>>>>> windowedStream.fold(TreeMultimap.<Long, String>create(),
>>>>>         new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>>>>     @Override
>>>>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>>>>>                                            Tuple2<String, Long> itemCount) throws Exception {
>>>>>         String item = itemCount.f0;
>>>>>         Long count = itemCount.f1;
>>>>>         topKSoFar.put(count, item);
>>>>>         if (topKSoFar.keySet().size() > topK) {
>>>>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>>>>         }
>>>>>         return topKSoFar;
>>>>>     }
>>>>> });
>>>>>
>>>>>
>>>>> The problem is that by the time the fold function gets called, the
>>>>> initial value has been lost, so it throws a NullPointerException. This
>>>>> appears to be caused by failed type extraction and serialization, as
>>>>> shown in the log message:
>>>>> "INFO  TypeExtractor:1685 - No fields detected for class
>>>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>>>>> Will be handled as GenericType."
>>>>>
>>>>> I have tried the following two ways to fix it but neither of them
>>>>> worked:
>>>>>
>>>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>>>>> Serializer, and calling
>>>>> `env.addDefaultKryoSerializer(TreeMultimap.class,
>>>>> new TreeMultimapSerializer())`. The write/read methods are almost
>>>>> line-by-line translations from TreeMultimap's own implementation.
>>>>>
>>>>> 2. TreeMultimap implements the Serializable interface, so Kryo can
>>>>> fall back to standard Java serialization. Since Kryo's JavaSerializer
>>>>> itself is not serializable, I wrote an adapter to make it fit the
>>>>> "addDefaultKryoSerializer" API.
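A rough sketch of what such an adapter could look like (the class name is
illustrative and not the GenericJavaSerializer from the original post; it simply
delegates to a transient Kryo JavaSerializer that is rebuilt after
deserialization):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;

import java.io.Serializable;

// Adapter that makes Kryo's JavaSerializer usable where Flink requires the
// serializer instance itself to be Serializable.
public class SerializableJavaSerializer<T> extends Serializer<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    // JavaSerializer is not Serializable, so keep it transient and rebuild it lazily.
    private transient JavaSerializer delegate = new JavaSerializer();

    private JavaSerializer delegate() {
        if (delegate == null) {
            delegate = new JavaSerializer();
        }
        return delegate;
    }

    @Override
    public void write(Kryo kryo, Output output, T object) {
        delegate().write(kryo, output, object);
    }

    @Override
    @SuppressWarnings("unchecked")
    public T read(Kryo kryo, Input input, Class<T> type) {
        return (T) delegate().read(kryo, input, type);
    }
}

An instance of such a class should be accepted by both addDefaultKryoSerializer
and registerTypeWithKryoSerializer, since it is a Kryo Serializer and
java.io.Serializable at the same time.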
>>>>>
>>>>> Could you please give me some working examples for custom Kryo
>>>>> serialization in Flink?
>>>>>
>>>>>
>>>>> Best regards,
>>>>> Yukun
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Extract type information from SortedMap

2016-07-09 Thread Yukun Guo
Hi Robert,

On 9 July 2016 at 00:25, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Yukun,
>
> can you also post the code how you are invoking the GenericFlatMapper on
> the mailing list?
>

Here is the code defining the topology:

DataStream<String> stream = ...;
stream
    .keyBy(new KeySelector<String, Integer>() {
        @Override
        public Integer getKey(String x) throws Exception {
            return x.hashCode() % 10;
        }
    })
    .timeWindow(Time.seconds(10))
    .fold(new TreeMap<String, Long>(), new FoldFunction<String, SortedMap<String, Long>>() {
        @Override
        public SortedMap<String, Long> fold(SortedMap<String, Long> map, String x) {
            Long current = map.get(x);
            Long updated = current != null ? current + 1 : 1;
            map.put(x, updated);
            return map;
        }
    })
    .flatMap(new GenericFlatMapper<String>())
    .returns(new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo())
    // throws InvalidTypesException if you comment out this line
    .print();



>
> The Java compiler is usually dropping the generic types during compilation
> ("type erasure"), that's why we can not infer the types.
>
>
The error message implies that type extraction should be possible when "all
variables in the return type can be deduced from the input type(s)". That works
for a signature like flatMap(Tuple2<Long, T>, Collector<Tuple2<T, String>>),
but once the signature is changed to void flatMap(SortedMap<T, Long>,
Collector<Tuple2<T, Long>>), type inference fails, even though T should still
be deducible from the input.
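A rough sketch of the other workaround the exception message mentions, i.e.
letting the function report its own output type through ResultTypeQueryable,
with the TypeInformation handed in at construction time (the class name below
is illustrative, not part of the original code):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.SortedMap;

public class QueryableFlatMapper<T>
        implements FlatMapFunction<SortedMap<T, Long>, Tuple2<T, Long>>,
                   ResultTypeQueryable<Tuple2<T, Long>> {

    // TypeInformation is Serializable, so it can be stored in the function.
    private final TypeInformation<Tuple2<T, Long>> producedType;

    public QueryableFlatMapper(TypeInformation<Tuple2<T, Long>> producedType) {
        this.producedType = producedType;
    }

    @Override
    public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>> out) throws Exception {
        for (Map.Entry<T, Long> entry : m.entrySet()) {
            out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }

    @Override
    public TypeInformation<Tuple2<T, Long>> getProducedType() {
        return producedType;
    }
}

With something like this,
.flatMap(new QueryableFlatMapper<String>(new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo()))
should work without a trailing returns(...) call.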


>
> On Fri, Jul 8, 2016 at 12:27 PM, Yukun Guo <gyk@gmail.com> wrote:
>
>> Hi,
>> When I run the code implementing a generic FlatMapFunction, Flink
>> complained about InvalidTypesException:
>>
>> public class GenericFlatMapper<T>
>>         implements FlatMapFunction<SortedMap<T, Long>, Tuple2<T, Long>> {
>>     @Override
>>     public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>> out) throws Exception {
>>         for (Map.Entry<T, Long> entry : m.entrySet()) {
>>             out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
>>         }
>>     }
>> }
>>
>>
>> *Exception in thread "main"
>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>> type of function could not be determined automatically, due to type
>> erasure. You can give type information hints by using the returns(...)
>> method on the result of the transformation call, or by letting your
>> function implement the 'ResultTypeQueryable' interface.*
>>
>> *...*
>> *Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>> Type of TypeVariable 'T' in 'class GenericFlatMapper' could not be
>> determined. This is most likely a type erasure problem. The type extraction
>> currently supports types with generic variables only in cases where all
>> variables in the return type can be deduced from the input type(s).*
>>
>> This puzzles me as Flink should be able to infer the type from arguments.
>> I know returns(...) or other workarounds to give type hint, but they are
>> kind of verbose. Any suggestions?
>>
>>
>


Extract type information from SortedMap

2016-07-08 Thread Yukun Guo
Hi,
When I run the code below, which implements a generic FlatMapFunction, Flink
complains with an InvalidTypesException:

public class GenericFlatMapper<T> implements
        FlatMapFunction<SortedMap<T, Long>, Tuple2<T, Long>> {
    @Override
    public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>> out) throws Exception {
        for (Map.Entry<T, Long> entry : m.entrySet()) {
            out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}


*Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: The return
type of function could not be determined automatically, due to type
erasure. You can give type information hints by using the returns(...)
method on the result of the transformation call, or by letting your
function implement the 'ResultTypeQueryable' interface.*

*...*
*Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'T' in 'class GenericFlatMapper' could not be
determined. This is most likely a type erasure problem. The type extraction
currently supports types with generic variables only in cases where all
variables in the return type can be deduced from the input type(s).*

This puzzles me, as Flink should be able to infer the type from the arguments.
I know returns(...) and other workarounds for giving a type hint, but they are
kind of verbose. Any suggestions?


Re: Tumbling time window cannot group events properly

2016-07-06 Thread Yukun Guo
You're right. I forgot to check that the "events in this window" line actually
showed the expected number of events inside each window, despite the lines
being printed a bit out of order. Thank you for the help!

On 5 July 2016 at 17:37, Aljoscha Krettek <aljos...@apache.org> wrote:

> The order in which elements are added to internal buffers and the point in
> time when FoldFunction.fold() is called don't indicate to which window
> elements are added. Flink will internally keep a buffer for each window and
> emit the window once the watermark passes the end of the window. In your
> case, there could be several windows in-flight at one given time. So the
> elements with a timestamp in [19:10:40, 19:10:49] will be added to that
> window and elements with a timestamp in [19:10:50, 19:10:59] will be added
> to this other window.
>
> Looking at your log, the "100 events in this window" message indicates
> that the watermark probably passed the end of the [19:10:40, 19:10:49]
> window and the result for that window was emitted. The elements with
> timestamp 19:10:50 that you see before that in the log are added to the
> buffer for a later window that will be emitted at a future time.
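A quick illustration of the boundary behaviour described above, as a rough
sketch of how a timestamp maps to an epoch-aligned 10-second tumbling window
(not Flink's exact internals; the example timestamp is hypothetical):

public class WindowAssignmentSketch {
    public static void main(String[] args) {
        long windowSize = 10_000L;                 // 10 seconds in milliseconds
        long timestamp = 1467630650000L;           // ~ Mon, 04 Jul 2016 19:10:50.000 CST
        long windowStart = timestamp - (timestamp % windowSize);
        long windowEnd = windowStart + windowSize; // the end is exclusive
        // An event stamped exactly at 19:10:50.000 therefore belongs to the
        // [19:10:50, 19:11:00) window, not to the [19:10:40, 19:10:50) one.
        System.out.println("[" + windowStart + ", " + windowEnd + ")");
    }
}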
>
> On Tue, 5 Jul 2016 at 04:35 Yukun Guo <gyk@gmail.com> wrote:
>
>> The output is the timestamps of events in string. (For convenience, the
>> payload of each event is exactly the timestamp of it.) As soon as the
>> folding of a time window is finished, the code will print "# events in this
>> window" indicating the end of the window.
>>
>> The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59],
>> ..., but in the example above, the events at 19:10:50, which belong to
>> [19:10:50, 19:10:59] window were mistakenly put in the [19:10:40, 19:10:49]
>> one.
>>
>> On 4 July 2016 at 21:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Could you please elaborate a bit on what exactly the output means and
>>> how you derive that events are leaking into the previous window?
>>>
>>> On Mon, 4 Jul 2016 at 13:20 Yukun Guo <gyk@gmail.com> wrote:
>>>
>>>> Thanks for the information. Strange enough, after I set the time
>>>> characteristic to EventTime, the events are leaking into the previous
>>>> window:
>>>>
>>>> ...
>>>> Mon, 04 Jul 2016 19:10:49 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST # ?
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> 100 events in this window
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:51 CST
>>>> Mon, 04 Jul 2016 19:10:51 CST
>>>>
>>>>
>>>> On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>> I think it should be as simple as setting event time as the stream
>>>>> time characteristic:
>>>>>
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>
>>>>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>>>>> time if you don't specify a time characteristic. You can enforce using an
>>>>> event-time window using this:
>>>>>
>>>>> stream.window(EventTimeTumblingWindows.of(Time.seconds(10)))
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I wrote a program which constructs a WindowedStream to compute
>>>>>> periodic data statistics every 10 seconds. However, I found that events
>>>>>> have not been strictly grouped into windows of 10s duration, i.e., some
>>>>>> events are leaking into the adjacent window.
>>>>>>
>>>>>> The output is like this:
>>>>>>
>>>>>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>>>>>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>>>>>> # removed for brevity
>>>>>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>>>>>> 99 events in this window
>>>>>&

Re: Tumbling time window cannot group events properly

2016-07-04 Thread Yukun Guo
The output is the timestamps of the events as strings. (For convenience, the
payload of each event is exactly its timestamp.) As soon as the folding of a
time window is finished, the code prints "# events in this window" to indicate
the end of the window.

The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59], ...,
but in the example above the events at 19:10:50, which belong to the
[19:10:50, 19:10:59] window, were mistakenly put in the [19:10:40, 19:10:49]
one.

On 4 July 2016 at 21:41, Aljoscha Krettek <aljos...@apache.org> wrote:

> Could you please elaborate a bit on what exactly the output means and how
> you derive that events are leaking into the previous window?
>
> On Mon, 4 Jul 2016 at 13:20 Yukun Guo <gyk@gmail.com> wrote:
>
>> Thanks for the information. Strange enough, after I set the time
>> characteristic to EventTime, the events are leaking into the previous
>> window:
>>
>> ...
>> Mon, 04 Jul 2016 19:10:49 CST
>> Mon, 04 Jul 2016 19:10:50 CST # ?
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> 100 events in this window
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:51 CST
>> Mon, 04 Jul 2016 19:10:51 CST
>>
>>
>> On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> I think it should be as simple as setting event time as the stream time
>>> characteristic:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>
>>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>>> time if you don't specify a time characteristic. You can enforce using an
>>> event-time window using this:
>>>
>>> stream.window(EventTimeTumblingWindows.of(Time.seconds(10)))
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I wrote a program which constructs a WindowedStream to compute periodic
>>>> data statistics every 10 seconds. However, I found that events have not
>>>> been strictly grouped into windows of 10s duration, i.e., some events are
>>>> leaking into the adjacent window.
>>>>
>>>> The output is like this:
>>>>
>>>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>>>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>>>> # removed for brevity
>>>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>>>> 99 events in this window
>>>> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong
>>>> window
>>>> Mon, 04 Jul 2016 11:12:00 CST
>>>>
>>>> Here is the code:
>>>>
>>>> import org.apache.commons.lang3.time.FastDateFormat;
>>>> import org.apache.flink.api.common.functions.FoldFunction;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> public class TimeWindow {
>>>>
>>>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>>>         private final long DELAY = 500;
>>>>         private long currentWatermark;
>>>>
>>>>         @Override
>>>>         public Watermark getCurrentWatermark() {
>>>>             return new Watermark(currentWatermark);
>>>>         }
>>>>
>>>>         @Override
>>>>         public long extractTimestamp(Long event, long l) {
>>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>>             return event;
>>>>         }
>>>>     }
>>>

Re: Tumbling time window cannot group events properly

2016-07-04 Thread Yukun Guo
Thanks for the information. Strangely enough, after I set the time
characteristic to EventTime, the events are leaking into the previous
window:

...
Mon, 04 Jul 2016 19:10:49 CST
Mon, 04 Jul 2016 19:10:50 CST # ?
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
100 events in this window
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:51 CST
Mon, 04 Jul 2016 19:10:51 CST


On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> I think it should be as simple as setting event time as the stream time
> characteristic:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> The problem is that .timeWindow(Time.seconds(10)) will use processing time
> if you don't specify a time characteristic. You can enforce using an
> event-time window using this:
>
> stream.window(EventTimeTumblingWindows.of(Time.seconds(10)))
>
> Cheers,
> Aljoscha
>
>
> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk@gmail.com> wrote:
>
>> Hi,
>>
>> I wrote a program which constructs a WindowedStream to compute periodic
>> data statistics every 10 seconds. However, I found that events have not
>> been strictly grouped into windows of 10s duration, i.e., some events are
>> leaking into the adjacent window.
>>
>> The output is like this:
>>
>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>> # removed for brevity
>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>> 99 events in this window
>> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong
>> window
>> Mon, 04 Jul 2016 11:12:00 CST
>>
>> Here is the code:
>>
>> import org.apache.commons.lang3.time.FastDateFormat;
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> public class TimeWindow {
>>
>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>         private final long DELAY = 500;
>>         private long currentWatermark;
>>
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>             return new Watermark(currentWatermark);
>>         }
>>
>>         @Override
>>         public long extractTimestamp(Long event, long l) {
>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>             return event;
>>         }
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>
>>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>>             private volatile boolean isRunning = true;
>>
>>             @Override
>>             public void run(SourceContext<Long> sourceContext) throws Exception {
>>                 while (isRunning) {
>>                     sourceContext.collect(System.currentTimeMillis());
>>                     Thread.sleep(200);
>>                 }
>>
>>                 sourceContext.close();
>>             }
>>
>>             @Override
>>             public void cancel() {
>>                 isRunning = false;
>>             }
>>         });
>>
>>         stream
>>             .assignTimestampsAndWatermarks(new TimestampAssigner())
>>             .keyBy(new KeySelector<Long, Integer>() {
>>                 @Override
>>                 public Integer getKey(Long x) throws Exception {
>>                     return 0;
>>                 }
>>             })
>>             .timeWindow(Time.seconds(10))
>>

Re: Strange behavior of DataStream.countWindow

2016-06-11 Thread Yukun Guo
Thx, now I use element.hashCode() % nPartitions and it works as expected.

But I'm afraid it's not the best practice for just turning a plain (already
parallelized) DataStream into a KeyedStream, because it introduces some
overhead due to physical repartitioning by key, which is unnecessary since
I don't really care about the keys.
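For reference, a sketch of the deterministic key selector described above
(class name and partition count are illustrative):

import org.apache.flink.api.java.functions.KeySelector;

// Derives the key from the element itself, so repeated invocations on the same
// element always return the same key, which is what Fabian's answer below requires.
public class HashPartitionKeySelector<T> implements KeySelector<T, Integer> {

    private final int nPartitions;

    public HashPartitionKeySelector(int nPartitions) {
        this.nPartitions = nPartitions;
    }

    @Override
    public Integer getKey(T element) throws Exception {
        // Mask the sign bit so negative hash codes don't produce negative keys.
        return (element.hashCode() & Integer.MAX_VALUE) % nPartitions;
    }
}

In the DigitCount example quoted below this would be used as
.keyBy(new HashPartitionKeySelector<Tuple2<Integer, Integer>>(2)).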

On 9 June 2016 at 22:00, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yukun,
>
> the problem is that the KeySelector is internally invoked multiple times.
> Hence it must be deterministic, i.e., it must extract the same key for the
> same object if invoked multiple times.
> The documentation is not discussing this aspect and should be extended.
>
> Thanks for pointing out this issue.
>
> Cheers,
> Fabian
>
>
> 2016-06-09 13:19 GMT+02:00 Yukun Guo <gyk@gmail.com>:
>
>> I’m playing with the (Window)WordCount example from Flink QuickStart. I
>> generate a DataStream consisting of 1000 Strings of random digits, which
>> is windowed with a tumbling count window of 50 elements:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.util.Collector;
>> import java.util.Random;
>>
>> public class DigitCount {
>>
>>     public static void main(String[] args) throws Exception {
>>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         DataStream<String> text = env.fromElements(
>> "14159265358979323846264338327950288419716939937510",
>> "58209749445923078164062862089986280348253421170679",
>> "82148086513282306647093844609550582231725359408128",
>> "48111745028410270193852110555964462294895493038196",
>> "44288109756659334461284756482337867831652712019091",
>> "45648566923460348610454326648213393607260249141273",
>> "72458700660631558817488152092096282925409171536436",
>> "78925903600113305305488204665213841469519415116094",
>> "33057270365759591953092186117381932611793105118548",
>> "07446237996274956735188575272489122793818301194912",
>> "98336733624406566430860213949463952247371907021798",
>> "60943702770539217176293176752384674818467669405132",
>> "00056812714526356082778577134275778960917363717872",
>> "14684409012249534301465495853710507922796892589235",
>> "42019956112129021960864034418159813629774771309960",
>> "5187072113499837297804995105973173281609631859",
>> "50244594553469083026425223082533446850352619311881",
>> "71010003137838752886587533208381420617177669147303",
>> "59825349042875546873115956286388235378759375195778",
>> "18577805321712268066130019278766111959092164201989"
>> );
>>
>> DataStream<Tuple2<Integer, Integer>> digitCount = text
>> .flatMap(new Splitter())
>> .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>> @Override
>> public Integer getKey(Tuple2<Integer, Integer> x) throws 
>> Exception {
>> return x.f0 % 2;
>> }
>> })
>> .countWindow(50)
>> .sum(1);
>>
>> digitCount.print();
>> env.execute();
>>
>> }
>>
>> public static final class Splitter implements FlatMapFunction<String, 
>> Tuple2<Integer, Integer>> {
>> @Override
>> public void flatMap(String value, Collector<Tuple2<Integer, 
>> Integer>> out) {
>> for (String token : value.split("")) {
>> if (token.length() == 0) {
>> continue;
>> }
>> out.collect(Tuple2.of(Integer.parseInt(token), 1));
>> }
>> }
>> }
>> }
>>
>> T

Strange behavior of DataStream.countWindow

2016-06-09 Thread Yukun Guo
I’m playing with the (Window)WordCount example from the Flink QuickStart. I
generate a DataStream of Strings containing 1000 random digits in total, which
is windowed with a tumbling count window of 50 elements:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Random;

public class DigitCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(
"14159265358979323846264338327950288419716939937510",
"58209749445923078164062862089986280348253421170679",
"82148086513282306647093844609550582231725359408128",
"48111745028410270193852110555964462294895493038196",
"44288109756659334461284756482337867831652712019091",
"45648566923460348610454326648213393607260249141273",
"72458700660631558817488152092096282925409171536436",
"78925903600113305305488204665213841469519415116094",
"33057270365759591953092186117381932611793105118548",
"07446237996274956735188575272489122793818301194912",
"98336733624406566430860213949463952247371907021798",
"60943702770539217176293176752384674818467669405132",
"00056812714526356082778577134275778960917363717872",
"14684409012249534301465495853710507922796892589235",
"42019956112129021960864034418159813629774771309960",
"5187072113499837297804995105973173281609631859",
"50244594553469083026425223082533446850352619311881",
"71010003137838752886587533208381420617177669147303",
"59825349042875546873115956286388235378759375195778",
"18577805321712268066130019278766111959092164201989"
);

        DataStream<Tuple2<Integer, Integer>> digitCount = text
                .flatMap(new Splitter())
                .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
                        return x.f0 % 2;
                    }
                })
                .countWindow(50)
                .sum(1);

        digitCount.print();
        env.execute();

    }

    public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
            for (String token : value.split("")) {
                if (token.length() == 0) {
                    continue;
                }
                out.collect(Tuple2.of(Integer.parseInt(token), 1));
            }
        }
    }
}

The code above produces 19 lines of output, which is reasonable: the 1000
digits are keyed into 2 partitions, where one partition contains 500+ elements
and the other slightly fewer than 500, so the leftover digits that never fill a
complete 50-element window are dropped; in effect one 50-digit window is lost.
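For example (illustrative numbers), if the two keys receive 503 and 497 digits
respectively, countWindow(50) fires floor(503/50) = 10 and floor(497/50) = 9
complete windows, i.e. 19 output lines, and the 3 + 47 leftover digits (exactly
one window's worth) are never emitted.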

So far so good, but if I replace the mod KeySelector with a random one:

private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
    private int nPartitions;
    private Random random;

    RandomKeySelector(int nPartitions) {
        this.nPartitions = nPartitions;
        random = new Random();
    }

    @Override
    public Integer getKey(T dummy) throws Exception {
        return random.nextInt(this.nPartitions);
    }
}

and then

.keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))

it may generate only 17 or 18 lines of output. How could that happen? Moreover,
if I set the number of partitions to 10, in theory there should be no fewer
than 11 lines of output, but in practice there can be as few as 9.

Please help me understand why countWindow behaves like this.


Re: Hourly top-k statistics of DataStream

2016-06-06 Thread Yukun Guo
My algorithm is roughly like this, taking the top-K words problem as an example
(the purpose of computing a local "word count" first is to deal with data
imbalance):

DataStream of words ->
timeWindow of 1h ->
converted to DataSet of words ->
random partitioning by rebalance ->
local “word count” using mapPartition ->
global “word count” using reduceGroup ->
rebalance ->
local top-K using mapPartition ->
global top-K using reduceGroup

Here is some (probably buggy) code to demonstrate the basic idea on DataSet:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class WordCount {

  public static void main(String[] args) throws Exception {

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = env.fromElements(
"14159265358979323846264338327950288419716939937510",
"58209749445923078164062862089986280348253421170679",
"82148086513282306647093844609550582231725359408128",
"48111745028410270193852110555964462294895493038196",
"44288109756659334461284756482337867831652712019091",
"45648566923460348610454326648213393607260249141273",
"72458700660631558817488152092096282925409171536436",
"78925903600113305305488204665213841469519415116094",
"33057270365759591953092186117381932611793105118548",
"07446237996274956735188575272489122793818301194912",
"98336733624406566430860213949463952247371907021798",
"60943702770539217176293176752384674818467669405132",
"00056812714526356082778577134275778960917363717872",
"14684409012249534301465495853710507922796892589235",
"42019956112129021960864034418159813629774771309960",
"5187072113499837297804995105973173281609631859",
"50244594553469083026425223082533446850352619311881",
"71010003137838752886587533208381420617177669147303",
"59825349042875546873115956286388235378759375195778",
"18577805321712268066130019278766111959092164201989"
);

    DataSet<Tuple2<String, Integer>> counts = text
        // split up the lines in pairs (2-tuples) containing: (word,1)
        .flatMap(new LineSplitter())
        .rebalance()
        // local word count
        .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
          @Override
          public void mapPartition(Iterable<Tuple2<String, Integer>> words,
                                   Collector<Tuple2<String, Integer>> out) throws Exception {
            SortedMap<String, Integer> m = new TreeMap<String, Integer>();
            for (Tuple2<String, Integer> w : words) {
              Integer current = m.get(w.f0);
              Integer updated = current == null ? w.f1 : current + w.f1;
              m.put(w.f0, updated);
            }

            for (Map.Entry<String, Integer> e : m.entrySet()) {
              out.collect(Tuple2.of(e.getKey(), e.getValue()));
            }
          }
        })
        // global word count
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
          @Override
          public void reduce(Iterable<Tuple2<String, Integer>> wordcounts,
                             Collector<Tuple2<String, Integer>> out) throws Exception {
            SortedMap<String, Integer> m = new TreeMap<String, Integer>();
            for (Tuple2<String, Integer> wc : wordcounts) {
              Integer current = m.get(wc.f0);
              Integer updated = current == null ? wc.f1 : current + wc.f1;
              m.put(wc.f0, updated);
            }

            for (Map.Entry<String, Integer> e : m.entrySet()) {
              out.collect(Tuple2.of(e.getKey(), e.getValue()));
            }
          }
        });

    DataSet<Tuple2<Integer, String>> topK = counts
        .rebalance()
        // local top-K
        .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
          @Override
          public void mapPartition(Iterable<Tuple2<String, Integer>> wordcounts,
                                   Collector<Tuple2<Integer, String>> out) throws Exception {
            SortedMap<Integer, String> topKSoFar = new TreeMap<Integer, String>();
            for (Tuple2<String, Integer> wc : wordcounts) {
              String w = wc.f0;
              Integer c = wc.f1;
              topKSoFar.put(c, w);
              if (topKSoFar.size() > 3) {
                topKSoFar.remove(topKSoFar.firstKey());
              }
            }

            for (Map.Entry

Hourly top-k statistics of DataStream

2016-06-06 Thread Yukun Guo
Hi,

I'm working on a project which uses Flink to compute hourly log statistics
like top-K. The logs are fetched from Kafka by a FlinkKafkaConsumer and packed
into a DataStream.

The problem is, I find the computation quite challenging to express with
Flink's DataStream API:

1. If I use something like `logs.timeWindow(Time.hours(1))` and the data
volume is really high, e.g., billions of logs generated in one hour, will the
window state grow too large to be handled efficiently? (See the sketch after
this list for one incremental-aggregation direction.)

2. We have to create a `KeyedStream` before applying `timeWindow`. However,
the distribution of some keys is skewed, hence using them may compromise
performance due to unbalanced partition loads. (What I want is just to
rebalance the stream across all partitions.)

3. The top-K algorithm can be implemented straightforwardly with `DataSet`'s
`mapPartition` and `reduceGroup` API as in
[FLINK-2549](https://github.com/apache/flink/pull/1161/), but it is not so easy
with the DataStream approach, even with the stateful operators. I still cannot
figure out how to re-unite the streams once they are partitioned.

4. Is it possible to convert a DataStream into a DataSet? If yes, how can I
make Flink analyze the data incrementally rather than aggregating the logs for
one hour before it starts processing?
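Regarding question 1, a rough sketch (not from the original thread) of the
fold-based incremental aggregation used elsewhere in this archive: fold()
folds each element into a small accumulator as it arrives, so the hourly window
keeps only a per-word count map rather than the raw logs. Class name, source,
key selector and window size are illustrative placeholders.

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.SortedMap;
import java.util.TreeMap;

public class IncrementalHourlyCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder source; in the real job this would be the Kafka-backed stream
        DataStream<String> words = env.fromElements("a", "b", "a", "c", "b", "a");

        words
            .keyBy(new KeySelector<String, Integer>() {
                @Override
                public Integer getKey(String w) throws Exception {
                    // deterministic pre-partitioning, as discussed in the countWindow thread
                    return (w.hashCode() & Integer.MAX_VALUE) % 4;
                }
            })
            .timeWindow(Time.hours(1))
            .fold(new TreeMap<String, Long>(), new FoldFunction<String, SortedMap<String, Long>>() {
                @Override
                public SortedMap<String, Long> fold(SortedMap<String, Long> acc, String w) {
                    Long current = acc.get(w);
                    acc.put(w, current == null ? 1L : current + 1);
                    return acc;
                }
            })
            .print();

        env.execute("incremental hourly word count");
    }
}

The caveat from the SortedMap/TreeMultimap threads above still applies: the
accumulator is handled as a GenericType, so downstream operators may need an
explicit type hint.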