Hi,
I think this fails because M is still a type variable inside AnomalyFlatMap,
so the TypeHint<M> in open() carries no concrete type from which a
TypeInformation for M can be created. What you can do is pass a
TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap constructor
yourself and use that when creating the state descriptor.
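
To show the effect without Flink on the classpath, here is a minimal plain-Java sketch. All names in it (TypeTokenDemo, Hint, ErasedHolder, TokenHolder) are made up for illustration. Hint only imitates how a type hint records its type argument through an anonymous subclass; it is not Flink's TypeHint. A Class<M> token stands in for the TypeInformation<M> you would pass in.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class TypeTokenDemo {

    /** Hypothetical stand-in for a type hint: an anonymous subclass records its type argument. */
    abstract static class Hint<T> {
        Type captured() {
            // The generic superclass of the anonymous subclass is Hint<...>.
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** Mirrors the failing pattern: the hint is created where M is only a type variable. */
    static class ErasedHolder<M> {
        Type describe() {
            return new Hint<M>() { }.captured();
        }
    }

    /** Mirrors the suggested fix: the caller hands in the concrete type explicitly. */
    static class TokenHolder<M> {
        private final Class<M> type;

        TokenHolder(Class<M> type) {
            this.type = type;
        }

        Type describe() {
            return type;
        }
    }

    public static void main(String[] args) {
        Type erased = new ErasedHolder<String>().describe();
        Type explicit = new TokenHolder<>(String.class).describe();

        // The hint only sees the type variable M, not String.
        System.out.println(erased instanceof TypeVariable<?>);
        // The explicit token still knows the concrete class.
        System.out.println(explicit);
    }
}
```

In AnomalyFlatMap the equivalent would be an extra constructor parameter of type TypeInformation<M>, for example obtained with TypeInformation.of(MyModel.class), where MyModel is a placeholder for your concrete model class. Store it in a field and give it to the ValueStateDescriptor in place of the TypeHint.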

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mneum...@sics.se> wrote:

> Hey,
>
> I have a FlatMap that uses some generics (appended at the end of the mail).
> Type inference fails with an InvalidTypesException on the first statement
> of the open() function.
>
> How can I fix it?
>
> Cheers Martin
>
>
>
>
> public class AnomalyFlatMap<M extends Model, V extends ModelValue, T>
>         extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly, T>> {
>     private transient ValueState<M> microModel;
>     private final double threshold;
>     private boolean updateIfAnomaly;
>     private M initModel;
>
>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>         this.threshold = threshold;
>         this.updateIfAnomaly = updateIfAnomaly;
>         this.initModel = model;
>
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         ValueStateDescriptor<M> descriptor =
>                 new ValueStateDescriptor<>(
>                         "RollingMicroModel",
>                         TypeInformation.of(new TypeHint<M>() {
>                         }),initModel
>                         );
>         microModel = getRuntimeContext().getState(descriptor);
>     }
>
>     @Override
>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector)
>             throws Exception {
>         M model = microModel.value();
>         Anomaly res  = model.calculateAnomaly(sample.f0);
>
>         if ( res.getScore() <= threshold || updateIfAnomaly){
>             model.addWindow(sample.f0);
>             microModel.update(model);
>         }
>         collector.collect(new Tuple2<>(res,sample.f1));
>     }
> }
>
>
>
