I got a bit stuck with CoFlatMapFunction. It seems to work fine if I place it on
the DataStream before the window, but it fails if placed after the window's “apply”
function.

I was testing two streams: the main “Features” stream on flatMap1, constantly
ingesting data, and the control stream “Model” on flatMap2, changing the model on
request.

I am able to set b0/b1 and see them properly set in flatMap2, but flatMap1
always sees b0 and b1 as 0, the values set at initialization.

Am I missing something obvious here?

Thanks a lot,
Vladimir

public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel(){
        b0=0.0;
        b1=0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " + this + "\n");
    }

    @Override
    public String toString(){
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}
