Re: Kinesis connector classpath issue when running Flink 1.1-SNAPSHOT on YARN

2016-06-23 Thread Josh
Hi Aljoscha,
I opened an issue here https://issues.apache.org/jira/browse/FLINK-4115 and
submitted a pull request.
I'm not sure if my fix is the best way to resolve this, or if it's better
to just remove the verification checks completely.
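
For context, here is a minimal sketch of how the backend is typically configured in
the client program (the bucket name and paths are placeholders, not from this thread).
The constructor runs at this point, before the job ever reaches YARN, which is why the
verification check fails on the client:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The RocksDBStateBackend constructor delegates to FsStateBackend,
        // which validates the checkpoint URI on the client, where the EMR
        // S3 filesystem classes may not be on the classpath.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink-checkpoints"));

        // ... build and execute the streaming job ...
    }
}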

Thanks,
Josh

On Thu, Jun 23, 2016 at 9:41 AM, Aljoscha Krettek 
wrote:

> Hi Josh,
> do you maybe want to open an issue for that and contribute your fix for
> that?
>
> Cheers,
> Aljoscha
>
> On Fri, 17 Jun 2016 at 17:49 Josh  wrote:
>
>> Hi Aljoscha,
>>
>> Thanks! It looks like you're right. I've run it with the FsStateBackend
>> and everything works fine.
>>
>> I've also got it working with RocksDBStateBackend now, by rebuilding
>> Flink master with:
>> - the verify step in FsStateBackend skipped for URIs with s3 schemes.
>> - the initialisation of filesystem in the constructor commented out (not
>> sure why this is initialised in the constructor, since it seems to get
>> initialised later anyway)
>>
>> Josh
>>
>>
>>
>> On Fri, Jun 17, 2016 at 2:53 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> I think the problem with the missing Class
>>> com.amazon.ws.emr.hadoop.fs.EmrFileSystem is not specific to RocksDB. The
>>> exception is thrown in the FsStateBackend, which is internally used by the
>>> RocksDB backend to do snapshotting of non-partitioned state. The problem is
>>> that the FsStateBackend tries to verify that the checkpoint path exists in
>>> the constructor. The constructor is invoked in the client program, when not
>>> running in the Yarn context where the correct jars that hold the EMR
>>> FileSystem classes are available. This should be causing the exception.
>>>
>>> Just to verify, could you maybe run it with the FsStateBackend to see if
>>> you get the same exception. If yes, then we need to remove the verify step
>>> in the FsStateBackend or at least provide a way to bypass these steps.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 17 Jun 2016 at 15:40 Josh  wrote:
>>>
 I found that I can still write to s3, using my Flink build of
 1.1-SNAPSHOT, for example if I run the word count example:
 ./bin/flink run ./examples/batch/WordCount.jar --input
 hdfs:///tmp/LICENSE --output s3://permutive-flink/wordcount-result.txt

 This works fine - it's just the RocksDBStateBackend which is erroring
 with the s3 URI. I'm wondering if it could be an issue with
 RocksDBStateBackend?


 On Fri, Jun 17, 2016 at 12:09 PM, Josh  wrote:

> Hi Gordon/Fabian,
>
> Thanks for helping with this! Downgrading the Maven version I was
> using to build Flink appears to have fixed that problem - I was using 
> Maven
> 3.3.3 before and have downgraded to 3.2.5.
>
> Just for reference, I printed the loaded class at runtime and found
> that when I was using Flink built with Maven 3.3.3, it was pulling in:
>
> jar:file:/opt/flink/flink-1.1-SNAPSHOT/lib/flink-dist_2.11-1.1-SNAPSHOT.jar!/org/apache/http/params/HttpConnectionParams.class
> But after building with the older Maven version, it pulled in the
> class from my jar:
>
> jar:file:/tmp/my-assembly-1.0.jar!/org/apache/http/params/HttpConnectionParams.class
>
>
> Unfortunately now that problem is fixed I've now got a different
> classpath issue. It started with:
>
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> at
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)
>
> This is strange because I used an s3:// checkpoint directory when
> running Flink 1.0.3 on EMR and it worked fine. (according to
> https://ci.apache.org/projects/flink/flink-docs-master/setup/aws.html#provide-s3-filesystem-dependency
> no configuration should be needed to use S3 when running on EMR).
>
> Anyway I tried executing /etc/hadoop/conf/hadoop-env.sh before running
> my job, as this sets up the HADOOP_CLASSPATH env var. The exception then
> changed to:
> java.lang.NoClassDefFoundError: org/apache/hadoop/fs/common/Abortable
>
> I found that this class is related 

Question regarding logging capabilities in flink

2016-06-23 Thread Sharma, Samiksha
Hi,

I was reading this link about logging in Flink jobs 
(https://ci.apache.org/projects/flink/flink-docs-master/internals/logging.html). 
With modifications to the log4j files I am able to see logs in the flink/log 
directory when I run a job standalone or on YARN, but I am more interested in 
seeing the log statements I added in my job when I look at the logs in the YARN UI.
Is there a way in Flink to let the user see the logs added in a job on YARN 
rather than on the client side?
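
For reference, a minimal sketch of what "logs added in a job" means here (class and
message names are illustrative): logging through SLF4J inside a user function, so the
messages go to the TaskManager log files, which the YARN container log view can
typically show, rather than to the client that submitted the job:

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingMapper implements MapFunction<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingMapper.class);

    @Override
    public String map(String value) {
        // written to the TaskManager's log (a container log when running on YARN),
        // not to the client that submitted the job
        LOG.info("Processing record: {}", value);
        return value;
    }
}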

Thanks
Samiksha




Re: Localhost terminated

2016-06-23 Thread Till Rohrmann
What do the log files say?

Cheers,
Till

On Thu, Jun 23, 2016 at 4:46 PM, Debaditya Roy  wrote:

> Hello users,
>
> I have a problem with flink execution from cli. Each time I deploy from
> the CLI, the job starts but then it terminates the localhost.
>
>
>
> 06/23/2016 16:42:18  Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to SCHEDULED
> 06/23/2016 16:42:19  Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to DEPLOYING
> 06/23/2016 16:42:19  Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to RUNNING
>
> and I have to abort the job (ctrl + c).
> Immediately after deployment, when I check, I see that the localhost has
> stopped. Looking forward to your input. Thanks in advance.
>
> Warm Regards,
> Debaditya
>


Re: Iterate several kafka topics using the kafka connector

2016-06-23 Thread Sendoh
Thank you. It works exactly as we wanted, unioning the data streams.

Best,

Sendoh





Localhost terminated

2016-06-23 Thread Debaditya Roy
Hello users,

I have a problem with flink execution from cli. Each time I deploy from the
CLI, the job starts but then it terminates the localhost.



06/23/2016 16:42:18  Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to SCHEDULED
06/23/2016 16:42:19  Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to DEPLOYING
06/23/2016 16:42:19  Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to RUNNING

and I have to abort the job (ctrl + c).
Immediately after deployment, when I check, I see that the localhost has
stopped. Looking forward to your input. Thanks in advance.

Warm Regards,
Debaditya


Scala/ReactiveMongo: type classes, macros and java.util.Serializable

2016-06-23 Thread Stefano Baghino
Hello everybody,

in the past days, I've written batch input/output formats for MongoDB.

Initially, I tried to use the non-blocking ReactiveMongo driver, which uses
the type class pattern in Scala for the serialization logic. The library also
exposes some pretty
neat macros that automatically generate the type class instances for you,
given a case class.

Now, the problem is that these macros are not useful in Flink because the
generated type class instances would have to be serializable (something
that has been understandably left out from the macros). Has anyone ever
faced a similar problem? I've encountered it again when using upickle, which
has a similar facility, but for JSON serialization.

In the end I resorted to writing my own serialization logic and explicitly
extending java.util.Serializable, but I feel there may be a way to avoid this
(without rewriting/extending the macros to make the generated classes
serializable).

-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Stoppable Job And Web UI Questions

2016-06-23 Thread Till Rohrmann
Hi Yan Chou Chen,

1. At the moment Flink sources have to implement a certain interface,
   StoppableFunction, to be stoppable. If they do, you can stop them via the
   CLI or the web interface. This cannot be triggered from within a job.

   However, there is a better way to properly terminate a Flink job with your
   custom sources: simply terminate the SourceFunction (leave the read loop)
   once you've detected that your termination criterion is met (see the sketch
   after the code example below). Once all sources have done that, the job
   terminates properly and goes into the state FINISHED. That has the advantage
   that you reach a consensus on when to terminate. Otherwise there might be a
   dictator which orders the other tasks to stop even though they might still
   have some work left to do.

2. The number of records sent is the sum of all records sent by this task.
   These records include the watermarks as well as the actual stream records
   containing your data (read from Kafka). As such, this number will always be
   an upper bound on the number of records actually read (e.g. from Kafka) by
   your source.

3. Given that others might also deliver messages to the same Kafka topic and
   that you have multiple partitions, I think it is not easy to know when your
   1000 messages have been processed.

If you're the only one who writes to this Kafka topic, you can use an
accumulator to count the number of messages sent. The accumulator is updated
live in the web UI's task overview (if you click on the job and then the
Accumulators tab).

input.map(new RichMapFunction<Integer, Integer>() {
    private IntCounter intCounter = null;

    @Override
    public void open(Configuration config) {
        // register/obtain the accumulator once per parallel task
        intCounter = getRuntimeContext().getIntCounter("messages");
    }

    @Override
    public Integer map(Integer integer) throws Exception {
        intCounter.add(1); // count every record that passes through
        return integer;
    }
});
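
Coming back to point 1, here is a minimal sketch (class and names are illustrative,
not from this thread) of a custom source that leaves its read loop once a termination
criterion is met, so the job finishes on its own:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BoundedSource implements SourceFunction<String> {

    private volatile boolean running = true;
    private final long maxRecords; // illustrative termination criterion

    public BoundedSource(long maxRecords) {
        this.maxRecords = maxRecords;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long emitted = 0;
        while (running && emitted < maxRecords) {
            ctx.collect("record-" + emitted); // emit the next record
            emitted++;
        }
        // returning from run() finishes this source subtask; once all source
        // subtasks have finished, the job switches to the state FINISHED
    }

    @Override
    public void cancel() {
        running = false; // invoked when the job is cancelled externally
    }
}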

Cheers,
Till

On Wed, Jun 22, 2016 at 4:39 PM, Yan Chou Chen  wrote:

> Several new questions:
> - Stoppable job
> I read threads mentioning that a streaming job can be stopped [1][2].
> However looks like it can only be called through command line. Is it
> possible to programmatically stop the streaming job from within the
> job itself? For instance, a Kafka consumer streaming job reaches
> predefined condition, then call stop() from within e.g. MapFunction?
>
> - Web UI (jobmanager-host:8081) information
> I have a Kafka consumer which reads records from Kafka. In web ui's
> Subtasks tab where it has "Records sent", does it imply the records
> read by consumer? For instance, I deliver say 1k string record
> (SimpleStringSchema) to Kafka; can I expect 1k "Records sent"
> displayed on web ui once all those records read by consumer?
>
> This leads to another question. I have a streaming job which exploits
> map function e.g. stream.map(new MyMapFunction). Within the
> MyMapFunction impl I count per input and write the count to external
> places. Later on I sum the count value for MyMapFunction based on
> Parallelism supplied. So for example I run map(MyMapFunction) with 4
> parallelism, MyMapFunction processes 400, 500, 400, 500 count
> respectively. Later on the sum of all count is 1800. However this sum
> value is different from web ui which has higher "Record sent" e.g. 8k.
> Does that mean "Records sent" in web ui does not mean the records
> processed by MyMapFunction? How do I interpret the value in this
> column or how can I know if all messages delivered to Kafka are fully
> processed i.e. 1k records delivered to Kafka and 1k records read out
> of Kafka?
>
> Thanks.
>
> [1].
> http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3c57155c30.8010...@apache.org%3E
>
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/StoppableFunction.html
>


Re: Iterate several kafka topics using the kafka connector

2016-06-23 Thread Till Rohrmann
It is possible to instantiate the FlinkKafkaConsumer with multiple topics
[1]. Simply pass a list of topic names instead of the name of a single
topic.

streams.add(env.addSource(new FlinkKafkaConsumer09<>(
        Arrays.asList("foo", "bar", "foobar"),
        new JSONSchema(), properties)));

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumer

Cheers,
Till

On Thu, Jun 23, 2016 at 2:33 PM, Sendoh  wrote:

> Hi Flink developers,
>
> Can I ask how could we iterate several Kafka topics using the Kafka
> connector?
>
> Our idea is like the following example:
>
> List> streams = new ArrayList<>();
>
> // Iterate kafka topics
> Iterator topicIter = topicList.iterator();
>
> while (topicIter.hasNext()){
>
> String topic = topicIter.next();
>
> streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
> new JSONSchema(), properties)).rebalance());
>
> }
>
> Our goal is to union several kafka data streams into one, given the topics
> as a list:
>
> Iterator> streamsIt = streams.iterator();
>
> DataStream currentStream = streamsIt.next();
> while(streamsIt.hasNext()){
> DataStream nextStream = streamsIt.next();
> currentStream = currentStream.union(nextStream);
> }
>
> Cheers,
>
> Sendoh
>
>
>
>


Re: Initialization of static variables

2016-06-23 Thread Flavio Pompermaier
Ok, thanks for the explanation Till!

On Thu, Jun 23, 2016 at 3:19 PM, Till Rohrmann  wrote:

> Yes, this is normal Flink behaviour. The reason is that static variables
> are not transferred to the cluster. What happens instead when you first
> load the class on the cluster is that the static variables are created and
> possibly class initializers are executed. That is also the reason why your
> second example works whereas the first fails.
>
> Cheers,
> Till
>
> On Thu, Jun 23, 2016 at 3:12 PM, Flavio Pompermaier 
> wrote:
>
>> Hi all,
>> I have a Flink job that initializes a static Map in the main program, before
>> starting any Flink transformation. If I run the job locally that variable
>> is not empty; running the job on the cluster resets that variable. Is it a
>> bug or am I doing something wrong?
>> It only works if I initialize that variable in a static initializer before
>> the main, that is:
>>
>> // KO EXAMPLE
>> class ErrorMain {
>>
>>     private static final Map<String, String> ht = new HashMap<>();
>>
>>     public static void main(String[] args) {
>>         ht.put("test", "test");
>>         env.readFile().map(
>>             ...
>>             // here ht.get("test") returns null
>>         );
>>     }
>> }
>>
>> // OK EXAMPLE
>> class OkMain {
>>
>>     private static final Map<String, String> ht = new HashMap<>();
>>
>>     static {
>>         ht.put("test", "test");
>>     }
>>
>>     public static void main(String[] args) {
>>         env.readFile().map(
>>             ...
>>             // here ht.get("test") works
>>         );
>>     }
>> }
>>
>>
>> Best,
>> Flavio
>>
>>
>


Re: Initialization of static variables

2016-06-23 Thread Till Rohrmann
Yes, this is normal Flink behaviour. The reason is that static variables are
not transferred to the cluster. What happens instead when you first load
the class on the cluster is that the static variables are created and
possibly class initializers are executed. That is also the reason why your
second example works whereas the first fails.
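
For illustration, here is a minimal sketch of the usual alternative (class names and
the input path are placeholders): pass the map to the function as a constructor
argument, so it is serialized and shipped to the cluster together with the function
instead of living in a static field:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LookupJob {

    // The map travels with the serialized function, so it is also populated
    // on the TaskManagers, unlike a static field initialized in main().
    public static class LookupMapper implements MapFunction<String, String> {
        private final Map<String, String> lookup;

        public LookupMapper(Map<String, String> lookup) {
            this.lookup = lookup;
        }

        @Override
        public String map(String value) {
            return lookup.get(value);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Map<String, String> ht = new HashMap<>();
        ht.put("test", "test");

        DataSet<String> mapped = env.readTextFile("hdfs:///path/to/input")
                .map(new LookupMapper(ht));
        mapped.print();
    }
}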

Cheers,
Till

On Thu, Jun 23, 2016 at 3:12 PM, Flavio Pompermaier 
wrote:

> Hi all,
> I have a Flink job that initializes a static Map in the main program, before
> starting any Flink transformation. If I run the job locally that variable
> is not empty; running the job on the cluster resets that variable. Is it a
> bug or am I doing something wrong?
> It only works if I initialize that variable in a static initializer before
> the main, that is:
>
> // KO EXAMPLE
> class ErrorMain {
>
>     private static final Map<String, String> ht = new HashMap<>();
>
>     public static void main(String[] args) {
>         ht.put("test", "test");
>         env.readFile().map(
>             ...
>             // here ht.get("test") returns null
>         );
>     }
> }
>
> // OK EXAMPLE
> class OkMain {
>
>     private static final Map<String, String> ht = new HashMap<>();
>
>     static {
>         ht.put("test", "test");
>     }
>
>     public static void main(String[] args) {
>         env.readFile().map(
>             ...
>             // here ht.get("test") works
>         );
>     }
> }
>
>
> Best,
> Flavio
>
>


Iterate several kafka topics using the kafka connector

2016-06-23 Thread Sendoh
Hi Flink developers,

Can I ask how could we iterate several Kafka topics using the Kafka
connector?

Our idea is like the following example:

List> streams = new ArrayList<>();

// Iterate kafka topics
Iterator topicIter = topicList.iterator();

while (topicIter.hasNext()){

String topic = topicIter.next();

streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
new JSONSchema(), properties)).rebalance());

}

Our goal is to union several kafka data streams into one, given the topics
as a list:

Iterator> streamsIt = streams.iterator();

DataStream currentStream = streamsIt.next();
while(streamsIt.hasNext()){
DataStream nextStream = streamsIt.next();
currentStream = currentStream.union(nextStream);
}

Cheers,

Sendoh





Initialization of static variables

2016-06-23 Thread Flavio Pompermaier
Hi all,
I have a Flink job that initializes a static Map in the main program, before
starting any Flink transformation. If I run the job locally that variable
is not empty; running the job on the cluster resets that variable. Is it a
bug or am I doing something wrong?
It only works if I initialize that variable in a static initializer before
the main, that is:

// KO EXAMPLE
class ErrorMain {

    private static final Map<String, String> ht = new HashMap<>();

    public static void main(String[] args) {
        ht.put("test", "test");
        env.readFile().map(
            ...
            // here ht.get("test") returns null
        );
    }
}

// OK EXAMPLE
class OkMain {

    private static final Map<String, String> ht = new HashMap<>();

    static {
        ht.put("test", "test");
    }

    public static void main(String[] args) {
        env.readFile().map(
            ...
            // here ht.get("test") works
        );
    }
}


Best,
Flavio


Re: State key serializer has not been configured in the config.

2016-06-23 Thread Chesnay Schepler

We should adjust the error message to mention the keyed-stream requirement.

On 23.06.2016 10:11, Till Rohrmann wrote:

Hi Jacob,

the `ListState` abstraction is a state which we call 
partitioned/key-value state. As such, it is only possible to use it 
with a keyed stream. This means that you have to call `keyBy` after 
the `connect` API call.


Cheers,
Till

On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen  wrote:


Hi,

I am trying to use a ListState in a RichCoFlatMapFunction but when
calling: getRuntimeContext().getListState(descriptor) in the
open-function i am getting a "State key serializer has not .."
exception. I am not sure what i can do to avoid this exception -
Are any of you able to provide some help ?

Best regards
Jacob


 private ListState> deltaPositions;

@Override
public void open(org.apache.flink.configuration.Configuration
parameters) throws Exception {
  // Create state variable
  ListStateDescriptor> descriptor =
  new ListStateDescriptor<>(
  "deltaPositions", // the state name
  TypeInformation.of(new
TypeHint>() {
  }));

  deltaPositions = getRuntimeContext().getListState(descriptor);
};



2016-06-22 20:41:38,813 INFO
 org.apache.flink.runtime.taskmanager.Task   - Stream of Items
with collection of meadian times (1/1) switched to FAILED with
exception.
java.lang.RuntimeException: Error while getting state
at

org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131)
at

crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83)
at

org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at

org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at

org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been
configured in the config. This operation cannot use partitioned state.
at

org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
at

org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
at

org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129)
... 8 more
2016-06-22 20:41:38,815 INFO
 org.apache.flink.runtime.taskmanager.Task   - Freeing ta

-- 
Jacob Bay Larsen


Phone: +45 6133 1108 
E-mail: m...@jacobbay.dk 






Re: Kinesis connector classpath issue when running Flink 1.1-SNAPSHOT on YARN

2016-06-23 Thread Aljoscha Krettek
Hi Josh,
do you maybe want to open an issue for that and contribute your fix for
that?

Cheers,
Aljoscha

On Fri, 17 Jun 2016 at 17:49 Josh  wrote:

> Hi Aljoscha,
>
> Thanks! It looks like you're right. I've run it with the FsStateBackend
> and everything works fine.
>
> I've also got it working with RocksDBStateBackend now, by rebuilding Flink
> master with:
> - the verify step in FsStateBackend skipped for URIs with s3 schemes.
> - the initialisation of filesystem in the constructor commented out (not
> sure why this is initialised in the constructor, since it seems to get
> initialised later anyway)
>
> Josh
>
>
>
> On Fri, Jun 17, 2016 at 2:53 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> I think the problem with the missing Class
>> com.amazon.ws.emr.hadoop.fs.EmrFileSystem is not specific to RocksDB. The
>> exception is thrown in the FsStateBackend, which is internally used by the
>> RocksDB backend to do snapshotting of non-partitioned state. The problem is
>> that the FsStateBackend tries to verify that the checkpoint path exists in
>> the constructor. The constructor is invoked in the client program, when not
>> running in the Yarn context where the correct jars that hold the EMR
>> FileSystem classes are available. This should be causing the exception.
>>
>> Just to verify, could you maybe run it with the FsStateBackend to see if
>> you get the same exception. If yes, then we need to remove the verify step
>> in the FsStateBackend or at least provide a way to bypass these steps.
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 17 Jun 2016 at 15:40 Josh  wrote:
>>
>>> I found that I can still write to s3, using my Flink build of
>>> 1.1-SNAPSHOT, for example if I run the word count example:
>>> ./bin/flink run ./examples/batch/WordCount.jar --input
>>> hdfs:///tmp/LICENSE --output s3://permutive-flink/wordcount-result.txt
>>>
>>> This works fine - it's just the RocksDBStateBackend which is erroring
>>> with the s3 URI. I'm wondering if it could be an issue with
>>> RocksDBStateBackend?
>>>
>>>
>>> On Fri, Jun 17, 2016 at 12:09 PM, Josh  wrote:
>>>
 Hi Gordon/Fabian,

 Thanks for helping with this! Downgrading the Maven version I was using
 to build Flink appears to have fixed that problem - I was using Maven 3.3.3
 before and have downgraded to 3.2.5.

 Just for reference, I printed the loaded class at runtime and found
 that when I was using Flink built with Maven 3.3.3, it was pulling in:

 jar:file:/opt/flink/flink-1.1-SNAPSHOT/lib/flink-dist_2.11-1.1-SNAPSHOT.jar!/org/apache/http/params/HttpConnectionParams.class
 But after building with the older Maven version, it pulled in the class
 from my jar:

 jar:file:/tmp/my-assembly-1.0.jar!/org/apache/http/params/HttpConnectionParams.class


 Unfortunately now that problem is fixed I've now got a different
 classpath issue. It started with:

 java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
 com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
 at
 org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
 at
 org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
 at
 org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
 at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
 at
 org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
 at
 org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
 at
 org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
 at
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)

 This is strange because I used an s3:// checkpoint directory when
 running Flink 1.0.3 on EMR and it worked fine. (according to
 https://ci.apache.org/projects/flink/flink-docs-master/setup/aws.html#provide-s3-filesystem-dependency
 no configuration should be needed to use S3 when running on EMR).

 Anyway I tried executing /etc/hadoop/conf/hadoop-env.sh before running
 my job, as this sets up the HADOOP_CLASSPATH env var. The exception then
 changed to:
 java.lang.NoClassDefFoundError: org/apache/hadoop/fs/common/Abortable

 I found that this class is related to a jar called s3-dist-cp, so then
 I tried copying that jar to Flink's lib directory from
 /usr/share/aws/emr/s3-dist-cp/lib/*

 And now I'm back to another Kinesis connector classpath error:

 java.lang.NoClassDefFoundError:
 org/apache/http/conn/ssl/SSLSocketFactory
 at
 com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136)
 at
 

Re: State key serializer has not been configured in the config.

2016-06-23 Thread Till Rohrmann
Hi Jacob,

the `ListState` abstraction is a state which we call partitioned/key-value
state. As such, it is only possible to use it with a keyed stream. This
means that you have to call `keyBy` after the `connect` API call.
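
A minimal sketch of that ordering (the types and data are illustrative, and
MyRichCoFlatMapFunction stands in for your RichCoFlatMapFunction):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> first = env.fromElements(Tuple2.of("a", 1));
DataStream<Tuple2<String, Long>> second = env.fromElements(Tuple2.of("a", 1L));

first.connect(second)
     .keyBy(0, 0)                             // key both inputs on field 0
     .flatMap(new MyRichCoFlatMapFunction()); // partitioned state (e.g. ListState) is now usable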

Cheers,
Till

On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen  wrote:

> Hi,
>
> I am trying to use a ListState in a RichCoFlatMapFunction, but when
> calling getRuntimeContext().getListState(descriptor) in the open function
> I am getting a "State key serializer has not .." exception. I am not sure
> what I can do to avoid this exception - are any of you able to provide some
> help?
>
> Best regards
> Jacob
>
>
>  private ListState> deltaPositions;
>
> @Override
> public void open(org.apache.flink.configuration.Configuration
> parameters) throws Exception {
>   // Create state variable
>   ListStateDescriptor> descriptor =
>   new ListStateDescriptor<>(
>   "deltaPositions", // the state name
>   TypeInformation.of(new TypeHint Integer>>() {
>   }));
>
>   deltaPositions = getRuntimeContext().getListState(descriptor);
> };
>
>
>
> 2016-06-22 20:41:38,813 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Stream of Items with collection of meadian times (1/1)
> switched to FAILED with exception.
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131)
> at
> crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: State key serializer has not been
> configured in the config. This operation cannot use partitioned state.
> at
> org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129)
> ... 8 more
> 2016-06-22 20:41:38,815 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Freeing ta
>
> --
> Jacob Bay Larsen
>
> Phone: +45 6133 1108
> E-mail: m...@jacobbay.dk
>