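This only works for keyed streams: getRuntimeContext().getState() returns partitioned (key/value) state, so you have to call keyBy() on the stream before your flatMap. If you do not want to key the stream, you can use the Checkpointed interface instead (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).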
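To make the role of the key a bit more concrete, here is a rough plain-Java model of what keyed state does for your x[t] - x[t-1] computation. This is not Flink code: the class KeyedDifference, its difference() method and the 0.0 default are made up for illustration (the default just mirrors the one in your ValueStateDescriptor). The point is only that there is one "previous value" slot per key, which is why the state cannot be looked up when the stream has no key.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keyed state (NOT the Flink API): one "previous value" slot per key.
class KeyedDifference {
    // Assumed default, mirroring the 0.0 default value in the quoted ValueStateDescriptor.
    private static final double DEFAULT_PREVIOUS = 0.0;

    // Stands in for the state backend: the key selects which slot is read and updated.
    private final Map<String, Double> previousByKey = new HashMap<>();

    // Returns x[t] - x[t-1] for the given key, then stores x[t] as the new previous value.
    double difference(String key, double current) {
        double previous = previousByKey.getOrDefault(key, DEFAULT_PREVIOUS);
        previousByKey.put(key, current);
        return current - previous;
    }

    public static void main(String[] args) {
        KeyedDifference state = new KeyedDifference();
        System.out.println(state.difference("topicA", 5.0)); // prints 5.0 (no previous value yet)
        System.out.println(state.difference("topicA", 7.5)); // prints 2.5
        System.out.println(state.difference("topicB", 1.0)); // prints 1.0 (separate slot for topicB)
    }
}
```

In the real job the key comes from keyBy(); if all records should share a single previous value, keying every record by the same constant is one way to get that, at the cost of processing that state in a single parallel instance.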

On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US) <[email protected]> wrote:
> Hi Kostas,
>
> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t]
> is the current value of the incoming sample and x[t-1] is the previous value
> of the incoming sample. I store the current value in state store
> (‘prev_tuple’) so that I can use it for computation in next cycle. As you
> may observe, I am not using keyBy. I am simply printing out the resultant
> tuple.
>
> It appears from the error message that I have to set the key serializer (and
> possibly value serializer) for the state store. I am not sure how to do
> that…
>
> Thanks for your interest in helping,
>
> Regards,
> Buvana
>
> public class stateful {
>     private static String INPUT_KAFKA_TOPIC = null;
>     private static int TIME_WINDOW = 0;
>
>     public static void main(String[] args) throws Exception {
>
>         if (args.length < 2) {
>             throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>                     + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>         }
>
>         INPUT_KAFKA_TOPIC = args[0];
>         TIME_WINDOW = Integer.parseInt(args[1]);
>
>         Properties properties = null;
>
>         properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         properties.setProperty("zookeeper.connect", "localhost:2181");
>         properties.setProperty("group.id", "test");
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>
>         DataStreamSource<String> stream = env
>                 .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>
>         // maps the data into Flink tuples
>         DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>
>         // write the result to the console or in a Kafka topic
>         streamTuples.print();
>
>         env.execute("plus one");
>
>     }
>
>     public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>         private transient ValueState<Tuple2<String, Double>> prev_tuple;
>
>         @Override
>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>             try {
>                 Double value = Double.parseDouble(incString);
>                 System.out.println("value = " + value);
>                 Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>                 System.out.println(prev_stored_tp);
>
>                 Double value2 = value - prev_stored_tp.f1;
>                 prev_stored_tp.f1 = value;
>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>                 prev_tuple.update(prev_stored_tp);
>
>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>                 tp.setField(value2, 1);
>                 out.collect(tp);
>
>             } catch (NumberFormatException e) {
>                 System.out.println("Could not convert to Float" + incString);
>                 System.err.println("Could not convert to Float" + incString);
>             }
>         }
>
>         @Override
>         public void open(Configuration config) {
>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>                     new ValueStateDescriptor<>(
>                             "previous input value", // the state name
>                             TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>                             Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>             prev_tuple = getRuntimeContext().getState(descriptor);
>         }
>     }
> }
>
> From: Kostas Kloudas [mailto:[email protected]]
> Sent: Thursday, August 11, 2016 5:45 AM
> To: [email protected]
> Subject: Re: flink - Working with State example
>
> Hello Buvana,
>
> Can you share a bit more details on your operator and how you are using it?
> For example, are you using keyBy before using you custom operator?
>
> Thanks a lot,
> Kostas
>
> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) <[email protected]> wrote:
>
> Hello,
>
> I am utilizing the code snippet in:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
> and particularly ‘open’ function in my code:
>
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>                 new ValueStateDescriptor<>(
>                         "average", // the state name
>                         TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>                         Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>         sum = getRuntimeContext().getState(descriptor);
>     }
>
> When I run, I get the following error:
>
> Caused by: java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>     ... 8 more
>
> Where do I define the key & value serializer for state?
>
> Thanks,
> Buvana
