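Hi,

I think it doesn't work because M is only a type variable inside AnomalyFlatMap. Its concrete type is not available in open(), so a TypeInformation for M cannot be created from new TypeHint<M>() {}. What you can do is manually pass a TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap constructor from the call site, where the concrete model type is known, and use that when creating the state descriptor.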
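To illustrate why the type has to come from the caller, here is a minimal plain-Java sketch. It does not use Flink. Hint and GenericOperator are hypothetical stand-ins (for a TypeHint-style class and for a generic function class such as AnomalyFlatMap), and the sketch assumes the hint captures its type argument through reflection on an anonymous subclass.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

class TypeHintDemo {
    public static void main(String[] args) {
        // The caller knows the concrete type (String here) and supplies the hint.
        GenericOperator<String> op = new GenericOperator<>(new Hint<String>() {});

        // Created inside the generic class: only the type variable M is captured.
        System.out.println(op.hintCreatedInside().isConcrete()); // prints "false"

        // Created by the caller: the concrete class is captured.
        System.out.println(op.hintFromCaller().isConcrete());    // prints "true"
    }
}

// Hypothetical stand-in for a type hint: an anonymous subclass records its type argument.
abstract class Hint<T> {
    private final Type captured;

    protected Hint() {
        // The generic superclass of the anonymous subclass is Hint<...>.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }

    Type captured() {
        return captured;
    }

    // A bare type variable carries no information about the runtime type.
    boolean isConcrete() {
        return !(captured instanceof TypeVariable);
    }
}

// Hypothetical generic class, analogous to a function class with a type parameter M.
class GenericOperator<M> {
    private final Hint<M> suppliedHint;

    GenericOperator(Hint<M> suppliedHint) {
        this.suppliedHint = suppliedHint;
    }

    // Inside the generic class, M is just a type variable.
    Hint<M> hintCreatedInside() {
        return new Hint<M>() {};
    }

    // The hint passed in by the caller keeps the concrete type.
    Hint<M> hintFromCaller() {
        return suppliedHint;
    }
}
```

Applied to your class, the same idea would be to build the TypeInformation where the concrete model class is known, for example TypeInformation.of(new TypeHint<MyModel>() {}) with MyModel standing in for your actual model class, pass it into the AnomalyFlatMap constructor, and hand that field to the ValueStateDescriptor in open().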
Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mneum...@sics.se> wrote:

> Hey,
>
> I have a FlatMap that uses some generics (appended at the end of the mail).
> I have some trouble with the type inference running into
> InvalidTypesException on the first line in the open function.
>
> How can I fix it?
>
> Cheers
> Martin
>
>
> public class AnomalyFlatMap<M extends Model, V extends ModelValue, T> extends
>         RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly, T>> {
>     private transient ValueState<M> microModel;
>     private final double threshold;
>     private boolean updateIfAnomaly;
>     private M initModel;
>
>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>         this.threshold = threshold;
>         this.updateIfAnomaly = updateIfAnomaly;
>         this.initModel = model;
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         ValueStateDescriptor<M> descriptor =
>                 new ValueStateDescriptor<>(
>                         "RollingMicroModel",
>                         TypeInformation.of(new TypeHint<M>() {
>                         }), initModel
>                 );
>         microModel = getRuntimeContext().getState(descriptor);
>     }
>
>     @Override
>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>>
>             collector) throws Exception {
>         M model = microModel.value();
>         Anomaly res = model.calculateAnomaly(sample.f0);
>
>         if (res.getScore() <= threshold || updateIfAnomaly) {
>             model.addWindow(sample.f0);
>             microModel.update(model);
>         }
>         collector.collect(new Tuple2<>(res, sample.f1));
>     }
> }