My process function is like : private static class MergeFunction extends RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer, ObjectNode>> {
private ValueState<Tuple2<Integer, ObjectNode>> state; @Override @SuppressWarnings("unchecked") public void open(Configuration parameters) throws Exception { state = getRuntimeContext().getState(new ValueStateDescriptor<>("mystate", (Class<Tuple2<Integer,ObjectNode>>) (Object)Tuple2.class)); } } When I running the code: 05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED java.lang.RuntimeException: Cannot create full type information based on the given class. If the type has generics, please at org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124) at org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101) at com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics. at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557) at org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122) ... 9 more Can I use generics with ValueState? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.