Hello, everyone!
I've run into a problem when using nested reduceByKeyAndWindow.
My task is to parse JSON-formatted events from textFileStream and create aggregations for each field.
For example, given this input:

{"type":"EventOne", "attr1":10,"attr2":20}

I produce these projections:

{"type":"EventOne", count:1}

{"type":"EventOne", "attr1":10, count:1}

{"type":"EventOne", "attr2":20, count:1}

{"type":"EventOne", "attr1":10, "attr2":20, count:1}
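
To make the projection step concrete, here is a minimal sketch of how such projections could be generated: one projection per subset of the event's attributes, each keeping "type" and carrying count = 1. It is plain Java without Spark. The class name ProjectionSketch and the method project are hypothetical and are not the actual MyProjectionFunction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProjectionSketch {

    // Hypothetical helper: builds one projection per subset of the attributes.
    // Every projection keeps "type" and carries count = 1.
    static List<Map<String, Object>> project(String type, Map<String, Integer> attrs) {
        List<String> names = new ArrayList<>(attrs.keySet());
        List<Map<String, Object>> out = new ArrayList<>();
        // Each bit of mask decides whether the attribute at that index is kept.
        for (int mask = 0; mask < (1 << names.size()); mask++) {
            Map<String, Object> projection = new LinkedHashMap<>();
            projection.put("type", type);
            for (int i = 0; i < names.size(); i++) {
                if ((mask & (1 << i)) != 0) {
                    projection.put(names.get(i), attrs.get(names.get(i)));
                }
            }
            projection.put("count", 1);
            out.add(projection);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> attrs = new LinkedHashMap<>();
        attrs.put("attr1", 10);
        attrs.put("attr2", 20);
        // Prints the projections for the sample event, one per line.
        for (Map<String, Object> p : project("EventOne", attrs)) {
            System.out.println(p);
        }
    }
}
```

For the sample event with two attributes this yields four projections, matching the list above.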

Every Durations.seconds(150) I pre-aggregate this data (to save it and reuse it for the larger windows). But the larger windows receive no data at all, which looks like a bug.

Here is the code:

Duration preAggDuration = Durations.seconds(150);
Duration windowComputationPeriod = Durations.seconds(300);
JavaStreamingContext streamingContext =
        new JavaStreamingContext(sparkConf, Durations.seconds(10));

// read lines
JavaDStream<String> lines = streamingContext.textFileStream("hdfs://my_dir");
// parse lines into models
JavaDStream<MyEvent> eventStream =
        lines.repartition(350).flatMap(new MyEventParser());
// each model is split into projections
JavaPairDStream<MyProjectionKey, MyProjection> projectionStream =
        eventStream.flatMapToPair(new MyProjectionFunction());
// pre-aggregated data
JavaPairDStream<MyProjectionKey, MyProjection> preAggStream =
        projectionStream.reduceByKeyAndWindow(new MySumFunction(),
                preAggDuration, preAggDuration);
preAggStream.foreachRDD(SAVE_AS_OBJECT_FILE);



// computations in large windows
for (int windowSize : new int[]{60, 1440}) {
    JavaPairDStream<MyProjectionKey, MyProjection> windowStream =
            preAggStream.reduceByKeyAndWindow(new MySumFunction(),
                    Durations.minutes(windowSize), windowComputationPeriod);
    windowStream.count().print(); // here I have no data :(
    JavaPairDStream<MyProjectionKey, MyProjection> windowMergedStream =
            windowStream.transformToPair(/* here goes merge of this window with historical data */);
    windowMergedStream.count().print(); // here I have zero :(
}
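
One thing worth ruling out with nested windows: Spark Streaming requires a windowed DStream's window and slide durations to be multiples of the parent DStream's slide duration. The sketch below only checks that arithmetic for the durations used above, in plain Java without Spark. The class name WindowDurationCheck and the method isMultipleOf are hypothetical helpers, and it assumes the pre-aggregated stream slides every 150 seconds, as configured.

```java
public class WindowDurationCheck {

    // Hypothetical helper: true when value is a whole multiple of base.
    static boolean isMultipleOf(long valueSeconds, long baseSeconds) {
        return valueSeconds % baseSeconds == 0;
    }

    public static void main(String[] args) {
        long batchSeconds = 10;    // batch interval of the streaming context
        long preAggSeconds = 150;  // window and slide of the pre-aggregated stream
        long slideSeconds = 300;   // windowComputationPeriod

        // The pre-aggregation window must line up with the batch interval.
        System.out.println("preAgg vs batch: " + isMultipleOf(preAggSeconds, batchSeconds));
        // The outer slide must line up with the pre-aggregated stream's slide.
        System.out.println("slide vs preAgg: " + isMultipleOf(slideSeconds, preAggSeconds));

        // The outer windows (in minutes) must line up with it as well.
        for (int windowMinutes : new int[]{60, 1440}) {
            long windowSeconds = windowMinutes * 60L;
            System.out.println(windowMinutes + " min vs preAgg: "
                    + isMultipleOf(windowSeconds, preAggSeconds));
        }
    }
}
```

All four checks pass for these values, so this particular constraint does not seem to explain the empty windows.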




