As Ufuk suggested, if you are not grouping your stream by key, you should use the Checkpointed interface.
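A minimal sketch of that non-keyed approach, applied to the x[t] – x[t-1] computation from the quoted code. So that it runs without Flink on the classpath, it declares a hypothetical `CheckpointedLike` interface mirroring the two methods of Flink 1.1's `Checkpointed<T>` (`snapshotState(long checkpointId, long checkpointTimestamp)` and `restoreState(T state)`). In a real job the function would instead extend `RichFlatMapFunction<String, Tuple2<String, Double>>` and implement `org.apache.flink.streaming.api.checkpoint.Checkpointed<Double>`. `DifferenceFunction` and `Main` are illustrative names only.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two methods of Flink 1.1's Checkpointed<T>, declared
// here only so the sketch compiles without Flink on the classpath.
interface CheckpointedLike<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state) throws Exception;
}

// Computes x[t] - x[t-1] without keyed state: the previous sample is a plain
// instance field that is handed to (and restored from) the checkpoint mechanism.
class DifferenceFunction implements CheckpointedLike<Double> {
    // null until the first sample arrives, so no difference against an invented 0.0 is emitted
    private Double prev = null;

    // Plays the role of flatMap: returns zero or one difference per input line.
    List<Double> flatMap(String incString) {
        List<Double> out = new ArrayList<>();
        double value;
        try {
            value = Double.parseDouble(incString);
        } catch (NumberFormatException e) {
            System.err.println("Could not convert to Double: " + incString);
            return out;
        }
        if (prev != null) {
            out.add(value - prev);
        }
        prev = value;
        return out;
    }

    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        return prev; // the operator's entire state is the last value seen
    }

    @Override
    public void restoreState(Double state) {
        prev = state; // in this sketch, called on a fresh instance to simulate recovery
    }
}

public class Main {
    public static void main(String[] args) {
        DifferenceFunction f = new DifferenceFunction();
        for (String s : new String[] {"1.0", "4.0", "oops", "10.5"}) {
            System.out.println(s + " -> " + f.flatMap(s));
        }

        // Simulate a checkpoint followed by a failure and recovery in a fresh instance.
        Double snapshot = f.snapshotState(1L, System.currentTimeMillis());
        DifferenceFunction recovered = new DifferenceFunction();
        recovered.restoreState(snapshot);
        System.out.println("11.0 -> " + recovered.flatMap("11.0"));
    }
}
```

Running `Main` prints `1.0 -> []`, `4.0 -> [3.0]`, `oops -> []`, `10.5 -> [6.5]` and, after the simulated restore, `11.0 -> [0.5]`. Starting from `null` rather than the `0.0` default in the original descriptor means the first sample emits nothing instead of x[0] itself. Note also that a non-keyed operator keeps one such field per parallel instance, so a single global x[t-1] would presumably require running this operator with parallelism 1.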
The reason I asked earlier whether you are using keyBy() is that keyBy() is what implicitly sets the keySerializer and scopes your (keyed) state to a specific key. Without keying, keyed state cannot be used, and the Checkpointed interface should be used instead.

Let us know if you need anything else.

Kostas

> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
>
> This only works for keyed streams; you have to use keyBy().
>
> You can use the Checkpointed interface instead
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>
> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
> <buvana.rama...@nokia-bell-labs.com> wrote:
>> Hi Kostas,
>>
>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t]
>> is the current value of the incoming sample and x[t-1] is the previous value
>> of the incoming sample. I store the current value in the state store
>> (‘prev_tuple’) so that I can use it for the computation in the next cycle. As you
>> may observe, I am not using keyBy. I am simply printing out the resulting
>> tuple.
>>
>> It appears from the error message that I have to set the key serializer (and
>> possibly the value serializer) for the state store. I am not sure how to do
>> that…
>>
>> Thanks for your interest in helping,
>>
>> Regards,
>> Buvana
>>
>> public class stateful {
>>     private static String INPUT_KAFKA_TOPIC = null;
>>     private static int TIME_WINDOW = 0;
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         if (args.length < 2) {
>>             throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>                 + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>         }
>>
>>         INPUT_KAFKA_TOPIC = args[0];
>>         TIME_WINDOW = Integer.parseInt(args[1]);
>>
>>         Properties properties = null;
>>
>>         properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "test");
>>
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>
>>         DataStreamSource<String> stream = env
>>             .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>
>>         // maps the data into Flink tuples
>>         DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>
>>         // write the result to the console or in a Kafka topic
>>         streamTuples.print();
>>
>>         env.execute("plus one");
>>     }
>>
>>     public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>         private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>
>>         @Override
>>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>             try {
>>                 Double value = Double.parseDouble(incString);
>>                 System.out.println("value = " + value);
>>                 Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>                 System.out.println(prev_stored_tp);
>>
>>                 Double value2 = value - prev_stored_tp.f1;
>>                 prev_stored_tp.f1 = value;
>>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>                 prev_tuple.update(prev_stored_tp);
>>
>>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>>                 tp.setField(value2, 1);
>>                 out.collect(tp);
>>
>>             } catch (NumberFormatException e) {
>>                 System.out.println("Could not convert to Float" + incString);
>>                 System.err.println("Could not convert to Float" + incString);
>>             }
>>         }
>>
>>         @Override
>>         public void open(Configuration config) {
>>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>                 new ValueStateDescriptor<>(
>>                     "previous input value", // the state name
>>                     TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>                     Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>             prev_tuple = getRuntimeContext().getState(descriptor);
>>         }
>>     }
>> }
>>
>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>> Sent: Thursday, August 11, 2016 5:45 AM
>> To: user@flink.apache.org
>> Subject: Re: flink - Working with State example
>>
>> Hello Buvana,
>>
>> Can you share a bit more detail about your operator and how you are using it?
>> For example, are you using keyBy before using your custom operator?
>>
>> Thanks a lot,
>> Kostas
>>
>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>
>> Hello,
>>
>> I am utilizing the code snippet in:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>> and particularly the ‘open’ function in my code:
>>
>>     @Override
>>     public void open(Configuration config) {
>>         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>             new ValueStateDescriptor<>(
>>                 "average", // the state name
>>                 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>                 Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>         sum = getRuntimeContext().getState(descriptor);
>>     }
>>
>> When I run it, I get the following error:
>>
>> Caused by: java.lang.RuntimeException: Error while getting state
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>     ... 8 more
>>
>> Where do I define the key & value serializer for state?
>>
>> Thanks,
>> Buvana
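On the closing question of where to define the key serializer: with keyed state you do not define it yourself; as Kostas explains, keyBy() sets it and scopes each state access to the current record's key. The toy model below is not Flink code, and its class names (`ToyRuntimeContext`, `ToyValueState`, `Main`) are hypothetical. It only mimics that observable rule so it can run standalone: `getState` fails on a non-keyed context, and on a keyed one each key (here a topic name, as if the `Tuple2<String, Double>` stream were keyed on its first field) sees its own previous value.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, NOT Flink's implementation: it only mimics the rule that keyed
// state is available only when the stream has been keyed with keyBy().
class ToyRuntimeContext {
    private final boolean keyed;
    private Object currentKey; // in the model, set before each record on a keyed stream

    ToyRuntimeContext(boolean keyed) {
        this.keyed = keyed;
    }

    void setCurrentKey(Object key) {
        currentKey = key;
    }

    Object currentKey() {
        return currentKey;
    }

    // Plays the role of getRuntimeContext().getState(descriptor): fails up front if not keyed.
    <V> ToyValueState<V> getState(V defaultValue) {
        if (!keyed) {
            throw new IllegalStateException(
                "keyed state requested on a non-keyed stream (no key serializer configured)");
        }
        return new ToyValueState<>(this, defaultValue);
    }
}

// A value that is transparently scoped to the context's current key.
class ToyValueState<V> {
    private final ToyRuntimeContext ctx;
    private final V defaultValue;
    private final Map<Object, V> perKey = new HashMap<>();

    ToyValueState(ToyRuntimeContext ctx, V defaultValue) {
        this.ctx = ctx;
        this.defaultValue = defaultValue;
    }

    V value() {
        return perKey.getOrDefault(ctx.currentKey(), defaultValue);
    }

    void update(V v) {
        perKey.put(ctx.currentKey(), v);
    }
}

public class Main {
    public static void main(String[] args) {
        // Keyed case: records are (topic, value) and the key is the topic.
        ToyRuntimeContext keyedCtx = new ToyRuntimeContext(true);
        ToyValueState<Double> prev = keyedCtx.getState(0.0);
        String[][] records = {{"a", "1.0"}, {"b", "5.0"}, {"a", "3.0"}, {"b", "4.0"}};
        for (String[] r : records) {
            keyedCtx.setCurrentKey(r[0]); // simulates what the runtime does per record after keyBy()
            double x = Double.parseDouble(r[1]);
            System.out.println(r[0] + ": " + (x - prev.value()));
            prev.update(x);
        }

        // Non-keyed case: requesting keyed state fails, analogous to the quoted exception.
        try {
            new ToyRuntimeContext(false).getState(0.0);
        } catch (IllegalStateException e) {
            System.out.println("non-keyed: " + e.getMessage());
        }
    }
}
```

In the keyed run, the records a:1.0, b:5.0, a:3.0, b:4.0 print `a: 1.0`, `b: 5.0`, `a: 2.0` and `b: -1.0`. Each key starts from the `0.0` default, so its first "difference" is just its first value, the same effect the `0.0` default in the quoted descriptor would have.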