Hi community,

I am trying to use Storm with the Trident API. My use case is partitioning
a stream and computing aggregations over partitioned sliding windows.
However, when I debug the output, I see that the window state is the same
in all partitions. I would expect tuples with different keys to go to
different partitions and to be processed in different windows, so the state
of the partitioned windows should not be the same. I am running the
application on my local machine. Am I doing something wrong, or are
partitioned windows not supported in the Trident API?
Here is my code:

......
.......
        topology
                .newStream("aggregation", spout)
                .each(new Fields("json"), new SelectFields(),
                        new Fields("geo", "val", "max_price", "min_price")).parallelismHint(parallelism)
                .partitionBy(new Fields("geo")).parallelismHint(parallelism)
                .slidingWindow(
                        new BaseWindowedBolt.Duration(slideWindowLength, TimeUnit.MILLISECONDS),
                        new BaseWindowedBolt.Duration(slideWindowSlide, TimeUnit.MILLISECONDS),
                        new InMemoryWindowsStoreFactory(),
                        new Fields("geo", "val", "max_price", "min_price"),
                        new MinMaxAggregator(),
                        new Fields("geo", "val", "max_price", "min_price")).parallelismHint(parallelism)
                .peek(new Consumer() {
                    @Override
                    public void accept(TridentTuple input) {
                        System.out.println( input);
                    }
                });

........
.........

@SuppressWarnings("serial")
class SelectFields extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        JSONObject obj = new JSONObject(tuple.getString(0));
        String geo = obj.getJSONObject("t").getString("geo");
        Double price = obj.getJSONObject("m").getDouble("price");
        collector.emit( new Values(
                geo,
                System.nanoTime(),
                price,
                price
        ));
    }
}

class MinMaxAggregator extends BaseAggregator<MinMaxAggregator.State> {
     class State {
        double max = 0.0;
        double min = 0.0;
        long val = 0;
        String id = "";
    }

    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State();
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        Double maxPrice = tuple.getDouble(2);
        Double minPrice = tuple.getDouble(3);
        Long val = tuple.getLong(1);
        String id = tuple.getString(0);
        state.val = val;
        state.max = Math.max(state.max, maxPrice);
        state.min = Math.min(state.min, minPrice);
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.id, state.val, state.max, state.min));
    }

}
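
To make my expectation concrete, here is a minimal plain-Java sketch (no
Storm involved) of the result I would expect for the tuples of one window:
a separate min/max per "geo" key. The class name PerKeyMinMaxSketch, the
helper aggregateByGeo, and the sample data are made up for illustration
only; this is not how Trident implements windows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PerKeyMinMaxSketch {

    // Hypothetical helper, not part of Storm: computes {min, max} of the
    // prices per geo key for the tuples of a single window.
    static Map<String, double[]> aggregateByGeo(String[] geos, double[] prices) {
        Map<String, double[]> result = new LinkedHashMap<>();
        for (int i = 0; i < geos.length; i++) {
            double price = prices[i];
            double[] minMax = result.get(geos[i]);
            if (minMax == null) {
                // First tuple for this key: it is both the min and the max.
                result.put(geos[i], new double[] {price, price});
            } else {
                minMax[0] = Math.min(minMax[0], price);
                minMax[1] = Math.max(minMax[1], price);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample window with two keys.
        String[] geos = {"DE", "US", "DE", "US"};
        double[] prices = {10.0, 3.0, 7.5, 4.0};
        for (Map.Entry<String, double[]> e : aggregateByGeo(geos, prices).entrySet()) {
            System.out.println(e.getKey() + " min=" + e.getValue()[0] + " max=" + e.getValue()[1]);
        }
    }
}
```

With this sample data each key ends up with its own min and max, which is
the kind of per-partition state I expected to see from the windows above.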

-- 
-Cheers

Jeyhun
