Hello all,
I'm trying to do a fairly simple task that is to find the maximum value
(Double) received so far in a stream. This is what I implemented:

POJO class:
public class Fish{
    public Integer fish_id;
    public Point coordinate;   //position

public Fish() {};

public Fish(fish_id,double x, double y) {
   //assign to fish object
}

Java_main.java
 DataStream text  = env
                .addSource(new FlinkKafkaConsumer<>("test", new
JSONKeyValueDeserializationSchema(false),properties).setStartFromLatest());

      DataStream<Fish> fishes = text.map(new MapFunction<ObjectNode,
Fish>() {
            @Override
            public Fish map(ObjectNode json) throws Exception {
               Fish fishes = new Fish(
json.get("value").get("id").asInteger()
,json.get("value").get("x_begin").asDouble(),json.get("value").get("y_begin").asDouble());
                return fishes;
            }
        });


 I can't seem to apply aggregations on the Point class without extracting
the x coordinates in a separate stream so here are the 2 methods I have
applied:

Method 1: Simple reduce


KeyedStream<Fish, Integer> keyed = fishes.keyBy(s->s.fish_id);
        keyed.reduce(new ReduceFunction<Fish>() {
            @Override
            public Fish reduce(Fish t, Fish t1) throws Exception {
                if (t.X > t1.X) {
                    return t;
                } else
                    return t1;
            }
        }).map(new MapFunction<Fish, Double>() {
                    @Override
                    public Double map(Fish t) throws Exception {
                        return t.point.getX();
                    }
                }).print();



Result: 1> -73.8517632
1> -73.851446
1> -73.851446
1> -73.8505642
1> -73.851446       //smaller than previous value!
1> -73.851446
1> -73.851446
1> -73.8505642
1> -73.8517632
1> -73.851446
3> -73.85012
3> -73.850212
3> -73.851979     //smaller than previous value
3> -73.850212
3> -73.8512969
3> -73.8512969

*1)* I'm trying to compute the max so why do I see smaller values after the
latest maximum value. I think this is because order of the outputs is not
preserved as same as inputs?
If this is so how do I ensure that the order is preserved and I only see
the latest maximum value?

*2)* Another thing I have noticed is that if instance 1 produces the max
say -73.8505642 but then instance 2 would produce  -73.9064 which is again
smaller than the value produced by instance 1. I'm assuming its because
there is no communication between parallel instances hence they produce two
value. If this is so how do I get ONE maximum value from all the parallel
instances combined?

Method 2: Using States

public class GetMaximum extends RichMapFunction<Fish, Fish> {

    private transient ValueState<Fish> max;

    @Override
    public Fish map(Fish input) throws Exception {

        // access the state value
       Fish currentMaximum = max.value();

        if (input.point.getX() > currentMaximum.point.getX()) {
            currentMaximum.objid = input.objid;
            currentMaximum.point = (org.locationtech.jts.geom.Point)
input.point.clone();
            max.update(currentMaximum);
        }
        return currentMaximum;
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Fish> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Fish>() {}), //
type information
                        new Fish(-100,0)); // default value of the state,
if nothing was set
        max = getRuntimeContext().getState(descriptor);
    }
}

*3) *I'm getting the same results as method 1. isn't ValueState shared
between all instances of the same operator?

*4) *Out of the two methodologies which is a better choice?

Would really appreciate your help!

Best Regards,
Komal

Reply via email to