Re: flink - Working with State example
Hi,

you mean the directory is completely empty? Can you check in the JobManager dashboard whether it reports any successful checkpoints for the job?

One possible explanation is an optimization that the FsStateBackend performs: when the state is very small it will not actually be written to files but stored in the metadata of the checkpoint that is sent to the JobManager. This would explain why there are no files. You can set the threshold size for this optimization with an additional FsStateBackend constructor parameter, i.e.

    new FsStateBackend("file:///home/buvana/flink/checkpoints", 0)

to disable this optimization.

Cheers,
Aljoscha

On Fri, 12 Aug 2016 at 21:12 Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:

> Hi Kostas,
>
> I am trying to use FsStateBackend as the backend for storing state. And
> configure it as follows in the code:
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
>     env.enableCheckpointing(1);
>
> everything else is same as the code I shared with you previously.
>
> When I execute, I see that a directory is created under
> /home/buvana/flink/checkpoints, but there is nothing under that directory.
> I was expecting to find some file / sub dir there.
>
> Please explain.
>
> Thanks,
> Buvana
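The small-state optimization described above can be pictured with a plain-Java model. This is only a sketch under stated assumptions and not Flink's implementation: the class `SmallStateThresholdSketch`, its `Handle` type, the `store` method, and the strictly-smaller-than comparison are all hypothetical choices made for illustration. It shows why a threshold of 0 would force every snapshot, however small, into a file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Conceptual model only: NOT Flink's implementation of FsStateBackend.
final class SmallStateThresholdSketch {

    /** Where a snapshot ended up: inline bytes or a file on disk. */
    static final class Handle {
        final byte[] inlineBytes; // non-null when kept in the checkpoint metadata
        final Path file;          // non-null when written to the checkpoint directory

        Handle(byte[] inlineBytes, Path file) {
            this.inlineBytes = inlineBytes;
            this.file = file;
        }

        boolean isInline() {
            return inlineBytes != null;
        }
    }

    /** Assumption: state strictly smaller than the threshold stays inline. */
    static Handle store(byte[] state, int thresholdBytes, Path checkpointDir) throws IOException {
        if (state.length < thresholdBytes) {
            return new Handle(state.clone(), null);
        }
        Path file = Files.createTempFile(checkpointDir, "chk-", ".state");
        Files.write(file, state);
        return new Handle(null, file);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoints");
        byte[] tiny = new byte[8]; // e.g. one serialized double
        System.out.println("threshold 1024, inline? " + store(tiny, 1024, dir).isInline());
        System.out.println("threshold 0, inline? " + store(tiny, 0, dir).isInline());
    }
}
```

In this model, a non-zero threshold keeps the 8-byte state inline and leaves the directory empty, which matches the symptom reported in the thread; a threshold of 0 makes a file appear.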
RE: flink - Working with State example
Hi Kostas,

I am trying to use FsStateBackend as the backend for storing state. And configure it as follows in the code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
    env.enableCheckpointing(1);

everything else is same as the code I shared with you previously.

When I execute, I see that a directory is created under /home/buvana/flink/checkpoints, but there is nothing under that directory. I was expecting to find some file / sub dir there.

Please explain.

Thanks,
Buvana

-----Original Message-----
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Friday, August 12, 2016 1:37 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

No problem!

Regards,
Kostas
Re: flink - Working with State example
No problem!

Regards,
Kostas

> On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
>
> Kostas,
> Good catch! That makes it working! Thank you so much for the help.
> Regards,
> Buvana
RE: flink - Working with State example
Kostas,
Good catch! That makes it working! Thank you so much for the help.
Regards,
Buvana

-----Original Message-----
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Thursday, August 11, 2016 11:22 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hi Buvana,

At a first glance, your snapshotState() should return a Double.

Kostas

> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
>
> Thank you Kostas & Ufuk. I get into the following compilation error when I use checkpointed interface. Pasting the code & message as follows:
>
> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
>
> Thanks again,
> Buvana
>
> ========================================
> Code:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
> /**
>  * Created by buvana on 8/9/16.
>  */
> public class stateful {
>     private static String INPUT_KAFKA_TOPIC = null;
>     ---
>     --- skipping the main as it’s the same as before except for class name change ---
>     ---
>     public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>             implements Checkpointed {
>
>         private Double prev_tuple = null;
>
>         @Override
>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>             try {
>                 Double value = Double.parseDouble(incString);
>                 System.out.println("value = " + value);
>                 System.out.println(prev_tuple);
>
>                 Double value2 = value - prev_tuple;
>                 prev_tuple = value;
>
>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>                 tp.setField(value2, 1);
>                 out.collect(tp);
>             } catch (NumberFormatException e) {
>                 System.out.println("Could not convert to Float" + incString);
>                 System.err.println("Could not convert to Float" + incString);
>             }
>         }
>
>         @Override
>         public void open(Configuration config) {
>             if (prev_tuple == null) {
>                 // only recreate if null
>                 // restoreState will be called before open()
>                 // so this will already set the sum to the restored value
>                 prev_tuple = new Double("0.0");
>             }
>         }
>
>         @Override
>         public Serializable snapshotState(
>                 long checkpointId,
>                 long checkpointTimestamp) throws Exception {
>             return prev_tuple;
>         }
>
>         @Override
>         public void restoreState(Double state) {
>             prev_tuple = state;
>         }
>     }
> }
> ========================================
> ERROR message while building:
>
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]
> [INFO] ----------------------------------------
> [INFO] Building Flink Quickstart Job 0.1
> [INFO] ----------------------------------------
> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
> [INFO]
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> [INFO] Deleting /home/buvana/flink/flink
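The return-type fix can be illustrated without Flink on the classpath. The sketch below assumes the interface is generic in its state type, which is what the compiler message about java.lang.Double suggests; `CheckpointedSketch` and `PrevValueFunction` are hypothetical stand-ins written for this example, not the Flink classes. Once the implementing class binds the type parameter to Double, snapshotState() has to return Double and restoreState() has to accept Double.

```java
import java.io.Serializable;

// Local stand-in with the same shape as a generic checkpoint interface (assumption).
interface CheckpointedSketch<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state) throws Exception;
}

// Keeps the previous sample and hands it out as the checkpointed state.
final class PrevValueFunction implements CheckpointedSketch<Double> {
    private Double prevValue = 0.0;

    /** Returns value minus the previously seen value, then remembers value. */
    double difference(double value) {
        double delta = value - prevValue;
        prevValue = value;
        return delta;
    }

    // Declaring Serializable here instead of Double would not compile,
    // because the return type must match the bound type parameter.
    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        return prevValue;
    }

    @Override
    public void restoreState(Double state) {
        prevValue = state;
    }

    public static void main(String[] args) {
        PrevValueFunction first = new PrevValueFunction();
        first.difference(5.0);
        Double snapshot = first.snapshotState(1L, System.currentTimeMillis());

        PrevValueFunction recovered = new PrevValueFunction();
        recovered.restoreState(snapshot);
        System.out.println(recovered.difference(7.5));
    }
}
```

The recovered instance continues from the snapshotted value rather than from the initial 0.0, which is the behavior the checkpoint is meant to provide.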
Re: flink - Working with State example
> wiki-edits ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> [INFO] ----------------------------------------
> [ERROR] COMPILATION ERROR :
> [INFO] ----------------------------------------
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>   return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [INFO] 3 errors
> [INFO] ----------------------------------------
> [INFO]
> [INFO] BUILD FAILURE
> [INFO]
> [INFO] Total time: 2.171s
> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
> [INFO] Final Memory: 26M/660M
> [INFO]
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [ERROR] -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please read the following articles:
> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
RE: flink - Working with State example
ng,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
  return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[INFO] 3 errors
[INFO] ----------------------------------------
[INFO]
[INFO] BUILD FAILURE
[INFO]
[INFO] Total time: 2.171s
[INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
[INFO] Final Memory: 26M/660M
[INFO]
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

-----Original Message-----
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Thursday, August 11, 2016 10:34 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Exactly as Ufuk suggested, if you are not grouping your stream by key, you should use the checkpointed interface.

The reason I asked before if you are using the keyBy() is because this is the one that implicitly sets the keySerializer and scopes your (keyed) state to a specific key.

If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.

Let us know if you need anything else.

Kostas

> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
>
> This only works for keyed streams, you have to use keyBy().
>
> You can use the Checkpointed interface instead
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
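The point about keyBy() scoping state to a key can be sketched as a plain-Java model. This is a conceptual illustration only, not how Flink stores state: `KeyedValueStateSketch` is a hypothetical class, a HashMap stands in for the state backend, and the key selector plays the role of keyBy(). Without a key selector there is nothing to scope the state to, which is modeled here as an exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Conceptual model only: a HashMap stands in for the state backend.
final class KeyedValueStateSketch<IN, K, S> {
    private final Function<IN, K> keySelector; // plays the role of keyBy(); may be null
    private final S defaultValue;
    private final Map<K, S> statePerKey = new HashMap<>();

    KeyedValueStateSketch(Function<IN, K> keySelector, S defaultValue) {
        this.keySelector = keySelector;
        this.defaultValue = defaultValue;
    }

    private K keyOf(IN record) {
        if (keySelector == null) {
            // Without a key there is nothing to scope the state to.
            throw new IllegalStateException("no key selector: keyed state is unavailable");
        }
        return keySelector.apply(record);
    }

    /** Returns the state for the record's key, or the default if none was set. */
    S value(IN record) {
        return statePerKey.getOrDefault(keyOf(record), defaultValue);
    }

    /** Stores new state under the record's key only. */
    void update(IN record, S newValue) {
        statePerKey.put(keyOf(record), newValue);
    }

    public static void main(String[] args) {
        KeyedValueStateSketch<String, String, Double> state =
                new KeyedValueStateSketch<>(r -> r.split(":")[0], 0.0);
        state.update("sensorA:1.5", 1.5);
        System.out.println(state.value("sensorA:2.0")); // state stored for key sensorA
        System.out.println(state.value("sensorB:9.0")); // default for an unseen key
    }
}
```

Each key sees only its own value, so a job that has no natural key either needs an explicit keyBy() or, as suggested in the thread, operator-level state through the Checkpointed interface.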
Re: flink - Working with State example
This only works for keyed streams; you have to use keyBy().

You can use the Checkpointed interface instead
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).

On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
> Hi Kostas,
>
> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t]
> is the current value of the incoming sample and x[t-1] is the previous value
> of the incoming sample. I store the current value in state store
> (‘prev_tuple’) so that I can use it for computation in next cycle. As you
> may observe, I am not using keyBy. I am simply printing out the resultant
> tuple.
>
> It appears from the error message that I have to set the key serializer (and
> possibly value serializer) for the state store. I am not sure how to do
> that…
>
> Thanks for your interest in helping,
>
> Regards,
> Buvana
RE: flink - Working with State example
Hi Kostas,

Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t] is the current value of the incoming sample and x[t-1] is the previous value of the incoming sample. I store the current value in state store (‘prev_tuple’) so that I can use it for computation in next cycle. As you may observe, I am not using keyBy. I am simply printing out the resultant tuple.

It appears from the error message that I have to set the key serializer (and possibly value serializer) for the state store. I am not sure how to do that…

Thanks for your interest in helping,

Regards,
Buvana

    public class stateful {
        private static String INPUT_KAFKA_TOPIC = null;
        private static int TIME_WINDOW = 0;

        public static void main(String[] args) throws Exception {

            if (args.length < 2) {
                throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
                        + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
            }

            INPUT_KAFKA_TOPIC = args[0];
            TIME_WINDOW = Integer.parseInt(args[1]);

            Properties properties = null;

            properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("zookeeper.connect", "localhost:2181");
            properties.setProperty("group.id", "test");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));

            DataStreamSource stream = env
                    .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));

            // maps the data into Flink tuples
            DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());

            // write the result to the console or in a Kafka topic
            streamTuples.print();

            env.execute("plus one");
        }

        public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
            private transient ValueState<Tuple2<String, Double>> prev_tuple;

            @Override
            public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
                try {
                    Double value = Double.parseDouble(incString);
                    System.out.println("value = " + value);
                    Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
                    System.out.println(prev_stored_tp);

                    Double value2 = value - prev_stored_tp.f1;
                    prev_stored_tp.f1 = value;
                    prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
                    prev_tuple.update(prev_stored_tp);

                    Tuple2<String, Double> tp = new Tuple2<String, Double>();
                    tp.setField(INPUT_KAFKA_TOPIC, 0);
                    tp.setField(value2, 1);
                    out.collect(tp);

                } catch (NumberFormatException e) {
                    System.out.println("Could not convert to Float" + incString);
                    System.err.println("Could not convert to Float" + incString);
                }
            }

            @Override
            public void open(Configuration config) {
                ValueStateDescriptor<Tuple2<String, Double>> descriptor =
                        new ValueStateDescriptor<>(
                                "previous input value", // the state name
                                TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
                                Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
                prev_tuple = getRuntimeContext().getState(descriptor);
            }
        }
    }

From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Thursday, August 11, 2016 5:45 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hello Buvana,

Can you share a bit more details on your operator and how you are using it?
For example, are you using keyBy before using you custom operator?

Thanks a lot,
Kostas

On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:

Hello,

I am utilizing the code snippet in: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html and particularly ‘open’ function in my code:

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
flink - Working with State example
Hello,

I am utilizing the code snippet in:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
and particularly the ‘open’ function in my code:

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }

When I run, I get the following error:

Caused by: java.lang.RuntimeException: Error while getting state
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
    at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
    at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
    ... 8 more

Where do I define the key & value serializer for state?

Thanks,
Buvana
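
Note on the exception above: the state returned by getRuntimeContext().getState(...) is partitioned (keyed) state, which Flink scopes to the key of the record being processed. That is why the error mentions a missing state key serializer, and why the keyBy question comes up in this thread: an operator applied to a non-keyed stream has no key to scope the state to. Below is a minimal plain-Java sketch of that per-key scoping. It uses no Flink classes; KeyedStateSketch and diffFromPrevious are hypothetical names chosen for illustration, and the 0.0 default is an assumption mirroring the default value in the state descriptor.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only: mimics how keyed state is scoped per key.
// KeyedStateSketch and diffFromPrevious are hypothetical names, not Flink API.
class KeyedStateSketch {

    // One "previous value" slot per key, analogous to a ValueState on a keyed stream.
    private final Map<String, Double> previousByKey = new HashMap<>();

    // Returns the new value minus the previous value seen for the same key.
    // A key with no stored state yet falls back to the assumed default of 0.0.
    double diffFromPrevious(String key, double value) {
        double previous = previousByKey.getOrDefault(key, 0.0);
        previousByKey.put(key, value);
        return value - previous;
    }

    public static void main(String[] args) {
        KeyedStateSketch state = new KeyedStateSketch();
        System.out.println(state.diffFromPrevious("topicA", 5.0)); // 5.0
        System.out.println(state.diffFromPrevious("topicA", 7.5)); // 2.5
        System.out.println(state.diffFromPrevious("topicB", 1.0)); // 1.0 (separate key, separate state)
    }
}
```

In the Flink job itself, the analogous change would presumably be to call keyBy(...) on the stream before flatMap(...), so that the runtime knows which key each ValueState access belongs to.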