Hello, everyone!
I've run into a problem when using nested reduceByKeyAndWindow calls.
My task is to parse JSON-formatted events from textFileStream and
create aggregations for each field.
For example, given this input:
{"type":"EventOne", "attr1":10,"attr2":20}
I produce these projections:
{"type":"EventOne", count:1}
{"type":"EventOne", "attr1":10, count:1}
{"type":"EventOne", "attr2":20, count:1}
{"type":"EventOne", "attr1":10, "attr2":20, count:1}
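To make the projection step concrete, here is a minimal sketch in plain Java (no Spark), assuming a projection is emitted for every subset of the event's attributes, which is what the example above suggests. The class ProjectionSketch and the method project are hypothetical names for illustration only; they are not the real MyProjectionFunction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProjectionSketch {

    // Hypothetical helper: builds one projection per subset of the attributes.
    // Assumption: every subset (including the empty one) yields a projection.
    static List<Map<String, Object>> project(String type, Map<String, Integer> attrs) {
        List<String> names = new ArrayList<>(attrs.keySet());
        List<Map<String, Object>> result = new ArrayList<>();
        // Each bit of `mask` decides whether the attribute at that index is included.
        for (int mask = 0; mask < (1 << names.size()); mask++) {
            Map<String, Object> projection = new LinkedHashMap<>();
            projection.put("type", type);
            for (int i = 0; i < names.size(); i++) {
                if ((mask & (1 << i)) != 0) {
                    projection.put(names.get(i), attrs.get(names.get(i)));
                }
            }
            projection.put("count", 1);
            result.add(projection);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> attrs = new LinkedHashMap<>();
        attrs.put("attr1", 10);
        attrs.put("attr2", 20);
        // Prints four projections: type only, type+attr1, type+attr2, type+attr1+attr2.
        for (Map<String, Object> p : project("EventOne", attrs)) {
            System.out.println(p);
        }
    }
}
```

In the real job this logic would live inside the function passed to flatMapToPair, with the projection fields (minus count) acting as the key.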
Every 150 seconds (Durations.seconds(150)) I pre-aggregate this data, both
to save it and to feed it into the larger windows. But the larger windows
receive no data at all, which looks like a bug.
Here is the code:
Duration preAggDuration = Durations.seconds(150);
Duration windowComputationPeriod = Durations.seconds(300);
JavaStreamingContext streamingContext =
    new JavaStreamingContext(sparkConf, Durations.seconds(10));
// read lines
JavaDStream<String> lines = streamingContext.textFileStream("hdfs://my_dir");
// parse lines into models
JavaDStream<MyEvent> eventStream =
    lines.repartition(350).flatMap(new MyEventParser());
// each model is split into projections
JavaPairDStream<MyProjectionKey, MyProjection> projectionStream =
    eventStream.flatMapToPair(new MyProjectionFunction());
// pre-aggregated data
JavaPairDStream<MyProjectionKey, MyProjection> preAggStream =
    projectionStream.reduceByKeyAndWindow(new MySumFunction(), preAggDuration,
        preAggDuration);
preAggStream.foreachRDD(SAVE_AS_OBJECT_FILE);
// computations in large windows (window sizes in minutes)
for (int windowSize : new int[]{60, 1440}) {
    JavaPairDStream<MyProjectionKey, MyProjection> windowStream =
        preAggStream.reduceByKeyAndWindow(new MySumFunction(),
            Durations.minutes(windowSize), windowComputationPeriod);
    windowStream.count().print(); // here I have no data :(
    JavaPairDStream<MyProjectionKey, MyProjection> windowMergedStream =
        windowStream.transformToPair(/* here goes merge of this window with historical data */);
    windowMergedStream.count().print(); // here I have zero :(
}
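As far as I understand, Spark Streaming requires the window and slide durations of a windowed DStream to be whole multiples of the parent stream's slide duration. The sketch below is plain Java (no Spark) and only checks that arithmetic for the durations used above; the class WindowDurationCheck and the method isMultiple are hypothetical names, not part of any Spark API.

```java
public class WindowDurationCheck {

    // True when durationSec is a whole multiple of parentSlideSec (both in seconds).
    static boolean isMultiple(long durationSec, long parentSlideSec) {
        return durationSec % parentSlideSec == 0;
    }

    public static void main(String[] args) {
        long batchSec = 10;    // batch interval of the streaming context
        long preAggSec = 150;  // window and slide of preAggStream
        long slideSec = 300;   // windowComputationPeriod

        System.out.println("preAgg vs batch: " + isMultiple(preAggSec, batchSec));
        System.out.println("slide vs preAgg: " + isMultiple(slideSec, preAggSec));
        for (long minutes : new long[]{60, 1440}) {
            System.out.println(minutes + " min window vs preAgg: "
                + isMultiple(minutes * 60, preAggSec));
        }
    }
}
```

All of these checks pass for the values in my code, so a mismatch between the durations does not seem to be the cause.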