Re: flink - Working with State example

2016-08-25 Thread Aljoscha Krettek
Hi,
do you mean the directory is completely empty? Can you check in the JobManager
dashboard whether it reports any successful checkpoints for the job? One
possible explanation is an optimization that the FsStateBackend performs:
when the state is very small, it is not actually written to files but is
stored in the metadata of the checkpoint that is sent to the JobManager.
This would explain why there are no files. You can set the size threshold
for this optimization with an additional FsStateBackend constructor
parameter, e.g. new FsStateBackend("file:///home/buvana/flink/checkpoints",
0) to disable the optimization.
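
To make the effect of the threshold concrete, here is a toy model in plain
Java. It is only a sketch of the idea, not the FsStateBackend implementation:
the class ThresholdSketch, its method names, and the exact comparison against
the threshold are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Toy model of a size threshold for checkpointed state; not Flink code. */
final class ThresholdSketch {

    /** Creates an empty temporary directory that plays the role of the checkpoint directory. */
    static Path newCheckpointDir() {
        try {
            return Files.createTempDirectory("checkpoints");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if the state was written to a file, false if it was kept inline. */
    static boolean store(byte[] state, int thresholdBytes, Path checkpointDir) {
        // Assumption: state no larger than the threshold travels with the
        // checkpoint metadata, so nothing appears in the directory.
        if (state.length <= thresholdBytes) {
            return false;
        }
        try {
            Files.write(Files.createTempFile(checkpointDir, "state-", ".bin"), state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    /** Counts the entries directly inside the checkpoint directory. */
    static long fileCount(Path checkpointDir) {
        try (Stream<Path> entries = Files.list(checkpointDir)) {
            return entries.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] oneDouble = new byte[8]; // roughly the payload of a single Double
        Path dir = newCheckpointDir();

        store(oneDouble, 1024, dir);
        System.out.println("threshold 1024 -> files: " + fileCount(dir));

        store(oneDouble, 0, dir);
        System.out.println("threshold 0    -> files: " + fileCount(dir));
    }
}
```

In this model an 8-byte state leaves the directory empty as long as the
threshold is larger than the state, and a file only shows up once the
threshold is 0. As far as I know, the real two-argument FsStateBackend
constructor takes a java.net.URI rather than a String, so the call may need
URI.create(...); please check the Javadoc of your Flink version.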

Cheers,
Aljoscha


RE: flink - Working with State example

2016-08-12 Thread Ramanan, Buvana (Nokia - US)
Hi Kostas,

I am trying to use FsStateBackend as the backend for storing state, and I 
configure it as follows in the code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
    env.enableCheckpointing(1);

Everything else is the same as the code I shared with you previously. 

When I execute, I see that a directory is created under 
/home/buvana/flink/checkpoints, but there is nothing under that directory.
I was expecting to find some file / sub dir there.

Please explain.

Thanks,
Buvana


Re: flink - Working with State example

2016-08-12 Thread Kostas Kloudas
No problem!

Regards,
Kostas


RE: flink - Working with State example

2016-08-11 Thread Ramanan, Buvana (Nokia - US)
Kostas,
Good catch! That makes it work! Thank you so much for the help.
Regards,
Buvana

-Original Message-
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
Sent: Thursday, August 11, 2016 11:22 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hi Buvana, 

At first glance, your snapshotState() should return a Double.

Kostas
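
The point about the return type can be reproduced without Flink. The sketch
below uses a local stand-in interface, CheckpointedSketch, that is assumed to
mirror the shape of Flink's Checkpointed interface (a type parameter bounded
by Serializable, used as the return type of snapshotState and as the argument
of restoreState). PrevValueHolder and its helper methods are made up for the
example.

```java
import java.io.Serializable;

/** Local stand-in assumed to mirror the shape of Flink's Checkpointed interface. */
interface CheckpointedSketch<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state) throws Exception;
}

/** Holds the previous sample. T is bound to Double, so both methods must use Double. */
final class PrevValueHolder implements CheckpointedSketch<Double> {
    private Double prev = 0.0;

    void update(double value) {
        prev = value;
    }

    Double current() {
        return prev;
    }

    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        // Declared as Double: declaring Serializable here does not compile,
        // because the return type would be wider than the bound type T = Double.
        return prev;
    }

    @Override
    public void restoreState(Double state) {
        prev = state;
    }

    public static void main(String[] args) {
        PrevValueHolder before = new PrevValueHolder();
        before.update(3.5);
        Double snapshot = before.snapshotState(1L, 0L);

        PrevValueHolder after = new PrevValueHolder();
        after.restoreState(snapshot);
        System.out.println("restored value: " + after.current());
    }
}
```

Once the interface is parameterized with Double, the compiler expects Double
in both method signatures, which matches the "return type
java.io.Serializable is not compatible with java.lang.Double" message in the
build output.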

> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) 
> <buvana.rama...@nokia-bell-labs.com> wrote:
> 
> Thank you Kostas & Ufuk. I get the following compilation error when I 
> use the Checkpointed interface. The code and the error message follow:
> 
> Is the Serializable definition supposed to be from java.io.Serializable or 
> somewhere else?
> 
> Thanks again,
> Buvana
> 
> ==
> ==
> Code:
> 
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> 
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import 
> org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
> 
> import java.util.Properties;
> 
> /**
> * Created by buvana on 8/9/16.
> */
> public class stateful {
>     private static String INPUT_KAFKA_TOPIC = null;
> ---
> --- skipping the main as it’s the same as before except for class name 
> change -
> ---
>     public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>             implements Checkpointed<Double> {
> 
>         private Double prev_tuple = null;
> 
>         @Override
>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>             try {
>                 Double value = Double.parseDouble(incString);
>                 System.out.println("value = " + value);
>                 System.out.println(prev_tuple);
> 
>                 Double value2 = value - prev_tuple;
>                 prev_tuple = value;
> 
>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>                 tp.setField(value2, 1);
>                 out.collect(tp);
>             } catch (NumberFormatException e) {
>                 System.out.println("Could not convert to Float" + incString);
>                 System.err.println("Could not convert to Float" + incString);
>             }
>         }
> 
>         @Override
>         public void open(Configuration config) {
>             if (prev_tuple == null) {
>                 // only recreate if null
>                 // restoreState will be called before open()
>                 // so this will already set the sum to the restored value
>                 prev_tuple = new Double("0.0");
>             }
>         }
> 
>         @Override
>         public Serializable snapshotState(
>                 long checkpointId,
>                 long checkpointTimestamp) throws Exception {
>             return prev_tuple;
>         }
> 
> 
>         @Override
>         public void restoreState(Double state) {
>             prev_tuple = state;
>         }
>     }
> }
> ==
> =
> ERROR message while building:
> 
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]
> [INFO] ------------------------------------------------------------------------
> [INFO] Building Flink Quickstart Job 0.1
> [INFO] ------------------------------------------------------------------------
> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
> [INFO]
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> [INFO] Deleting /home/buvana/flink/flink

Re: flink - Working with State example

2016-08-11 Thread Kostas Kloudas
 wiki-edits 
> ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to 
> /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> [INFO] -
> [ERROR] COMPILATION ERROR : 
> [INFO] -
> [ERROR] 
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19]
>  wikiedits.stateful.MapStateful is not abstract and does not override 
> abstract method snapshotState(long,long) in 
> org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] 
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29]
>  snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement 
> snapshotState(long,long) in 
> org.apache.flink.streaming.api.checkpoint.Checkpointed
>  return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] 
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9]
>  method does not override or implement a method from a supertype
> [INFO] 3 errors 
> [INFO] -
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 2.171s
> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
> [INFO] Final Memory: 26M/660M
> [INFO] 
> 
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) 
> on project wiki-edits: Compilation failure: Compilation failure:
> [ERROR] 
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19]
>  wikiedits.stateful.MapStateful is not abstract and does not override 
> abstract method snapshotState(long,long) in 
> org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] 
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29]
>  snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement 
> snapshotState(long,long) in 
> org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] return type java.io.Serializable is not compatible with 
> java.lang.Double
> [ERROR] 
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9]
>  method does not override or implement a method from a supertype
> [ERROR] -> [Help 1]
> [ERROR] 
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR] 
> [ERROR] For more information about the errors and possible solutions, please 
> read the following articles:
> [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> 
> 

RE: flink - Working with State example

2016-08-11 Thread Ramanan, Buvana (Nokia - US)


-Original Message-
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
Sent: Thursday, August 11, 2016 10:34 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Exactly as Ufuk suggested, if you are not grouping your stream by key, you 
should use the Checkpointed interface.

The reason I asked earlier whether you are using keyBy() is that keyBy() is 
what implicitly sets the keySerializer and scopes your (keyed) state to a 
specific key.

If there is no keying, then keyed state cannot be used and the Checkpointed 
interface should be used instead. 

Let us know if you need anything else.

Kostas

Re: flink - Working with State example

2016-08-11 Thread Kostas Kloudas
> 
>>                 Double value2 = value - prev_stored_tp.f1;
>>                 prev_stored_tp.f1 = value;
>>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>                 prev_tuple.update(prev_stored_tp);
>>
>>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>>                 tp.setField(value2, 1);
>>                 out.collect(tp);
>>             } catch (NumberFormatException e) {
>>                 System.out.println("Could not convert to Float" + incString);
>>                 System.err.println("Could not convert to Float" + incString);
>>             }
>>         }
>>
>>         @Override
>>         public void open(Configuration config) {
>>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>                     new ValueStateDescriptor<>(
>>                             "previous input value", // the state name
>>                             TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>                             Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>             prev_tuple = getRuntimeContext().getState(descriptor);
>>         }
>>     }
>> }
>> 
>> 
>> 
>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>> Sent: Thursday, August 11, 2016 5:45 AM
>> To: user@flink.apache.org
>> Subject: Re: flink - Working with State example
>> 
>> 
>> 
>> Hello Buvana,
>> 
>> 
>> 
>> Can you share a bit more details on your operator and how you are using it?
>> 
>> For example, are you using keyBy before using your custom operator?
>> 
>> 
>> 
>> Thanks a lot,
>> 
>> Kostas
>> 
>> 
>> 
>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>> <buvana.rama...@nokia-bell-labs.com> wrote:
>> 
>> 
>> 
>> Hello,
>> 
>> 
>> 
>> I am utilizing the code snippet in:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>> and particularly ‘open’ function in my code:
>> 
>>     @Override
>>     public void open(Configuration config) {
>>         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>                 new ValueStateDescriptor<>(
>>                         "average", // the state name
>>                         TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>                         Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>         sum = getRuntimeContext().getState(descriptor);
>>     }
>> 
>> 
>> 
>> When I run, I get the following error:
>> 
>> Caused by: java.lang.RuntimeException: Error while getting state
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>     ... 8 more
>> 
>> 
>> 
>> Where do I define the key & value serializer for state?
>> 
>> 
>> 
>> Thanks,
>> 
>> Buvana
>> 
>> 



Re: flink - Working with State example

2016-08-11 Thread Ufuk Celebi
This only works for keyed streams; you have to use keyBy().

You can use the Checkpointed interface instead
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
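
As a rough mental model of why a key is needed, keyed state behaves like a
separate value per key. The plain-Java sketch below uses a HashMap as a
stand-in for that; it is not Flink's ValueState, and KeyedDiffSketch, diff,
and the 0.0 default for unseen keys are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-key state; a plain map stands in for keyed state. */
final class KeyedDiffSketch {
    // One "previous value" slot per key, which is what scoping state to a key means here.
    private final Map<String, Double> previousByKey = new HashMap<>();

    /** Returns value minus the previous value seen for the same key (0.0 if none yet). */
    double diff(String key, double value) {
        double previous = previousByKey.getOrDefault(key, 0.0);
        previousByKey.put(key, value);
        return value - previous;
    }

    public static void main(String[] args) {
        KeyedDiffSketch sketch = new KeyedDiffSketch();
        System.out.println(sketch.diff("topic-a", 5.0));
        System.out.println(sketch.diff("topic-b", 2.0));
        // Only the history of "topic-a" is used for this call.
        System.out.println(sketch.diff("topic-a", 7.5));
    }
}
```

Without a key there is no slot to look the value up in, which is the
situation the "State key serializer has not been configured" error points to.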


RE: flink - Working with State example

2016-08-11 Thread Ramanan, Buvana (Nokia - US)
Hi Kostas,

Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t] is 
the current value of the incoming sample and x[t-1] is the previous value of 
the incoming sample. I store the current value in the state store (‘prev_tuple’) 
so that I can use it for the computation in the next cycle. As you may observe, 
I am not using keyBy. I am simply printing out the resultant tuple.

It appears from the error message that I have to set the key serializer (and 
possibly value serializer) for the state store. I am not sure how to do that…

Thanks for your interest in helping,


Regards,
Buvana

public class stateful {
    private static String INPUT_KAFKA_TOPIC = null;
    private static int TIME_WINDOW = 0;

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            throw new IllegalArgumentException(
                "The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
                + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
        }

        INPUT_KAFKA_TOPIC = args[0];
        TIME_WINDOW = Integer.parseInt(args[1]);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));

        DataStreamSource<String> stream = env
            .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));

        // maps the data into Flink tuples
        DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());

        // write the result to the console or in a Kafka topic
        streamTuples.print();

        env.execute("plus one");

    }

    public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double>> {
        private transient ValueState<Tuple2<String, Double>> prev_tuple;

        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
            try {
                Double value = Double.parseDouble(incString);
                System.out.println("value = " + value);
                Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
                System.out.println(prev_stored_tp);

                Double value2 = value - prev_stored_tp.f1;
                prev_stored_tp.f1 = value;
                prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
                prev_tuple.update(prev_stored_tp);

                Tuple2<String, Double> tp = new Tuple2<String, Double>();
                tp.setField(INPUT_KAFKA_TOPIC, 0);
                tp.setField(value2, 1);
                out.collect(tp);

            } catch (NumberFormatException e) {
                System.out.println("Could not convert to Double: " + incString);
                System.err.println("Could not convert to Double: " + incString);
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<String, Double>> descriptor =
                new ValueStateDescriptor<>(
                    "previous input value", // the state name
                    TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
                    Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
            prev_tuple = getRuntimeContext().getState(descriptor);
        }
    }
}
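
For readers following the thread: the code above keeps the previous sample in a ValueState but never calls keyBy, and the state handed out by getRuntimeContext().getState(...) is partitioned by key, which matches the "cannot use partitioned state" error in the original message of this thread. The sketch below is plain Java, not Flink code. It only models the per-key bookkeeping that a keyed ValueState would provide for the x[t] - x[t-1] computation. The class name KeyedDiff, the key "topic-a" and the sample values are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of keyed "previous value" state; an illustration, not Flink code. */
final class KeyedDiff {
    // One slot per key, playing the role of a keyed ValueState<Double>.
    private final Map<String, Double> previousByKey = new HashMap<>();
    // Returned as the previous value when a key has not been seen yet.
    private final double defaultValue;

    KeyedDiff(double defaultValue) {
        this.defaultValue = defaultValue;
    }

    /** Returns x[t] - x[t-1] for the given key and stores x[t] for the next call. */
    double process(String key, double value) {
        double previous = previousByKey.getOrDefault(key, defaultValue);
        previousByKey.put(key, value);
        return value - previous;
    }

    public static void main(String[] args) {
        KeyedDiff diff = new KeyedDiff(0.0);
        for (double x : new double[] {3.0, 5.0, 4.5}) {
            System.out.println(diff.process("topic-a", x));
        }
    }
}
```

In the Flink job itself, the corresponding change would presumably be to key the stream (for example with keyBy) before the flatMap, so that each key gets its own prev_tuple slot.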

From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Thursday, August 11, 2016 5:45 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hello Buvana,

Can you share a few more details on your operator and how you are using it?
For example, are you using keyBy before using your custom operator?

Thanks a lot,
Kostas

On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
<buvana.rama...@nokia-bell-labs.com> wrote:

Hello,

I am utilizing the code snippet in: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
 and particularly ‘open’ function in my code:
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
 

flink - Working with State example

2016-08-10 Thread Ramanan, Buvana (Nokia - US)
Hello,

I am using the code snippet from
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
in my code, in particular the ‘open’ function:
@Override
public void open(Configuration config) {
    ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
            "average", // the state name
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
    sum = getRuntimeContext().getState(descriptor);
}

When I run, I get the following error:
Caused by: java.lang.RuntimeException: Error while getting state
   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
   at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
   at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
   at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
   at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
   ... 8 more

Where do I define the key & value serializer for state?

Thanks,
Buvana
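
As far as the trace shows, the key serializer is not something the user function sets directly: getState fails because the operator has no key configured, which is what happens when the stream was not keyed before the function. The value serializer is derived from the TypeInformation passed to the ValueStateDescriptor. The following toy model in plain Java mimics that check. ToyRuntimeContext and its field are hypothetical names, not Flink classes, and the real implementation is certainly more involved.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the check behind the error above; hypothetical, not Flink's implementation. */
final class ToyRuntimeContext {
    // Stand-in for the key serializer that keying a stream would configure; null if not keyed.
    private final String keySerializer;

    ToyRuntimeContext(String keySerializer) {
        this.keySerializer = keySerializer;
    }

    /** Returns an empty per-key state map, or fails when no key serializer is configured. */
    Map<String, Double> getState(String stateName) {
        if (keySerializer == null) {
            throw new IllegalStateException(
                "State key serializer has not been configured; '" + stateName
                    + "' cannot use partitioned state");
        }
        return new HashMap<>();
    }

    public static void main(String[] args) {
        // Keyed case: the state lookup succeeds.
        System.out.println(new ToyRuntimeContext("StringSerializer").getState("average"));
        // Non-keyed case: the state lookup fails, as in the trace.
        try {
            new ToyRuntimeContext(null).getState("average");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```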