Re: RocksDB state checkpointing is expensive?

2016-04-07 Thread Krzysztof Zarzycki
OK, Thanks Aljoscha for the info!
Guys, great work on Flink, I really love it :)

Cheers,
Krzysztof

On Thu, 7 Apr 2016 at 10:48, Aljoscha Krettek wrote:

> Hi,
> you are right. Currently there is no incremental checkpointing and
> therefore, at each checkpoint, we essentially copy the whole RocksDB
> database to HDFS (or whatever filesystem you chose as a backup location).
> As far as I know, Stephan will start working on adding support for
> incremental snapshots this week or next week.
>
> Cheers,
> Aljoscha
>
> On Thu, 7 Apr 2016 at 09:55 Krzysztof Zarzycki 
> wrote:
>
>> Hi,
>> I saw the documentation and source code of the state management with
>> RocksDB and before I use it, I'm concerned of one thing: Am I right that
>> currently when state is being checkpointed, the whole RocksDB state is
>> snapshotted? There is no incremental, diff-based snapshotting, is there? If so,
>> this seems infeasible for keeping state measured in tens or hundreds of GBs
>> (and you reach that size of state when you keep an embedded state in the
>> streaming application instead of going out to Cassandra/HBase or another DB).
>> It will just cost too much to snapshot such a large state.
>>
>> Samza as a good example to compare, writes every state change to Kafka
>> topic, considering it a snapshot in the shape of changelog. Of course in
>> the moment of app restart, recovering the state from the changelog would be
>> too costly, that is why the changelog topic is compacted. Plus, I think
>> Samza does a state snapshot from time to time anyway (but I'm not sure of
>> that).
>>
>> Thanks for answering my doubts,
>> Krzysztof
>>
>>
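
For reference, a minimal sketch (Flink 1.0-era API; the checkpoint interval and HDFS path below are placeholders) of how a job selects the RocksDB backend and the backup location mentioned above:

```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s; without incremental checkpointing, each checkpoint
        // copies the full RocksDB state to the backup location below.
        env.enableCheckpointing(60_000);

        // The URI is the "backup location" the state is copied to on every checkpoint.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... define the actual job and call env.execute() here ...
    }
}
```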


Re: FromIteratorFunction problems

2016-04-07 Thread Chesnay Schepler

Hmm, maybe I was too quick with linking to the JIRA.

As for an example: you can look at the streaming WindowJoin example. The
sample data uses an Iterator (ThrottledIterator).
Note that the iterator implementation used is part of Flink and also
implements Serializable.
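
As a rough illustration of that point (a sketch, not code from the thread): fromCollection(Iterator, Class) needs an iterator type that is itself Serializable, for example something like this instead of the iterator returned by Arrays.asList(...):

```
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

// A hypothetical iterator that also implements Serializable, so the closure cleaner accepts it.
public class IntRangeIterator implements Iterator<Integer>, Serializable {

    private final int max;
    private int next = 1;

    public IntRangeIterator(int max) {
        this.max = max;
    }

    @Override
    public boolean hasNext() {
        return next <= max;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return next++;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}

// usage, with the environment from the question:
// env.fromCollection(new IntRangeIterator(2), Integer.class).print();
```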


On 07.04.2016 22:18, Andrew Whitaker wrote:

Hi,

I'm trying to get a simple example of a source backed by an iterator 
working. Here's the code I've got:


```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List<Integer> list = Arrays.asList(1, 2);

env.fromCollection(list.iterator(), Integer.class).print();
```

I get the following exception:

```
Exception in thread "main" 
org.apache.flink.api.common.InvalidProgramException: Object 
org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91 
not serializable
at 
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)

at braintree.demo.FromIterator.main(FromIterator.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at 
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)

... 11 more
```

This kind of makes sense. The root issue seems to be that the list's 
iterator is not serializable. In fact, java.util.Iterator doesn't 
implement Serializable.


I can't seem to find any examples of `FromIteratorFunction` being used 
in the flink codebase. Am I using it wrong?


Thanks!

--
Andrew Whitaker | andrew.whita...@braintreepayments.com 


--
Note: this information is confidential. It is prohibited to share, 
post online or otherwise publicize without Braintree's prior written 
consent.




Re: FromIteratorFunction problems

2016-04-07 Thread Chesnay Schepler
you will find some information regarding this issue in this JIRA: 
https://issues.apache.org/jira/browse/FLINK-2608


On 07.04.2016 22:18, Andrew Whitaker wrote:

Hi,

I'm trying to get a simple example of a source backed by an iterator 
working. Here's the code I've got:


```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List<Integer> list = Arrays.asList(1, 2);

env.fromCollection(list.iterator(), Integer.class).print();
```

I get the following exception:

```
Exception in thread "main" 
org.apache.flink.api.common.InvalidProgramException: Object 
org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91 
not serializable
at 
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)

at braintree.demo.FromIterator.main(FromIterator.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at 
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)

... 11 more
```

This kind of makes sense. The root issue seems to be that the list's 
iterator is not serializable. In fact, java.util.Iterator doesn't 
implement Serializable.


I can't seem to find any examples of `FromIteratorFunction` being used 
in the flink codebase. Am I using it wrong?


Thanks!

--
Andrew Whitaker | andrew.whita...@braintreepayments.com 


--
Note: this information is confidential. It is prohibited to share, 
post online or otherwise publicize without Braintree's prior written 
consent.




Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Timur Fayruzov
The exception does not show up in the console when I run the job; it only
shows up in the logs. I thought that meant it happens either on the AM or a TM (I
assume what I see in stdout is the client log). Is my thinking right?


On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi  wrote:

> Hey Timur,
>
> Just had a chat with Robert about this. I agree that the error message
> is confusing, but it is fine in this case. The file system classes are
> not on the class path of the client process, which is submitting the
> job.

Do you mean that the classes should be on the classpath of the
`org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
tried to add the EMRFS jars to this classpath but it did not help. BTW, it
seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which is
not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only point of
control I have here for adding to the classpath, so I had to set it manually.


> It fails to sample the input file sizes, but this is just an
> optimization step and hence it does not fail the job and only logs the
> error.
>
Is this optimization only on the client side? In other words, does it affect
Flink's ability to choose the proper type of join?


>
> After the job is submitted everything should run as expected.
>
> You should be able to get rid of that exception by adding the missing
> classes to the class path of the client process (started via
> bin/flink), for example via the lib folder.
>
The above approach did not work; could you elaborate on what you meant by 'lib
folder'?

Thanks,
Timur


> – Ufuk
>
>
>
>
> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov 
> wrote:
> > There's one more filesystem integration failure that I have found. My
> job on
> > a toy dataset succeeds, but Flink log contains the following message:
> > 2016-04-07 18:10:01,339 ERROR
> > org.apache.flink.api.common.io.DelimitedInputFormat   -
> Unexpected
> > problen while getting the file statistics for file 's3://...':
> > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> > java.lang.RuntimeException: java.lang.RuntimeException:
> > java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> > at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> > at
> >
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> > at
> >
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> > at
> >
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
> > at
> >
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
> > at
> >
> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
> > at
> >
> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
> > at
> >
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
> > at
> >
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
> > at
> >
> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
> > at
> >
> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
> > at
> > org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
> > at
> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
> > at
> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
> > at
> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
> > at
> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
> > at
> > org.apache.flink.client.program.Client.runBlocking(Client.java:314)
> > at
> >
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> > at
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> > at
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
> > at
> >
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
> > at
> >
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
> > at
> >
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
> > at scala.Option.foreach(Option.scala:257)
> > at
> > com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
> > at
> > 

FromIteratorFunction problems

2016-04-07 Thread Andrew Whitaker
Hi,

I'm trying to get a simple example of a source backed by an iterator
working. Here's the code I've got:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List<Integer> list = Arrays.asList(1, 2);

env.fromCollection(list.iterator(), Integer.class).print();
```

I get the following exception:

```
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: Object
org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91
not serializable
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
at braintree.demo.FromIterator.main(FromIterator.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
... 11 more
```

This kind of makes sense. The root issue seems to be that the list's
iterator is not serializable. In fact, java.util.Iterator doesn't implement
Serializable.

I can't seem to find any examples of `FromIteratorFunction` being used in
the flink codebase. Am I using it wrong?

Thanks!

-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com
--
Note: this information is confidential. It is prohibited to share, post
online or otherwise publicize without Braintree's prior written consent.


Re: mutable hashmap outside of stream, does it get snapshotted ?

2016-04-07 Thread Bart van Deenen
Thanks all!
I was under the mistaken impression that Flink automagically did the
snapshotting for me.
The info is really clear, I'll have no trouble implementing it.
 
Bart
 
 
On Thu, Apr 7, 2016, at 18:40, Aljoscha Krettek wrote:
> Hi,
> good explanation and pointers!
>
> I just want to add that the uriLookup table in your example is not
> really shared between your operator instances in a distributed
> setting. When serializing your transformations the current state of
> the HashMap is serialized with them because it is in the closure of
> the transformations. Then, on the cluster, the HashMap is serialized
> and all parallel instances basically work on their now local copy of
> the empty HashMap.
>
> Cheers,
> Aljoscha
>
> On Thu, 7 Apr 2016 at 18:30 Stefano Baghino
>  wrote:
>> Hi Bart,
>>
>> to make sure that the state is checkpointed you have to:
>>  1. configure your Flink installation with a reliable state backend
>> (optional for development, you can read more about it here[1])
>>  2. explicitly enable checkpointing in your program (see how here[2]
>> — it's just a couple of lines of code)
>>  3. extend your operators so that they checkpoint data, by
>> implementing the `Checkpointed` interface or using an instance
>> field (the semantics of the two approaches are slightly
>> different, you can read more about it here[3])
>> When your data is checkpointed you can access the state if your
>> operator implements the `RichFunction` interface (via an abstract
>> class that wraps the operator you need to implement, like
>> `RichMapFunction`).
>>
>> For your need in particular, I don't know a way to checkpoint state
>> shared between different operators; perhaps you can refactor your
>> code so that the state is encapsulated in an operator implementation
>> and then moved through your pipeline as a parameter of the following
>> operators. Would that work?
>>
>> I apologize for just providing pointers to the docs in my reply but
>> checkpointing deserves a good explanation and I feel the docs get the
>> job done pretty well. I will gladly help you if you have any doubt.
>>
>> Hope I've been of some help.
>>
>>
>> On Thu, Apr 7, 2016 at 3:41 PM, Bart van Deenen
>>  wrote:
>>> Hi all
>>>
>>>  I'm having a datastream transformation, that updates a mutable
>>>  hashmap that exists outside of the stream.
>>>
>>>  So it's something like
>>>
>>>  object FlinkJob {
>>>  val uriLookup = mutable.HashMap.empty[String, Int]
>>>
>>>
>>>  def main(args: Array[String]) {
>>>
>>>  val stream: DataStream = ...
>>>
>>>  stream.keyBy(1).timeWindow(..).fold(..)
>>>  .window(..)
>>>  .map(..).fold(..)
>>>  .addSink(..)
>>>  }
>>>  }
>>>
>>>  where the uriLookup hashmap gets updated inside the stream
>>>  transformation,
>>>  and is serialized in the step before the addSink
>>>
>>>  It works fine, however
>>>
>>>  Does the snapshotting mechanism in case of a node failure actually
>>>  serialize this map?
>>>
>>>  And out of curiosity, can I actually see what data exists
>>>  inside the
>>>  snapshot data?
>>>
>>>  Thanks.
>>>
>>>  Bart
>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>> Software Engineer @ Radicalbit
 

Links:

  1. 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html#configuration
  2. 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/fault_tolerance.html
  3. 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html


Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Ufuk Celebi
Hey Timur,

Just had a chat with Robert about this. I agree that the error message
is confusing, but it is fine in this case. The file system classes are
not on the class path of the client process, which is submitting the
job. It fails to sample the input file sizes, but this is just an
optimization step and hence it does not fail the job and only logs the
error.

After the job is submitted everything should run as expected.

You should be able to get rid of that exception by adding the missing
classes to the class path of the client process (started via
bin/flink), for example via the lib folder.

– Ufuk
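
A quick way to verify that from the client side (a plain-Java sketch, not something from the thread) is to check whether the class named in the quoted log below is visible to the JVM that runs bin/flink:

```
public class EmrFsClasspathCheck {
    public static void main(String[] args) {
        try {
            // The class named in the ClassNotFoundException from the quoted log.
            Class.forName("com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
            System.out.println("EmrFileSystem is visible on this classpath.");
        } catch (ClassNotFoundException e) {
            System.out.println("EmrFileSystem is NOT visible; add its jar to the client "
                    + "classpath, e.g. via Flink's lib folder or HADOOP_CLASSPATH.");
        }
    }
}
```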




On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov  wrote:
> There's one more filesystem integration failure that I have found. My job on
> a toy dataset succeeds, but Flink log contains the following message:
> 2016-04-07 18:10:01,339 ERROR
> org.apache.flink.api.common.io.DelimitedInputFormat   - Unexpected
> problen while getting the file statistics for file 's3://...':
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> at
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> at
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
> at
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
> at
> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
> at
> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
> at
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
> at
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
> at
> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
> at
> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
> at
> org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
> at
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
> at
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:314)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
> at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
> at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
> at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
> at scala.Option.foreach(Option.scala:257)
> at
> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
> at
> com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
> Class 

Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Timur Fayruzov
There's one more filesystem integration failure that I have found. My job
on a toy dataset succeeds, but Flink log contains the following message:
2016-04-07 18:10:01,339 ERROR
org.apache.flink.api.common.io.DelimitedInputFormat   - Unexpected
problen while getting the file statistics for file 's3://...':
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.ClassNotFoundException: Class
com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
at
org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at
org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
at
org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
at
org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
at
org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
at
org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
at
org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
at
org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
at
org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
at
org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
at
org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
at
org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
at
org.apache.flink.client.program.Client.runBlocking(Client.java:314)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
at
com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
at
com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
at
com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
at scala.Option.foreach(Option.scala:257)
at
com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
at
com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at
org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
at
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
... 37 more
Caused by: java.lang.ClassNotFoundException: Class
com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
... 38 more

I assume this may be a big problem when run on large datasets, as there will
be no information for the optimizer. I tried to change EMRFS to the NativeS3
driver, but got the same error, which is surprising. I expected
NativeS3FileSystem to be on the classpath since it ships with the Flink runtime.

Thanks,
Timur


On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi 

Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
Thank you, Till! Setting the flag in flink-conf.yaml worked; I'm very glad
that it was resolved. Note, however, that passing it as an argument to the flink
script did not work. I tried to pass it as:
`-yDenv.java.opts="-Djava.library.path="`. I did not investigate any
further at this time.

Thanks,
Timur
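
For debugging this kind of problem, a small sketch (plain Java, not from the thread; the library name is a placeholder) that prints what the task JVM actually received and tries to load the library in open():

```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class NativeLibProbe extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) {
        // Shows up in the TaskManager logs, so you can see the effective setting on the task JVM.
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));

        // "mynative" is a placeholder name; this throws UnsatisfiedLinkError if the path is wrong.
        System.loadLibrary("mynative");
    }

    @Override
    public String map(String value) {
        return value;
    }
}
```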

On Thu, Apr 7, 2016 at 2:48 AM, Till Rohrmann  wrote:

> For passing the dynamic property directly when running things on YARN, you
> have to use -yDenv.java.opts="..."
> ​
>
> On Thu, Apr 7, 2016 at 11:42 AM, Till Rohrmann 
> wrote:
>
>> Hi Timur,
>>
>> what you can try doing is to pass the JVM parameter
>> -Djava.library.path= via the env.java.opts to the system. You
>> simply have to add env.java.opts: "-Djava.library.path=" in the
>> flink-conf.yaml or via -Denv.java.opts="-Djava.library.path=", if
>> I’m not mistaken.
>>
>> Cheers
>> Till
>> ​
>>
>> On Thu, Apr 7, 2016 at 10:07 AM, Timur Fayruzov > > wrote:
>>
>>> there is a hack for this issue: copying my native library to
>>> $HADOOP_HOME/lib/native makes it discoverable and a program runs, however
>>> this is not an appropriate solution and it seems to be fragile.
>>>
>>> I tried to find where 'lib/native' path appears in the configuration and
>>> found 2 places:
>>> hadoop-env.sh: export
>>> JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native
>>> mapred-site.xml: key: mapreduce.admin.user.env
>>>
>>> I tried to add path to dir with my native lib in both places, but still
>>> no luck.
>>>
>>> Thanks,
>>> Timur
>>>
>>> On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov <
>>> timur.fairu...@gmail.com> wrote:
>>>
 Hello,

 I'm not sure whether it's a Hadoop or Flink-specific question, but
 since I ran into this in the context of Flink I'm asking here. I would be
 glad if anyone can suggest a more appropriate place.

 I have a native library that I need to use in my Flink batch job that I
 run on EMR, and I try to point JVM to the location of native library.
 Normally, I'd do this using java.library.path parameter. So I try to run as
 follows:
 `
 HADOOP_CONF_DIR=/etc/hadoop/conf
 JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m
 yarn-cluster -yn 1 -yjm 768 -ytm 768 
 `
 It does not work, fails with `java.lang.UnsatisfiedLinkError` when
 trying to load the native lib. It probably has to do with YARN not
 passing this parameter to task nodes, but my understanding of this
 mechanism is quite limited so far.

 I dug up this Jira ticket:
 https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting
 LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem
 either.

 Any help or hint where to look is highly appreciated.

 Thanks,
 Timur

>>>
>>>
>>
>


Re: mutable hashmap outside of stream, does it get snapshotted ?

2016-04-07 Thread Aljoscha Krettek
Hi,
good explanation and pointers!

I just want to add that the uriLookup table in your example is not really
shared between your operator instances in a distributed setting. When
serializing your transformations the current state of the HashMap is
serialized with them because it is in the closure of the transformations.
Then, on the cluster, the HashMap is deserialized and all parallel instances
basically work on their own, now-local copy of the empty HashMap.

Cheers,
Aljoscha

On Thu, 7 Apr 2016 at 18:30 Stefano Baghino 
wrote:

> Hi Bart,
>
> to make sure that the state is checkpointed you have to:
>
>    1. configure your Flink installation with a reliable state backend
>       (optional for development, you can read more about it here)
>    2. explicitly enable checkpointing in your program (see how here
>       — it's just a couple of lines of code)
>    3. extend your operators so that they checkpoint data, by implementing
>       the `Checkpointed` interface or using an instance field (the semantics of
>       the two approaches are slightly different, you can read more about it here)
>
> When your data is checkpointed you can access the state if your operator
> implements the `RichFunction` interface (via an abstract class that wraps
> the operator you need to implement, like `RichMapFunction`).
>
> For your need in particular, I don't know a way to checkpoint state shared
> between different operators; perhaps you can refactor your code so that
> the state is encapsulated in an operator implementation and then moved
> through your pipeline as a parameter of the following operators. Would that
> work?
>
> I apologize for just providing pointers to the docs in my reply but
> checkpointing deserves a good explanation and I feel the docs get the job
> done pretty well. I will gladly help you if you have any doubt.
>
> Hope I've been of some help.
>
> On Thu, Apr 7, 2016 at 3:41 PM, Bart van Deenen  > wrote:
>
>> Hi all
>>
>> I'm having a datastream transformation, that updates a mutable
>> hashmap that exists outside of the stream.
>>
>> So it's something like
>>
>> object FlinkJob {
>>   val uriLookup = mutable.HashMap.empty[String, Int]
>>
>>
>>   def main(args: Array[String]) {
>>
>> val stream: DataStream = ...
>>
>> stream.keyBy(1).timeWindow(..).fold(..)
>> .window(..)
>> .map(..).fold(..)
>> .addSink(..)
>>   }
>> }
>>
>> where the uriLookup hashmap gets updated inside the stream
>> transformation,
>> and is serialized in the step before the addSink
>>
>> It works fine, however
>>
>> Does the snapshotting mechanism in case of a node failure actually
>> serialize this map?
>>
>> And out of curiosity, can I actually see what data exists inside the
>> snapshot data?
>>
>> Thanks.
>>
>> Bart
>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Re: WindowedStream operation questions

2016-04-07 Thread Aljoscha Krettek
Hi,
I saw the issue you opened. :-) I'll try and figure out how to get all the
Scaladocs on there.

Regarding the other questions. A WindowedStream is basically not a Stream
in itself but a stepping stone towards specifying a windowed operation that
results in a new Stream. So the pattern always has to be like this:

in
  .keyBy(...)
  .window(...)
  .reduce() // or fold() or apply()

Only with the final specification of the operation do you get a new
DataStream of elements. You can do a map() before the window operation but
not in between specifying a window and an operation that works on windows.
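
A sketch of that working shape for the example from the question (Java API here, although the question used Scala; the socket source is just a placeholder input):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MapThenWindowAll {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.socketTextStream("localhost", 9999); // placeholder source

        // map() first ...
        DataStream<Integer> ones = input.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return 1; // the ".map { x => 1 }" from the question
            }
        });

        // ... then the window, then the window operation; only reduce()/fold()/apply()
        // turns the AllWindowedStream back into a DataStream.
        ones.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
            .reduce(new ReduceFunction<Integer>() {
                @Override
                public Integer reduce(Integer a, Integer b) {
                    return a + b;
                }
            })
            .print();

        env.execute("map then windowAll");
    }
}
```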

This differs from the Apache Beam (formerly Google Dataflow) model where
the window is a property of stream elements. There you can do:

in
  .map()
  .window()
  .map()
  .reduce()

And the reduce operation will work on the windows that were assigned in
the window operation.

I hope this helps somewhat but please let me know if I should go into
details.

Cheers,
Aljoscha

On Thu, 7 Apr 2016 at 17:22 Elias Levy  wrote:

> An observation and a couple of questions from a novice.
>
> The observation: The Flink web site makes available ScalaDocs for
> org.apache.flink.api.scala but not for org.apache.flink.streaming.api.scala.
>
> Now for the questions:
>
> Why can't you use map to transform a data stream, say convert all the
> elements to integers (e.g. .map { x => 1 }), then create a tumbling
> processing time window (e.g.
> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2))))?
>
> Then the inverse: Why do AllWindowedStream and WindowedStream not have a
> map method?
>


Re: mutable hashmap outside of stream, does it get snapshotted ?

2016-04-07 Thread Stefano Baghino
Hi Bart,

to make sure that the state is checkpointed you have to:

   1. configure your Flink installation with a reliable state backend
      (optional for development, you can read more about it here)
   2. explicitly enable checkpointing in your program (see how here
      — it's just a couple of lines of code)
   3. extend your operators so that they checkpoint data, by implementing
      the `Checkpointed` interface or using an instance field (the semantics of
      the two approaches are slightly different, you can read more about it here)

When your data is checkpointed you can access the state if your operator
implements the `RichFunction` interface (via an abstract class that wraps
the operator you need to implement, like `RichMapFunction`).
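
A minimal sketch of point 3 (Java API and the Flink 1.0-era `Checkpointed` interface; the original example is Scala), keeping the lookup map inside the operator instead of outside the stream:

```
import java.util.HashMap;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

public class UriLookupMapper extends RichMapFunction<String, Integer>
        implements Checkpointed<HashMap<String, Integer>> {

    // Lives inside the operator instance, so it is covered by checkpoints.
    private HashMap<String, Integer> uriLookup = new HashMap<>();

    @Override
    public Integer map(String uri) {
        Integer id = uriLookup.get(uri);
        if (id == null) {
            id = uriLookup.size();
            uriLookup.put(uri, id);
        }
        return id;
    }

    @Override
    public HashMap<String, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
        return uriLookup; // handed to the configured state backend on each checkpoint
    }

    @Override
    public void restoreState(HashMap<String, Integer> state) {
        this.uriLookup = state; // re-populated after a failure
    }
}
```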

For your need in particular, I don't know a way to checkpoint state shared
between different operators; perhaps you can refactor your code so that
the state is encapsulated in an operator implementation and then moved
through your pipeline as a parameter of the following operators. Would that
work?

I apologize for just providing pointers to the docs in my reply but
checkpointing deserves a good explanation and I feel the docs get the job
done pretty well. I will gladly help you if you have any doubt.

Hope I've been of some help.

On Thu, Apr 7, 2016 at 3:41 PM, Bart van Deenen 
wrote:

> Hi all
>
> I'm having a datastream transformation, that updates a mutable
> hashmap that exists outside of the stream.
>
> So it's something like
>
> object FlinkJob {
>   val uriLookup = mutable.HashMap.empty[String, Int]
>
>
>   def main(args: Array[String]) {
>
> val stream: DataStream = ...
>
> stream.keyBy(1).timeWindow(..).fold(..)
> .window(..)
> .map(..).fold(..)
> .addSink(..)
>   }
> }
>
> where the uriLookup hashmap gets updated inside the stream
> transformation,
> and is serialized in the step before the addSink
>
> It works fine, however
>
> Does the snapshotting mechanism in case of a node failure actually
> serialize this map?
>
> And out of curiosity, can I actually see what data exists inside the
> snapshot data?
>
> Thanks.
>
> Bart
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


WindowedStream operation questions

2016-04-07 Thread Elias Levy
An observation and a couple of questions from a novice.

The observation: The Flink web site makes available ScalaDocs for
org.apache.flink.api.scala but not for org.apache.flink.streaming.api.scala.

Now for the questions:

Why can't you use map to transform a data stream, say convert all the
elements to integers (e.g. .map { x => 1 }), then create a tumbling
processing time window (e.g.
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2))))?

Then the inverse: Why do AllWindowedStream and WindowedStream not have a map
method?


Re: Example - Reading Avro Generic records

2016-04-07 Thread Sourigna Phetsarath
Tarandeep,

Also, in your code example, when *reuseAvroValue* is *false* the code will
fail with this message:

java.lang.RuntimeException: The class
'org.apache.avro.generic.GenericRecord' is not instantiable: The class is
no proper class, it is either abstract, an interface, or a primitive type.
at
org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:222)
at
org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:147)
at
org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:122)
at



I encountered this when I was writing the PR.

-Gna
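
A possible (untested) way around that failure, sketched here rather than taken from the pending PR: instantiate Avro's concrete GenericData.Record with the known schema instead of trying to instantiate the GenericRecord interface:

```
// Inside nextRecord(...), assuming the format keeps the parsed Schema in a `schema` field:
if (!reuseAvroValue) {
    // GenericRecord is an interface; GenericData.Record is Avro's concrete implementation.
    reuseValue = new org.apache.avro.generic.GenericData.Record(schema);
}
reuseValue = fileReader.next(reuseValue);
return reuseValue;
```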

On Thu, Apr 7, 2016 at 11:08 AM, Sourigna Phetsarath <
gna.phetsar...@teamaol.com> wrote:

> Tarandeep,
>
> Thanks for pasting your code!
>
> I have a PR ready that extends AvroInputFormat and will submit it soon.
>
> Still waiting for the legal team at AOL to approve it.
>
> -Gna
>
> On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh 
> wrote:
>
>> Thank you Gna for opening the ticket.
>>
>> I looked into AvroInputFormat code and inspired by it I wrote a
>> GenericAvroInputFormat. The code is awfully similar (and hence redundant)
>> to original AvroInputFormat, so it is a good idea to modify AvroInputFormat
>> in flink to support GenericRecord.
>>
>> Anyways, I am pasting the code here for anyone who wants to use it (till
>> your code is part of Flink stable release)-
>>
>> import java.io.IOException;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileReader;
>> import org.apache.avro.file.FileReader;
>> import org.apache.avro.file.SeekableInput;
>> import org.apache.avro.generic.GenericDatumReader;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.DatumReader;
>> import org.apache.flink.api.avro.FSDataInputStreamWrapper;
>> import org.apache.flink.api.common.io.FileInputFormat;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>> import org.apache.flink.api.java.typeutils.TypeExtractor;
>> import org.apache.flink.core.fs.FileInputSplit;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.util.InstantiationUtil;
>>
>> public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
>> implements ResultTypeQueryable<GenericRecord> {
>>
>> private transient long end;
>> private transient Schema schema;
>> private transient FileReader<GenericRecord> fileReader;
>> private boolean reuseAvroValue = true;
>>
>> private static final long serialVersionUID = 1L;
>>
>> public GenericAvroInputFormat(Path filePath, Schema schema) {
>> super(filePath);
>> this.schema = schema;
>> }
>>
>> public void setReuseAvroValue(boolean reuseAvroValue) {
>> this.reuseAvroValue = reuseAvroValue;
>> }
>>
>> public void setUnsplittable(boolean unsplittable) {
>> this.unsplittable = unsplittable;
>> }
>>
>> @Override
>> public TypeInformation<GenericRecord> getProducedType() {
>> return TypeExtractor.getForClass(GenericRecord.class);
>> }
>>
>> @Override
>> public void open(FileInputSplit split) throws IOException {
>> super.open(split);
>> SeekableInput sin = new FSDataInputStreamWrapper(stream, 
>> split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
>> DatumReader<GenericRecord> reader = new GenericDatumReader<>();
>> fileReader = DataFileReader.openReader(sin, reader);
>> fileReader.sync(split.getStart());
>> this.end = split.getStart() + split.getLength();
>> }
>>
>> @Override
>> public boolean reachedEnd() throws IOException {
>> return !fileReader.hasNext() || fileReader.pastSync(end);
>> }
>>
>> @Override
>> public GenericRecord nextRecord(GenericRecord reuseValue) throws 
>> IOException {
>> if (reachedEnd()) {
>> return null;
>> }
>>
>> if (!reuseAvroValue) {
>> reuseValue = InstantiationUtil.instantiate(GenericRecord.class, 
>> Object.class);
>> }
>>
>> reuseValue = fileReader.next(reuseValue);
>> return reuseValue;
>> }
>> }
>>
>>
>> Usage:
>>
>> public static void main(String[] args) throws Exception {
>> final ExecutionEnvironment env = 
>> ExecutionEnvironment.getExecutionEnvironment();
>> final Path inPath = new Path(args[0]);
>>
>> Schema schema = new Schema.Parser().parse(new 
>> File("/path/to/schemafile.avsc"));
>> DataSet<GenericRecord> dataSet = env.createInput(new
>> GenericAvroInputFormat(inPath, schema));
>> dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
>> @Override
>> public Tuple2<Long, String> map(GenericRecord record) {
>> Long id = (Long) record.get("id");
>> String someString = record.get("somestring").toString();
>> return new Tuple2<>(id, someString);
>> }
>> }).writeAsText(args[1]);
>>
>> env.execute();
>> }
>>
>>
>> 

Re: Example - Reading Avro Generic records

2016-04-07 Thread Sourigna Phetsarath
Tarandeep,

Thanks for pasting your code!

I have a PR ready that extends AvroInputFormat and will submit it soon.

Still waiting for the legal team at AOL to approve it.

-Gna

On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh  wrote:

> Thank you Gna for opening the ticket.
>
> I looked into AvroInputFormat code and inspired by it I wrote a
> GenericAvroInputFormat. The code is awfully similar (and hence redundant)
> to original AvroInputFormat, so it is a good idea to modify AvroInputFormat
> in flink to support GenericRecord.
>
> Anyways, I am pasting the code here for anyone who wants to use it (till
> your code is part of Flink stable release)-
>
> import java.io.IOException;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.file.FileReader;
> import org.apache.avro.file.SeekableInput;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.DatumReader;
> import org.apache.flink.api.avro.FSDataInputStreamWrapper;
> import org.apache.flink.api.common.io.FileInputFormat;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> import org.apache.flink.core.fs.FileInputSplit;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.util.InstantiationUtil;
>
> public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
> implements ResultTypeQueryable<GenericRecord> {
>
> private transient long end;
> private transient Schema schema;
> private transient FileReader<GenericRecord> fileReader;
> private boolean reuseAvroValue = true;
>
> private static final long serialVersionUID = 1L;
>
> public GenericAvroInputFormat(Path filePath, Schema schema) {
> super(filePath);
> this.schema = schema;
> }
>
> public void setReuseAvroValue(boolean reuseAvroValue) {
> this.reuseAvroValue = reuseAvroValue;
> }
>
> public void setUnsplittable(boolean unsplittable) {
> this.unsplittable = unsplittable;
> }
>
> @Override
> public TypeInformation<GenericRecord> getProducedType() {
> return TypeExtractor.getForClass(GenericRecord.class);
> }
>
> @Override
> public void open(FileInputSplit split) throws IOException {
> super.open(split);
> SeekableInput sin = new FSDataInputStreamWrapper(stream, 
> split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
> DatumReader<GenericRecord> reader = new GenericDatumReader<>();
> fileReader = DataFileReader.openReader(sin, reader);
> fileReader.sync(split.getStart());
> this.end = split.getStart() + split.getLength();
> }
>
> @Override
> public boolean reachedEnd() throws IOException {
> return !fileReader.hasNext() || fileReader.pastSync(end);
> }
>
> @Override
> public GenericRecord nextRecord(GenericRecord reuseValue) throws 
> IOException {
> if (reachedEnd()) {
> return null;
> }
>
> if (!reuseAvroValue) {
> reuseValue = InstantiationUtil.instantiate(GenericRecord.class, 
> Object.class);
> }
>
> reuseValue = fileReader.next(reuseValue);
> return reuseValue;
> }
> }
>
>
> Usage:
>
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> final Path inPath = new Path(args[0]);
>
> Schema schema = new Schema.Parser().parse(new 
> File("/path/to/schemafile.avsc"));
> DataSet<GenericRecord> dataSet = env.createInput(new
> GenericAvroInputFormat(inPath, schema));
> dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
> @Override
> public Tuple2<Long, String> map(GenericRecord record) {
> Long id = (Long) record.get("id");
> String someString = record.get("somestring").toString();
> return new Tuple2<>(id, someString);
> }
> }).writeAsText(args[1]);
>
> env.execute();
> }
>
>
> -Tarandeep
>
>
>
>
>
>
>
> On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
> gna.phetsar...@teamaol.com> wrote:
>
>> Tarandeep,
>>
>> There isn't a way yet, but I am proposing to do one:
>> https://issues.apache.org/jira/browse/FLINK-3691
>>
>> -Gna
>>
>> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh 
>> wrote:
>>
>>> Hi,
>>>
>>> Can someone please point me to an example of creating DataSet using Avro
>>> Generic Records?
>>>
>>> I tried this code -
>>>
>>> final ExecutionEnvironment env = 
>>> ExecutionEnvironment.getExecutionEnvironment();
>>> final Path iPath = new Path(args[0]);
>>>
>>> DataSet<GenericRecord> dataSet = env.createInput(new 
>>> AvroInputFormat<>(iPath, GenericRecord.class));
>>> dataSet.map(new MapFunction>() {
>>> @Override
>>> public 

Re: Kafka state backend?

2016-04-07 Thread Zach Cox
Ah I didn't see that, thanks for the link! Glad this is being discussed.

On Thu, Apr 7, 2016 at 5:06 AM Aljoscha Krettek  wrote:

> Hi Zach,
> I'm afraid someone already beat you to it :-)
> https://issues.apache.org/jira/browse/FLINK-3692
>
> In the issue we touch on some of the difficulties with this that stem from
> the differences in the guarantees that Flink and Samza try to give.
>
> Cheers,
> Aljoscha
>
> On Tue, 5 Apr 2016 at 22:24 Zach Cox  wrote:
>
>> Hi - as clarified in another thread [1] stateful operators store all of
>> their current state in the backend on each checkpoint. Just curious if
>> Kafka topics with log compaction have ever been considered as a possible
>> state backend?
>>
>> Samza [2] uses RocksDB as a local state store, with all writes also going
>> to a log-compacted Kafka topic for persistence. This seems like it might
>> also be a good alternative backend in Flink for jobs with large amounts of
>> long-lasting state. You would give up some throughput (due to Kafka
>> producer writes) but there would be almost nothing to do on checkpoints.
>>
>> Just wanted to propose the idea and see if it has already been discussed,
>> or maybe I'm missing some reasons why it would be a bad idea.
>>
>> Thanks,
>> Zach
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-state-stored-in-backend-and-deleting-old-checkpoint-state-td5927.html
>> [2]
>> http://samza.apache.org/learn/documentation/0.10/container/state-management.html#local-state-in-samza
>>
>>


mutable hashmap outside of stream, does it get snapshotted ?

2016-04-07 Thread Bart van Deenen
Hi all

I have a datastream transformation that updates a mutable
hashmap that exists outside of the stream.

So it's something like

object FlinkJob {
  val uriLookup = mutable.HashMap.empty[String, Int]


  def main(args: Array[String]) {

val stream: DataStream = ...

stream.keyBy(1).timeWindow(..).fold(..)
.window(..)
.map(..).fold(..)
.addSink(..)
  }
}

where the uriLookup hashmap gets updated inside the stream
transformation, 
and is serialized in the step before the addSink

It works fine, however

Does the snapshotting mechanism in case of a node failure actually
serialize this map?

And out of curiosity, can I actually see what data exists inside the
snapshot data?

Thanks.

Bart



Limit buffer size for a job

2016-04-07 Thread Andrew Ge Wu
Hi guys

We have a prioritized queue, where a high-priority item can jump the queue, and we
do not want to cache too many records in the buffer.
Is there a way to configure my streaming source to use a smaller buffer, so the source
always fetches the latest high-priority records?

Any suggestion? thanks!


Andrew
-- 
Confidentiality Notice: This e-mail transmission may contain confidential 
or legally privileged information that is intended only for the individual 
or entity named in the e-mail address. If you are not the intended 
recipient, you are hereby notified that any disclosure, copying, 
distribution, or reliance upon the contents of this e-mail is strictly 
prohibited and may be unlawful. If you have received this e-mail in error, 
please notify the sender immediately by return e-mail and delete all copies 
of this message.


Re: Find differences

2016-04-07 Thread Fabian Hueske
I would go with an outer join as Stefano suggested.
Outer joins can be executed as hash joins which will probably be more
efficient than using a sort based groupBy/reduceGroup.
Also, outer joins are more intuitive and simpler, IMO.
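
A sketch of that outer-join variant (Java DataSet API; the Tuple3 element type below is assumed, since the original message does not show the type parameters):

```
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class AntiJoin {

    // Returns the tuples of `a` whose (f0, f1) key does not occur in `b`.
    public static DataSet<Tuple3<String, String, Double>> onlyInA(
            DataSet<Tuple3<String, String, Double>> a,
            DataSet<Tuple3<String, String, Double>> b) {

        return a.leftOuterJoin(b)
                .where(0, 1)
                .equalTo(0, 1)
                .with(new FlatJoinFunction<Tuple3<String, String, Double>,
                                           Tuple3<String, String, Double>,
                                           Tuple3<String, String, Double>>() {
                    @Override
                    public void join(Tuple3<String, String, Double> left,
                                     Tuple3<String, String, Double> right,
                                     Collector<Tuple3<String, String, Double>> out) {
                        if (right == null) { // no record in b with the same (f0, f1)
                            out.collect(left);
                        }
                    }
                });
    }
}
```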

2016-04-07 12:35 GMT+02:00 Stefano Baghino :

> Perhaps an outer join can do the trick as well but I don't know which one
> would perform better.
>
> On Thu, Apr 7, 2016 at 12:05 PM, Lydia Ickler 
> wrote:
>
>>  Never mind! I figured it out with groupBy and
>> reduceGroup.
>>
>> Sent from my iPhone
>>
>> > On 07.04.2016 at 11:51, Lydia Ickler wrote:
>> >
>> > Hi,
>> >
>> > If I have 2 DataSets A and B of type Tuple3, how
>> would I get a subset of A (based on the fields (0,1)) that does not occur
>> in B?
>> > Is there maybe an already implemented method?
>> >
>> > Best regards,
>> > Lydia
>> >
>> > Sent from my iPhone
>>
>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Re: State in external db (dynamodb)

2016-04-07 Thread Aljoscha Krettek
Hi,
regarding windows and incremental aggregation. This is already happening in
Flink as of now. When you give a ReduceFunction on a window, which "sum"
internally does, the result for a window is incrementally updated whenever
a new element comes in. This incremental aggregation only happens when you
specify a ReduceFunction or a FoldFunction, not for the general case of a
WindowFunction, where all elements in the window are required.
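
For illustration, a sketch of that incremental path (Java API; the event type and its field are made up for the example). With a FoldFunction the window state holds only the running aggregate, not the buffered elements:

```
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IncrementalWindowSum {

    // `events` is assumed to be keyed already; Event and getAmount() are placeholders.
    public static DataStream<Long> sumPerWindow(KeyedStream<Event, String> events) {
        return events
            .timeWindow(Time.minutes(10))
            .fold(0L, new FoldFunction<Event, Long>() {
                @Override
                public Long fold(Long runningSum, Event event) {
                    // Called for each arriving element; only `runningSum` is kept as window state.
                    return runningSum + event.getAmount();
                }
            });
    }

    // Placeholder event type, just to keep the sketch self-contained.
    public static class Event {
        private long amount;

        public long getAmount() {
            return amount;
        }
    }
}
```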

You are right about incremental snapshots. We mainly want to introduce them
to reduce latency incurred by snapshotting. Right now, processing stalls
when a checkpoint happens.

Cheers,
Aljoscha

On Thu, 7 Apr 2016 at 13:12 Shannon Carey  wrote:

> Thanks very kindly for your response, Stephan!
>
> We will definitely use a custom sink for persistence of idempotent
> mutations whenever possible. Exposing state as read-only to external
> systems is a complication we will try to avoid. Also, we will definitely
> only write to the DB upon checkpoint, and the write will be synchronous and
> transactional (no possibility of partial success/failure).
>
> However, we do want Flink state to be durable, we want it to be in memory
> when possible, and we want to avoid running out of memory due to the size
> of the state. For example, if you have a wide window that hasn't gotten an
> event for a long time, we want to evict that window state from memory.
> We're now thinking of using Redis (via AWS Elasticache) which also
> conveniently has TTL, instead of DynamoDB.
>
> I just wanted to check whether eviction of (inactive/quiet) state from
> memory is something that I should consider implementing, or whether Flink
> already had some built-in way of doing it.
>
> Along the same lines, I am also wondering whether Flink already has means
> of compacting the state of a window by applying an aggregation function to
> the elements so-far (eg. every time window is triggered)? For example, if
> you are only executing a sum on the contents of the window, the window
> state doesn't need to store all the individual items in the window, it only
> needs to store the sum. Aggregations other than "sum" might have that
> characteristic too. I don't know if Flink is already that intelligent or
> whether I should figure out how to aggregate window contents myself when
> possible with something like a window fold? Another poster (Aljoscha) was
> talking about adding incremental snapshots, but it sounds like that would
> only improve the write throughput not the memory usage.
>
> Thanks again!
> Shannon Carey
>
>
> From: Stephan Ewen 
> Date: Wednesday, April 6, 2016 at 10:37 PM
> To: 
> Subject: Re: State in external db (dynamodb)
>
> Hi Shannon!
>
> Welcome to the Flink community!
>
> You are right, sinks need in general to be idempotent if you want
> "exactly-once" semantics, because there can be a replay of elements that
> were already written.
>
> However, what you describe later, overwriting of a key with a new value
> (or the same value again) is pretty much sufficient. That means that when a
> duplicate write happens during replay, the value for the key is simply
> overwritten with the same value again.
> As long as all computation is purely in Flink and you only write to the
> key/value store (rather than read from k/v, modify in Flink, write to k/v),
> you get the consistency that for example counts/aggregates never have
> duplicates.
>
> If Flink needs to look up state from the database (because it is no longer
> in Flink), it is a bit more tricky. I assume that is where you are going
> with "Subsequently, when an event is processed, we must be able to
> quickly load up any evicted state".  In that case, there are two things
> you can do:
>
> (1)  Only write to the DB upon a checkpoint, at which point it is known
> that no replay of that data will occur any more. Values from partially
> successful writes will be overwritten with correct value. I assume that is
> what you thought of when referring to the State Backend, because in some
> sense, that is what that state backend would do.
>
> I think it is simpler to realize that in a custom sink, than developing a
> new state backend.  Another Flink committer (Chesnay) has developed some
> nice tooling for that, to be merged into Flink soon.
>
> (2) You could attach version numbers to every write, and increment the
> versions upon each checkpoint. That allows you to always refer to a
> consistent previous value, if some writes were made, but a failure occurred
> before the checkpoint completed.
>
> I hope these answers apply to your case. Let us know if some things are
> still unclear, or if I misunderstood your question!
>
>
> Greetings,
> Stephan
>
>
>
> On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever  > wrote:
>
>> FYI Cassandra has a TTL on data:
>> https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html
>>
>> On Wed, Apr 6, 2016 at 7:55 AM, 

Re: State in external db (dynamodb)

2016-04-07 Thread Shannon Carey
Thanks very kindly for your response, Stephan!

We will definitely use a custom sink for persistence of idempotent mutations 
whenever possible. Exposing state as read-only to external systems is a 
complication we will try to avoid. Also, we will definitely only write to the 
DB upon checkpoint, and the write will be synchronous and transactional (no 
possibility of partial success/failure).

However, we do want Flink state to be durable, we want it to be in memory when 
possible, and we want to avoid running out of memory due to the size of the 
state. For example, if you have a wide window that hasn't gotten an event for a 
long time, we want to evict that window state from memory. We're now thinking 
of using Redis (via AWS Elasticache) which also conveniently has TTL, instead 
of DynamoDB.

I just wanted to check whether eviction of (inactive/quiet) state from memory 
is something that I should consider implementing, or whether Flink already had 
some built-in way of doing it.

Along the same lines, I am also wondering whether Flink already has means of 
compacting the state of a window by applying an aggregation function to the 
elements so-far (eg. every time window is triggered)? For example, if you are 
only executing a sum on the contents of the window, the window state doesn't 
need to store all the individual items in the window, it only needs to store 
the sum. Aggregations other than "sum" might have that characteristic too. I 
don't know if Flink is already that intelligent or whether I should figure out 
how to aggregate window contents myself when possible with something like a 
window fold? Another poster (Aljoscha) was talking about adding incremental 
snapshots, but it sounds like that would only improve the write throughput not 
the memory usage.
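
As an illustration of that kind of compaction, here is a minimal sketch in Java (the stream name, element types and window size are made up; it assumes a stream of (key, amount) pairs). With a fold like this, the window state should only need to hold the running accumulator rather than every element:

```
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// 'events' is assumed to be a DataStream<Tuple2<String, Long>> of (key, amount).
DataStream<Long> sums = events
        .keyBy(0)
        .timeWindow(Time.minutes(10))
        .fold(0L, new FoldFunction<Tuple2<String, Long>, Long>() {
            @Override
            public Long fold(Long accumulator, Tuple2<String, Long> value) {
                // only the accumulator is kept, not the individual elements
                return accumulator + value.f1;
            }
        });
```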

Thanks again!
Shannon Carey


From: Stephan Ewen
Date: Wednesday, April 6, 2016 at 10:37 PM
To:
Subject: Re: State in external db (dynamodb)

Hi Shannon!

Welcome to the Flink community!

You are right, sinks need in general to be idempotent if you want 
"exactly-once" semantics, because there can be a replay of elements that were 
already written.

However, what you describe later, overwriting of a key with a new value (or the 
same value again) is pretty much sufficient. That means that when a duplicate 
write happens during replay, the value for the key is simply overwritten with 
the same value again.
As long as all computation is purely in Flink and you only write to the 
key/value store (rather than read from k/v, modify in Flink, write to k/v), you 
get the consistency that for example counts/aggregates never have duplicates.

If Flink needs to look up state from the database (because it is no longer in 
Flink), it is a bit more tricky. I assume that is where you are going with 
"Subsequently, when an event is processed, we must be able to quickly load up 
any evicted state".  In that case, there are two things you can do:

(1)  Only write to the DB upon a checkpoint, at which point it is known that no 
replay of that data will occur any more. Values from partially successful 
writes will be overwritten with correct value. I assume that is what you 
thought of when referring to the State Backend, because in some sense, that is 
what that state backend would do.

I think it is simpler to realize that in a custom sink, than developing a new 
state backend.  Another Flink committer (Chesnay) has developed some nice 
tooling for that, to be merged into Flink soon.
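
For illustration only (this is not that tooling), a rough sketch of such a write-on-checkpoint sink; the actual flush to the DB is left as a comment, and the writes should still be idempotent because a completed checkpoint can be flushed again after a recovery:

```
import java.util.ArrayList;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Buffers records and only writes them out once the covering checkpoint has completed.
public class WriteOnCheckpointSink<T> extends RichSinkFunction<T>
        implements Checkpointed<ArrayList<T>>, CheckpointListener {

    private ArrayList<T> pending = new ArrayList<>();

    @Override
    public void invoke(T value) throws Exception {
        pending.add(value); // buffer instead of writing immediately
    }

    @Override
    public ArrayList<T> snapshotState(long checkpointId, long checkpointTimestamp) {
        return new ArrayList<>(pending); // pending writes become part of the checkpoint
    }

    @Override
    public void restoreState(ArrayList<T> state) {
        pending = state; // restored after a failure, so nothing is silently dropped
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // The checkpoint is durable now, so it is safe to write the buffered
        // mutations to the external DB here (ideally as one transactional batch).
        pending.clear();
    }
}
```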

(2) You could attach version numbers to every write, and increment the versions 
upon each checkpoint. That allows you to always refer to a consistent previous 
value, if some writes were made, but a failure occurred before the checkpoint 
completed.

I hope these answers apply to your case. Let us know if some things are still 
unclear, or if I misunderstood your question!


Greetings,
Stephan



On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever wrote:
FYI Cassandra has a TTL on data: 
https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html

On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey wrote:
Hi, new Flink user here!

I found a discussion on user@flink.apache.org 
about using DynamoDB as a sink. However, as noted, sinks have an at-least-once 
guarantee, so your operations must be idempotent.

However, another way to go about this (and correct me if I'm wrong) is to write 
the state to the external store via a custom State Backend. Since the state 
participates in checkpointing, you don't have to worry about idempotency: every 
time state is checkpointed, overwrite the value of that key.

We are starting a project with Flink, and we are interested in evicting the 
state from memory once a TTL is reached during which no events have 

Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread norman sp
Hi,
here is an example input that produces the error. These are read from Kafka.

01:43:43.5921   2121{"Pressure target - Value":"6"}
01:43:43.5961   2121{"Flow target - Value":"23"}
01:43:44.2631   2121{"Pressure target - Value":"7"}
01:43:44.9721   2121{"Flow target - Value":"24"}
01:43:45.1761   2121{"Flow target - Value":"25"}
01:43:45.2791   2121{"Flow target - Value":"26"}
01:43:45.3821   2121{"Flow target - Value":"27"}
01:43:45.5861   2121{"Flow target - Value":"28"}
01:43:46.3101   2121{"Pressure target - Value":"7"}
01:43:46.3501   2121{"Flow target - Value":"28"}
01:43:48.5631   2121{"Pressure target - Value":"7"}
01:43:48.5671   2121{"Flow target - Value":"28"}
01:43:51.8501   2121{"Pressure target - Value":"7"}
01:43:51.8901   2121{"Flow target - Value":"28"}
01:43:53.5121   2121{"Pressure target - Value":"7"}
01:43:53.5161   2121{"Flow target - Value":"28"}
01:43:56.1081   2121{"Pressure target - Value":"7"}
01:43:56.1121   2121{"Flow target - Value":"28"}
01:43:58.5331   2121{"Pressure target - Value":"7"}
01:43:58.5381   2121{"Flow target - Value":"28"}
01:44:01.6001   2121{"Pressure target - Value":"7"}
01:44:01.6301   2121{"Flow target - Value":"28"}
01:44:03.5871   2121{"Pressure target - Value":"7"}
01:44:03.5911   2121{"Flow target - Value":"28"}
01:44:06.1801   2121{"Pressure target - Value":"7"}
01:44:06.2301   2121{"Flow target - Value":"28"}


Greetz Norman



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5996.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: threads, parallelism and task managers

2016-04-07 Thread Flavio Pompermaier
We've finally created a running example (for Flink 0.10.2) of our improved
JDBC input format that you can run from an IDE (it creates an in-memory
Derby database with 1000 rows and a batch size of 10) at
https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
The first time you run the program you have to comment the following line:

stmt.executeUpdate("Drop Table users ");

In your pom declare the following dependencies:


<dependency>
  <groupId>org.apache.derby</groupId>
  <artifactId>derby</artifactId>
  <version>10.10.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
  <version>2.4.2</version>
</dependency>


On my laptop I have 8 cores, and if I set parallelism to 16 I expect to see
16 calls to the connection pool (i.e. ' CREATING NEW
CONNECTION!'), while I see only 8 (up to my maximum number of cores).
The number of created tasks, instead, is correct (16).

I hope this could help in understanding where the problem is!
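
For readers without the gist at hand, here is a stripped-down, hypothetical sketch of the pooling pattern whose 'CREATING NEW CONNECTION!' calls are being counted (this is not the actual gist code; the in-memory Derby URL is just an example):

```
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;

public class ConnectionPoolSketch {

    public static void main(String[] args) throws Exception {
        GenericObjectPool<Connection> pool = new GenericObjectPool<>(
                new BasePooledObjectFactory<Connection>() {
                    @Override
                    public Connection create() throws Exception {
                        System.out.println(" CREATING NEW CONNECTION!");
                        return DriverManager.getConnection("jdbc:derby:memory:users;create=true");
                    }

                    @Override
                    public PooledObject<Connection> wrap(Connection c) {
                        return new DefaultPooledObject<>(c);
                    }
                });
        pool.setMaxTotal(16); // upper bound on concurrently open connections

        // Each parallel instance of the input format would do this in open()/close():
        Connection c = pool.borrowObject();
        // ... run the query for the assigned split ...
        pool.returnObject(c);
    }
}
```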

Best and thanks in advance,
Flavio

On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli 
wrote:

> Hi Ufuk,
>
> here is our preliminary input formar implementation:
> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>
> if you need a running project, I will have to create a test one cause I
> cannot share the current configuration.
>
> thanks a lot in advance!
>
>
>
> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi :
>
>> Do you have the code somewhere online? Maybe someone can have a quick
>> look over it later. I'm pretty sure that is indeed a problem with the
>> custom input format.
>>
>> – Ufuk
>>
>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli 
>> wrote:
>> > Perhaps there is a misunderstanding on my side over the parallelism and
>> > split management given a data source.
>> >
>> > We started from the current JDBCInputFormat to make it multi-thread.
>> Then,
>> > given a space of keys, we create the splits based on a fetchsize set as
>> a
>> > parameter. In the open, we get a connection from the pool, and execute a
>> > query using the split interval. This sets the 'resultSet', and then the
>> > DatasourceTask iterates between reachedEnd, next and close. On close,
>> the
>> > connection is returned to the pool. We set parallelism to 32, and we
>> would
>> > expect 32 connection opened but the connections opened are just 8.
>> >
>> > We tried to make an example with the textinputformat, but being a
>> > delimitedinputformat, the open is called sequentially when statistics
>> are
>> > built, and then the processing is executed in parallel just after all
>> the
>> > open are executed. This is not feasible in our case, because there
>> would be
>> > millions of queries before the statistics are collected.
>> >
>> > Perhaps we are doing something wrong, still to figure out what. :-/
>> >
>> > thanks a lot for your help.
>> >
>> > saluti,
>> > Stefano
>> >
>> >
>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli :
>> >>
>> >> That is exactly my point. I should have 32 threads running, but I have
>> >> only 8. 32 tasks are created, but only 8 are run concurrently.
>> Flavio
>> >> and I will try to make a simple program to produce the problem. If we
>> solve
>> >> our issues on the way, we'll let you know.
>> >>
>> >> thanks a lot anyway.
>> >>
>> >> saluti,
>> >> Stefano
>> >>
>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann :
>> >>>
>> >>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to run
>> >>> futures and their callbacks. But as Ufuk said, each task will spawn
>> it’s own
>> >>> thread and if you set the parallelism to 32 then you should have 32
>> threads
>> >>> running.
>> >>>
>> >>>
>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <
>> s.bort...@gmail.com>
>> >>> wrote:
>> 
>>  In fact, I don't use it. I just had to crawl back the runtime
>>  implementation to get to the point where parallelism was switching
>> from 32
>>  to 8.
>> 
>>  saluti,
>>  Stefano
>> 
>>  2016-03-29 12:24 GMT+02:00 Till Rohrmann :
>> >
>> > Hi,
>> >
>> > for what do you use the ExecutionContext? That should actually be
>> > something which you shouldn’t be concerned with since it is only
>> used
>> > internally by the runtime.
>> >
>> > Cheers,
>> > Till
>> >
>> >
>> > On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <
>> s.bort...@gmail.com>
>> > wrote:
>> >>
>> >> Well, in theory yes. Each task has a thread, but only a number is
>> run
>> >> in parallel (the job of the scheduler).  Parallelism is set in the
>> >> environment. However, whereas the parallelism parameter is set and
>> read
>> >> correctly, when it comes to actual starting of the threads, the
>> number is
>> >> fix to 8. We run a debugger to get to the point where the thread
>> was
>> >> started. As Flavio mentioned, the ExecutionContext has the
>> parallelism set
>> >> to 8. We have a pool of connections to an RDBMS and it logs the
>> creation of
>> 

Re: Find differences

2016-04-07 Thread Stefano Baghino
Perhaps an outer join can do the trick as well but I don't know which one
would perform better.
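
For example, a sketch of the outer-join variant (the element types below are assumed, since the actual Tuple3 type parameters are not visible in the thread):

```
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// a and b are assumed to be DataSet<Tuple3<String, String, Double>>.
DataSet<Tuple3<String, String, Double>> onlyInA = a
        .leftOuterJoin(b)
        .where(0, 1)
        .equalTo(0, 1)
        .with(new FlatJoinFunction<Tuple3<String, String, Double>,
                Tuple3<String, String, Double>, Tuple3<String, String, Double>>() {
            @Override
            public void join(Tuple3<String, String, Double> left,
                             Tuple3<String, String, Double> right,
                             Collector<Tuple3<String, String, Double>> out) {
                if (right == null) { // no match in B on fields (0, 1)
                    out.collect(left);
                }
            }
        });
```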

On Thu, Apr 7, 2016 at 12:05 PM, Lydia Ickler 
wrote:

>  Nevermind! I figured it out with groupby and
> Reducegroup
>
> Sent from my iPhone
>
> > On 07.04.2016 at 11:51, Lydia Ickler wrote:
> >
> > Hi,
> >
> > If i have 2 DataSets A and B of Type Tuple3 how
> would I get a subset of A (based on the fields (0,1)) that does not occur
> in B?
> > Is there maybe an already implemented method?
> >
> > Best regards,
> > Lydia
> >
> > Sent from my iPhone
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread Till Rohrmann
Hi Norman,

could you provide me an example input data set which produces the error?
E.g. the list of strings you inserted into Kafka/read from Kafka?

Cheers,
Till

On Thu, Apr 7, 2016 at 11:05 AM, norman sp  wrote:

> Hi Till,
> thank you. here's the code:
>
> public class CepStorzSimulator {
>
> public static void main(String[] args) throws Exception {
>
> final ParameterTool parameterTool =
> ParameterTool.fromArgs(args);
>
> if(parameterTool.getNumberOfParameters() < 3) {
> System.out.println("Missing
> parameters!\nUsage: Kafka --topic 
> --bootstrap.servers  --group.id ");
> System.exit(1);
> }
>
> CepStorzSimulator reader = new CepStorzSimulator();
> reader.run(parameterTool);
> }
>
> public void run(ParameterTool parameterTool) throws Exception {
>
> String topic = "test-simulator";
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> //env.getConfig().disableSysoutLogging();
>
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
> 5000));
> //env.enableCheckpointing(15000);
>  // create a checkpoint every 5
> seconds
> env.setParallelism(4);
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
> DataStream kafkaStream = env.addSource(new
> FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(),
> parameterTool.getProperties()));
>
> DataStream> data
> =
> kafkaStream.flatMap(new SplitMapper());
>
> SingleOutputStreamOperator Double,
> Double, Double>> windowedData =
>  data.filter(new
> FilterFunction String, Double>>() {
>
> private static final long
> serialVersionUID = -5952425756492833594L;
>
> @Override
> public boolean
> filter(Tuple5
> val) throws Exception {
>
> return
> val.f3.contains("target - Value");
> }
>  })
>  .keyBy(3)
>  .timeWindow(Time.seconds(10),
> Time.seconds(1))
>  .fold(new Tuple6<>("", "", "",
> 0.0d, 0.0d, 0.0d), new
> pressureElementCount());
>
> windowedData.print();
>
> Pattern Double>, ?>
> FlowFirstPattern =
> Pattern. Double, Double,
> Double>>begin("FlowOver10")
> .where(new FilterFunction String, String, Double,
> Double, Double>>() {
>
> private static final long serialVersionUID
> = 5861517245439863889L;
>
> @Override
> public boolean
> filter(Tuple6 Double> value) throws Exception {
>
> double avgFlow=
> (value.f5/value.f4);
>
> return
> value.f2.contains("Flow target - Value") && avgFlow > 25.0;//
> && (value.f2 > avgFlow*1.0);
> }
> })
> .followedBy("PressureOver10").where(new
> FilterFunction>()
> {
>
> private static final long
> serialVersionUID = -4037517308930307522L;
>
> @Override
> public boolean
> filter(Tuple6 Double> value) throws Exception {
>
> double avgPressure =
> (value.f5/value.f4);
>
> //System.out.println("Pressure: " + avgPressure);
>
> return
> value.f2.equals("Pressure target - Value") && (avgPressure >
> 5.0);// && (value.f2 > avgPressure*1.0);
> }
> })
> .within(Time.seconds(10));
>
> PatternStream Double, Double>>
> FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
> DataStream 

Find differences

2016-04-07 Thread Lydia Ickler
Hi,

If i have 2 DataSets A and B of Type Tuple3 how would I 
get a subset of A (based on the fields (0,1)) that does not occur in B?
Is there maybe an already implemented method?

Best regards,
Lydia

Sent from my iPhone

Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Till Rohrmann
For passing the dynamic property directly when running things on YARN, you
have to use -yDenv.java.opts="..."
​

On Thu, Apr 7, 2016 at 11:42 AM, Till Rohrmann  wrote:

> Hi Timur,
>
> what you can try doing is to pass the JVM parameter
> -Djava.library.path= via the env.java.opts to the system. You
> simply have to add env.java.opts: "-Djava.library.path=" in the
> flink-conf.yaml or via -Denv.java.opts="-Djava.library.path=", if
> I’m not mistaken.
>
> Cheers
> Till
> ​
>
> On Thu, Apr 7, 2016 at 10:07 AM, Timur Fayruzov 
> wrote:
>
>> there is a hack for this issue: copying my native library to
>> $HADOOP_HOME/lib/native makes it discoverable and a program runs, however
>> this is not an appropriate solution and it seems to be fragile.
>>
>> I tried to find where 'lib/native' path appears in the configuration and
>> found 2 places:
>> hadoop-env.sh: export
>> JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native
>> mapred-site.xml: key: mapreduce.admin.user.env
>>
>> I tried to add path to dir with my native lib in both places, but still
>> no luck.
>>
>> Thanks,
>> Timur
>>
>> On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov > > wrote:
>>
>>> Hello,
>>>
>>> I'm not sure whether it's a Hadoop or Flink-specific question, but since
>>> I ran into this in the context of Flink I'm asking here. I would be glad if
>>> anyone can suggest a more appropriate place.
>>>
>>> I have a native library that I need to use in my Flink batch job that I
>>> run on EMR, and I try to point JVM to the location of native library.
>>> Normally, I'd do this using java.library.path parameter. So I try to run as
>>> follows:
>>> `
>>> HADOOP_CONF_DIR=/etc/hadoop/conf
>>> JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m
>>> yarn-cluster -yn 1 -yjm 768 -ytm 768 
>>> `
>>> It does not work, fails with `java.lang.UnsatisfiedLinkError` when
>>> trying to load the native lib. It probably has to do with YARN not not
>>> passing this parameter to task nodes, but my understanding of this
>>> mechanism is quite limited so far.
>>>
>>> I dug up this Jira ticket:
>>> https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting
>>> LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem
>>> either.
>>>
>>> Any help or hint where to look is highly appreciated.
>>>
>>> Thanks,
>>> Timur
>>>
>>
>>
>


Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Till Rohrmann
Hi Timur,

what you can try doing is to pass the JVM parameter
-Djava.library.path= via the env.java.opts to the system. You simply
have to add env.java.opts: "-Djava.library.path=" in the
flink-conf.yaml or via -Denv.java.opts="-Djava.library.path=", if I’m
not mistaken.

Cheers
Till
​

On Thu, Apr 7, 2016 at 10:07 AM, Timur Fayruzov 
wrote:

> there is a hack for this issue: copying my native library to
> $HADOOP_HOME/lib/native makes it discoverable and a program runs, however
> this is not an appropriate solution and it seems to be fragile.
>
> I tried to find where 'lib/native' path appears in the configuration and
> found 2 places:
> hadoop-env.sh: export
> JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native
> mapred-site.xml: key: mapreduce.admin.user.env
>
> I tried to add path to dir with my native lib in both places, but still no
> luck.
>
> Thanks,
> Timur
>
> On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov 
> wrote:
>
>> Hello,
>>
>> I'm not sure whether it's a Hadoop or Flink-specific question, but since
>> I ran into this in the context of Flink I'm asking here. I would be glad if
>> anyone can suggest a more appropriate place.
>>
>> I have a native library that I need to use in my Flink batch job that I
>> run on EMR, and I try to point JVM to the location of native library.
>> Normally, I'd do this using java.library.path parameter. So I try to run as
>> follows:
>> `
>> HADOOP_CONF_DIR=/etc/hadoop/conf
>> JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m
>> yarn-cluster -yn 1 -yjm 768 -ytm 768 
>> `
>> It does not work, fails with `java.lang.UnsatisfiedLinkError` when trying
>> to load the native lib. It probably has to do with YARN not not passing
>> this parameter to task nodes, but my understanding of this mechanism is
>> quite limited so far.
>>
>> I dug up this Jira ticket:
>> https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting
>> LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem
>> either.
>>
>> Any help or hint where to look is highly appreciated.
>>
>> Thanks,
>> Timur
>>
>
>


Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread norman sp
Hi Till,
thank you. here's the code:

public class CepStorzSimulator {

public static void main(String[] args) throws Exception {

final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);

if(parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!\nUsage: 
Kafka --topic 
--bootstrap.servers  --group.id ");
System.exit(1);
}

CepStorzSimulator reader = new CepStorzSimulator();
reader.run(parameterTool);
}

public void run(ParameterTool parameterTool) throws Exception {

String topic = "test-simulator";

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//env.getConfig().disableSysoutLogging();

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
5000));
//env.enableCheckpointing(15000);   
// create a checkpoint every 5
seconds
env.setParallelism(4);

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

DataStream kafkaStream = env.addSource(new
FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(),
parameterTool.getProperties()));

DataStream> data =
kafkaStream.flatMap(new SplitMapper()); 

SingleOutputStreamOperator> windowedData = 
 data.filter(new 
FilterFunction>() {

private static final long 
serialVersionUID = -5952425756492833594L;

@Override
public boolean 
filter(Tuple5
val) throws Exception {

return 
val.f3.contains("target - Value");
}
 })
 .keyBy(3)
 .timeWindow(Time.seconds(10), 
Time.seconds(1))
 .fold(new Tuple6<>("", "", "", 0.0d, 
0.0d, 0.0d), new
pressureElementCount());
 
windowedData.print();

Pattern, ?>
FlowFirstPattern = 
Pattern.>begin("FlowOver10")
.where(new FilterFunction>() {

private static final long serialVersionUID = 
5861517245439863889L;

@Override
public boolean filter(Tuple6 value) throws Exception {

double avgFlow= 
(value.f5/value.f4);

return value.f2.contains("Flow 
target - Value") && avgFlow > 25.0;//
&& (value.f2 > avgFlow*1.0);
}
})
.followedBy("PressureOver10").where(new
FilterFunction>()
{

private static final long 
serialVersionUID = -4037517308930307522L;

@Override
public boolean filter(Tuple6 value) throws Exception {

double avgPressure = 
(value.f5/value.f4);
//System.out.println("Pressure: 
" + avgPressure);

return 
value.f2.equals("Pressure target - Value") && (avgPressure >
5.0);// && (value.f2 > avgPressure*1.0);
}
})
.within(Time.seconds(10));  

PatternStream>
FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);

Re: YARN High Availability

2016-04-07 Thread Robert Metzger
Cool. I'm happy that you were able to validate the issue.

I'll write a fix for it: https://issues.apache.org/jira/browse/FLINK-3712

On Thu, Apr 7, 2016 at 10:43 AM, Konstantin Knauf <
konstantin.kn...@tngtech.com> wrote:

> Hi everyone,
>
> thanks to Robert, I found the problem.
>
> I was setting "recovery.zookeeper.path.root" on the command line with
> -yD. Apparently this is currently not supported. You need to set the
> parameter in flink-conf.yaml.
>
> Cheers,
>
> Konstantin
>
> On 05.04.2016 12:52, Konstantin Knauf wrote:
> > Hi Robert,
> >
> > I tried several paths and rmr before.
> >
> > It stopped after 1-2 minutes. There was an exception on the shell.
> > Sorry, should have attached to the last mail.
> >
> > Thanks,
> >
> > Konstantin
> >
> > On 05.04.2016 11:22, Robert Metzger wrote:
> >> I've tried reproducing the issue on a test cluster, but everything
> >> worked fine.
> >>
> >> Have you tried different values for "recovery.zookeeper.path.root" or
> >> only one? Maybe the path you've put contains invalid data?
> >>
> >> Regarding the client log you've sent: Did you manually stop the client
> >> or did it stop after a few minutes?
> >> The JobManager stops after a few minutes because the client requested a
> >> shutdown. Usually, the client only shuts down on an exception or when
> >> the user stops the yarn session.
> >> There is no exception in the client log. Was there an exception printed
> >> to the shell?
> >>
> >> This log message:
> >>
> >> 2016-04-05 08:48:34,912 DEBUG org.apache.flink.yarn.FlinkYarnCluster
> >>- Received message option None
> >>
> >> Should not be an issue.
> >>
> >>
> >> On Tue, Apr 5, 2016 at 10:14 AM, Ufuk Celebi wrote:
> >>
> >> Hey Konstantin,
> >>
> >> just looked at the logs and the cluster is started, but the job is
> >> indeed never submitted.
> >>
> >> I've forwarded this to Robert, because he is familiar with the YARN
> >> client. I will look into how the client interacts with the ZooKeeper
> >> root path.
> >>
> >> – Ufuk
> >>
> >>
> >> On Tue, Apr 5, 2016 at 9:18 AM, Konstantin Knauf wrote:
> >> > Hi Ufuk, Hi Stephan,
> >> >
> >> > sorry for the late response. Attached are the client logs.
> >> >
> >> > Cheers,
> >> >
> >> > Konstantin
> >> >
> >> >> On 04.04.2016 21:20, Stephan Ewen wrote:
> >> >> This seems to be the critical part in the logs:
> >> >>
> >> >> 2016-03-31 09:01:52,234 INFO
> org.apache.flink.yarn.YarnJobManager
> >> >>- Re-submitting 0 job graphs.
> >> >> 2016-03-31 09:02:51,182 INFO
> org.apache.flink.yarn.YarnJobManager
> >> >>- Stopping YARN JobManager with status FAILED
> and
> >> >> diagnostic Flink YARN Client requested shutdown.
> >> >>
> >> >> The YarnJobManager starts up properly, but the Client never sends
> >> >> anything, shuts down at some point, and tears down the YARN
> cluster.
> >> >>
> >> >> Client logs would help a lot there...
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On Sat, Apr 2, 2016 at 12:43 PM, Ufuk Celebi wrote:
> >> >>
> >> >> Hey Konstantin,
> >> >>
> >> >> That's weird. Can you please log the client output on DEBUG
> >> level and
> >> >> provide that as well? I'm wondering whether the client uses a
> >> >> different root path.
> >> >>
> >> >> The following seems to happen:
> >> >> - you use ledf_recovery as the root namespace
> >> >> - the task managers are connecting (they resolve the JM
> >> address via
> >> >> ZooKeeper in this case as well, which means they correctly
> >> use the
> >> >> same namespace)
> >> >> - but the client, which started the YARN session, does not
> >> ever submit
> >> >> the job to the cluster.
> >> >>
> >> >> – Ufuk
> >> >>
> >> >> On Thu, Mar 31, 2016 at 9:23 AM, Konstantin Knauf wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > we are running in some problems with multiple per-job yarn
> >> >> sessions, too.
> >> >> >
> >> >> > When we are are starting a per-job yarn session (Flink 1.0,
> >> Hadoop
> >> >> 2.4)
> >> >> > with recovery.zookeeper.path.root other than /flink, the
> >> yarn session
> >> >> > starts but no job is submitted, and after 1 min or so the
> >> session
> >> >> > crashes. 

Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread Till Rohrmann
Hi Norman,

this error is exactly what I thought I had fixed. I guess there is still
another case where a premature pruning can happen in the SharedBuffer.
Could you maybe send me the example code with which you could produce the
error. The input data would also be very helpful. Then I can debug it.
Thanks :-)

Cheers,
Till

On Thu, Apr 7, 2016 at 7:50 AM, norman sp 
wrote:

> Hi Till,
> I used Flink version 1.0.0 and tried all three TimeCharacteristics.
> Not I tried the new Flink 1.0.1 that gives me the following error.
> After detecting an event it processes a few stream tuples but then crashes.
> I'm not sure how to solve that part of the error message: "This can
> indicate
> that the element belonging to the previous relation has been already
> pruned,
> even though you expect it to be still there"
>
> 4>   ###   Warning! FlowPatternEvent:   ###
> 4> (7605e43c-ca20-4524-af5f-23fd8e55d7b9,Intensity - Value,25.0,3.0,75.0)
> 4> (5025ef64-2b83-4112-b9bd-2d7de46454c9,Pressure - Value,7.0,3.0,21.0)
> 4> (3d501c5b-b0e2-41e6-bfdc-14c996df6e19,Flow - Value,27.0,3.0,81.0)
> 04/06/2016 16:41:31 Map -> Sink: Unnamed(4/4) switched to CANCELED
> 04/06/2016 16:41:31 AbstractCEPPatternOperator(1/1) switched to FAILED
> java.lang.RuntimeException: Failure happened in filter function.
> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:292)
> at org.apache.flink.cep.nfa.NFA.process(NFA.java:136)
> at
>
> org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
> at
>
> org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
> at
>
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
> at
>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Could not find previous shared
> buffer entry with key: State(start, Normal, [
> StateTransition(TAKE, FlowOver10, with filter),
> StateTransition(IGNORE, start),
> ]), value: (83788338-b26c-4538-a437-77d1bbf7b348,Pressure -
> Value,7.0,5.0,35.0) and timestamp: 1459953675999. This can indicate that
> the
> element belonging to the previous relation has been already pruned, even
> though you expect it to be still there.
> at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:104)
> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:269)
> ... 8 more
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5976.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: RocksDB state checkpointing is expensive?

2016-04-07 Thread Aljoscha Krettek
Hi,
you are right. Currently there is no incremental checkpointing and
therefore, at each checkpoint, we essentially copy the whole RocksDB
database to HDFS (or whatever filesystem you chose as a backup location).
As far as I know, Stephan will start working on adding support for
incremental snapshots this week or next week.

Cheers,
Aljoscha

On Thu, 7 Apr 2016 at 09:55 Krzysztof Zarzycki  wrote:

> Hi,
> I saw the documentation and source code of the state management with
> RocksDB and before I use it, I'm concerned of one thing: Am I right that
> currently when state is being checkpointed, the whole RocksDB state is
> snapshotted? There is no incremental, diff snapshotting, is it? If so, this
> seems to be unfeasible for keeping state counted in tens or hundreds of GBs
> (and you reach that size of a state, when you want to keep an embedded
> state of the streaming application instead of going out to Cassandra/Hbase
> or other DB). It will just cost too much to do snapshots of such large
> state.
>
> Samza as a good example to compare, writes every state change to Kafka
> topic, considering it a snapshot in the shape of changelog. Of course in
> the moment of app restart, recovering the state from the changelog would be
> too costly, that is why the changelog topic is compacted. Plus, I think
> Samza does a state snapshot from time to time anyway (but I'm not sure of
> that).
>
> Thanks for answering my doubts,
> Krzysztof
>
>


Re: Handling large state (incremental snapshot?)

2016-04-07 Thread Aljoscha Krettek
Ah yes, you're right. With the non-keyed stream it doesn't make a big
difference because it's only one big state value.

The throughput still seems quite low. Have you ever tried looking at the
"back pressure" tab on the Flink dashboard? For this I would suggest
disabling chaining, so that every operator is run in an isolated task:

env.disableOperatorChaining();

On Thu, 7 Apr 2016 at 05:11 Hironori Ogibayashi 
wrote:

> I tried RocksDB, but the result was almost the same.
>
> I used the following code and put 2.6 million distinct records into Kafka.
> After processing all records, the state on HDFS became about 250MB
> and the time needed for
> the checkpoint was almost 5 sec. Processing throughput was
> FsStateBackend -> 8000 msg/sec, RocksDBStateBackend -> 9000 msg/sec
>
> ---
> env.setStateBackend(new
> RocksDBStateBackend("hdfs://:8020/apps/flink/checkpoints"));
>
> val stream = env
>   .addSource(new FlinkKafkaConsumer09[String]("kafka.json2", new
> SimpleStringSchema(), properties))
>   .map(parseJson(_))
>   .timeWindowAll(Time.of(10, TimeUnit.DAYS))
>   .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>   // count distinct values
>   .fold(Set[String]()){(r,i) => { r + i}}
>   .map{x => (System.currentTimeMillis(), x.size)}
>   .addSink(new ElasticsearchSink(config, transports, new
> IndexRequestBuilder[Tuple2[Long, Int]]  {
> override def createIndexRequest(element: Tuple2[Long, Int],
> ctx: RuntimeContext): IndexRequest = {
>   val json = new HashMap[String, AnyRef]
>   json.put("@timestamp", new Timestamp(element._1))
>   json.put("count", element._2: java.lang.Integer)
>
> Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
> }
>   }))
> ---
>
> I guess this is because I used a non-keyed stream, so I had one state
> record with a big value (all distinct values).
> I think copying a 250MB (or more) file to HDFS on every checkpoint
> will be heavy, so I will try storing the distinct values
> in an external datastore (e.g. Redis).
> Also, when incremental snapshots get implemented, I want to try them.
>
> Regards,
> Hironori
>
> 2016-04-05 21:40 GMT+09:00 Hironori Ogibayashi :
> > Aljoscha,
> >
> > Thank you for your quick response.
> > Yes, I am using FsStateBackend, so I will try RocksDB backend.
> >
> > Regards,
> > Hironori
> >
> > 2016-04-05 21:23 GMT+09:00 Aljoscha Krettek :
> >> Hi,
> >> I guess you are using the FsStateBackend, is that correct? You could try
> >> using the RocksDB state backend:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend
> >>
> >> With this, throughput will be lower but the overhead per checkpoint
> could be
> >> lower. Also, with this most of the file copying necessary for the
> checkpoint
> >> will be done while data processing keeps running (asynchronous
> snapshot).
> >>
> >> As to incremental snapshots. I'm afraid this feature is not yet
> implemented
> >> but we're working on it.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi 
> >> wrote:
> >>>
> >>> Hello,
> >>>
> >>> I am trying to implement windowed distinct count on a stream. In this
> >>> case, the state
> >>> have to hold all distinct value in the window, so can be large.
> >>>
> >>> In my test, if the state size become about 400MB, checkpointing takes
> >>> 40sec and spends most of Taskmanager's CPU.
> >>> Are there any good way to handle this situation?
> >>>
> >>> Flink document mentions about incremental snapshot, and I am
> interested in
> >>> it,
> >>> but could not find how to enable it. (not implemented yet?)
> >>>
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
> >>>
> >>> Regards,
> >>> Hironori
>


Re: YARN High Availability

2016-04-07 Thread Konstantin Knauf
Hi everyone,

thanks to Robert, I found the problem.

I was setting "recovery.zookeeper.path.root" on the command line with
-yD. Apparently this is currently not supported. You need to set the
parameter in flink-conf.yaml.

Cheers,

Konstantin

On 05.04.2016 12:52, Konstantin Knauf wrote:
> Hi Robert,
> 
> I tried several paths and rmr before.
> 
> It stopped after 1-2 minutes. There was an exception on the shell.
> Sorry, should have attached to the last mail.
> 
> Thanks,
> 
> Konstantin
> 
> On 05.04.2016 11:22, Robert Metzger wrote:
>> I've tried reproducing the issue on a test cluster, but everything
>> worked fine.
>>
>> Have you tried different values for "recovery.zookeeper.path.root" or
>> only one? Maybe the path you've put contains invalid data?
>>
>> Regarding the client log you've sent: Did you manually stop the client
>> or did it stop after a few minutes?
>> The JobManager stops after a few minutes because the client requested a
>> shutdown. Usually, the client only shuts down on an exception or when
>> the user stops the yarn session.
>> There is no exception in the client log. Was there an exception printed
>> to the shell? 
>>
>> This log message: 
>>
>> 2016-04-05 08:48:34,912 DEBUG org.apache.flink.yarn.FlinkYarnCluster
>>- Received message option None
>>
>> Should not be an issue.
>>
>>
>> On Tue, Apr 5, 2016 at 10:14 AM, Ufuk Celebi wrote:
>>
>> Hey Konstantin,
>>
>> just looked at the logs and the cluster is started, but the job is
>> indeed never submitted.
>>
>> I've forwarded this to Robert, because he is familiar with the YARN
>> client. I will look into how the client interacts with the ZooKeeper
>> root path.
>>
>> – Ufuk
>>
>>
>> On Tue, Apr 5, 2016 at 9:18 AM, Konstantin Knauf wrote:
>> > Hi Ufuk, Hi Stephan,
>> >
>> > sorry for the late response. Attached are the client logs.
>> >
>> > Cheers,
>> >
>> > Konstantin
>> >
>> >> On 04.04.2016 21:20, Stephan Ewen wrote:
>> >> This seems to be the critical part in the logs:
>> >>
>> >> 2016-03-31 09:01:52,234 INFO  org.apache.flink.yarn.YarnJobManager
>> >>- Re-submitting 0 job graphs.
>> >> 2016-03-31 09:02:51,182 INFO  org.apache.flink.yarn.YarnJobManager
>> >>- Stopping YARN JobManager with status FAILED and
>> >> diagnostic Flink YARN Client requested shutdown.
>> >>
>> >> The YarnJobManager starts up properly, but the Client never sends
>> >> anything, shuts down at some point, and tears down the YARN cluster.
>> >>
>> >> Client logs would help a lot there...
>> >>
>> >>
>> >>
>> >>
>> >> On Sat, Apr 2, 2016 at 12:43 PM, Ufuk Celebi wrote:
>> >>
>> >> Hey Konstantin,
>> >>
>> >> That's weird. Can you please log the client output on DEBUG
>> level and
>> >> provide that as well? I'm wondering whether the client uses a
>> >> different root path.
>> >>
>> >> The following seems to happen:
>> >> - you use ledf_recovery as the root namespace
>> >> - the task managers are connecting (they resolve the JM
>> address via
>> >> ZooKeeper in this case as well, which means they correctly
>> use the
>> >> same namespace)
>> >> - but the client, which started the YARN session, does not
>> ever submit
>> >> the job to the cluster.
>> >>
>> >> – Ufuk
>> >>
>> >> On Thu, Mar 31, 2016 at 9:23 AM, Konstantin Knauf wrote:
>> >> > Hi everyone,
>> >> >
>> >> > we are running in some problems with multiple per-job yarn
>> >> sessions, too.
>> >> >
>> >> > When we are are starting a per-job yarn session (Flink 1.0,
>> Hadoop
>> >> 2.4)
>> >> > with recovery.zookeeper.path.root other than /flink, the
>> yarn session
>> >> > starts but no job is submitted, and after 1 min or so the
>> session
>> >> > crashes. I attached the jobmanager log.
>> >> >
>> >> > In Zookeeper the root-directory is created and child-nodes
>> >> >
>> >> > leaderlatch
>> >> > jobgraphs
>> >> >
>> >> > /flink does also exist, but does not have child nodes.
>> >> >
>> >> > Everything runs fine, with the default
>> recovery.zookeeper.root.path.
>> >> >
>> >> > Does anyone have an idea, what is going on?
>> >> >
>>  

Re: Accessing RDF triples using Flink

2016-04-07 Thread Flavio Pompermaier
Hi Ritesh,
Jena could store triples in NQuadsInputFormat, which is a HadoopInputFormat,
so that you can read data in an efficient way with Flink. Unfortunately I
remember that I had some problems using it, so I just export my Jena model
as NQuads and then I can parse it efficiently with Flink as a text file.
However the parsing with sesame 4 is more efficient in terms of speed and
garbage collection.

What I do is convert every quad into a Tuple5, group triples/quads by
subject and then apply some logic. The quads grouped by subject are what we
call an "entiton atom", and combining them leads to an "entiton molecule" (i.e.
a graph rooted in some entiton atom).
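
As a rough illustration of that pipeline (the path is hypothetical; the naive split below only works for quads whose terms contain no spaces, i.e. plain IRIs, so literals need a real parser such as Sesame RIO; and since the actual Tuple5 layout is not shown here, a Tuple4 of the four quad terms is used):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple4<String, String, String, String>> quads = env
        .readTextFile("hdfs:///path/to/dump.nq") // hypothetical path
        .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
            @Override
            public Tuple4<String, String, String, String> map(String line) {
                // naive split into (subject, predicate, object, graph)
                String[] t = line.trim().split("\\s+", 4);
                String graph = t[3].replaceAll("\\s*\\.\\s*$", ""); // drop trailing " ."
                return new Tuple4<>(t[0], t[1], t[2], graph);
            }
        });

// Grouping by subject yields one group per "entiton atom"; a reduceGroup over
// these groups can then assemble or merge the atoms.
quads.groupBy(0);
```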

We presented our work at FlinkForward 2015 in Berlin:
http://www.slideshare.net/FlinkForward/s-bartoli-f-popmermaier-a-semantic-big-data-companion
If you need some code that reads the nquads with Flink I can give you some
code, just write me in private!

Best,
Flavio

On Wed, Apr 6, 2016 at 3:57 PM, Ritesh Kumar Singh <
riteshoneinamill...@gmail.com> wrote:

> Hi Flavio,
>
>1. How do you access your rdf dataset via flink? Are you reading it as
>a normal input file and splitting the records or you have some wrappers in
>place to convert the rdf data into triples? Can you please share some code
>samples if possible?
>2. I am using Jena TDB command line utilities to make queries against
>the dataset in order to avoid java garbage collection issues. I am also
>using Jena java APIs as a dependency but command line utils are way faster
>(Though it comes with an extra requirement to have Jena command line utils
>installed in the system). Main reason for this approach being able to pass
>the string output from the command line to Flink as part of my pipeline.
>Can you tell me your approach to this?
>3. Should I dump my query output to a file and then consume it as a
>normal input source for Flink?
>
>
> Basically, any help regarding this will be helpful.
>
> Regards,
> Ritesh
>
>
>
> Ritesh Kumar Singh
> https://about.me/riteshoneinamillion
>
> 
>
> On Wed, Apr 6, 2016 at 2:45 PM, Flavio Pompermaier 
> wrote:
>
>> Hi Ritesh,
>> I have some experience with RDF and Flink. What do you mean by accessing
>> a Jena model? How do you create it?
>>
>> From my experience, reading triples from Jena models is evil because it
>> has some problems with garbage collection.
>> On 6 Apr 2016 00:51, "Ritesh Kumar Singh" 
>> wrote:
>>
>>> Hi,
>>>
>>> I need some suggestions regarding accessing RDF triples from flink. I'm
>>> trying to integrate flink in a pipeline where the input for flink comes
>>> from SPARQL query on a Jena model. And after modification of triples using
>>> flink, I will be performing SPARQL update using Jena to save my changes.
>>>
>>>- Are there any recommended input format for loading the triples to
>>>flink?
>>>- Will this use case be classified as a flink streaming job or a
>>>batch processing job?
>>>- How will loading of the dataset vary with the input size?
>>>- Are there any recommended packages/ projects for these type of
>>>projects?
>>>
>>> Any suggestion will be of great help.
>>>
>>> Regards,
>>> Ritesh
>>> https://riteshtoday.wordpress.com/
>>>
>>
>


Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
There is a hack for this issue: copying my native library to
$HADOOP_HOME/lib/native makes it discoverable and the program runs; however,
this is not an appropriate solution and it seems to be fragile.

I tried to find where the 'lib/native' path appears in the configuration and
found 2 places:
hadoop-env.sh: export
JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native"
mapred-site.xml: key: mapreduce.admin.user.env

I tried to add the path to the dir with my native lib in both places, but still no
luck.
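
One thing that may help narrow it down is a tiny probe that prints what the task-manager JVMs on the YARN nodes actually received (illustrative only):

```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Map this over any small DataSet to log the effective library path per task JVM.
public class LibPathProbe extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) {
        System.out.println("java.library.path = "
                + System.getProperty("java.library.path"));
    }

    @Override
    public String map(String value) {
        return value;
    }
}
```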

Thanks,
Timur

On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov 
wrote:

> Hello,
>
> I'm not sure whether it's a Hadoop or Flink-specific question, but since I
> ran into this in the context of Flink I'm asking here. I would be glad if
> anyone can suggest a more appropriate place.
>
> I have a native library that I need to use in my Flink batch job that I
> run on EMR, and I try to point JVM to the location of native library.
> Normally, I'd do this using java.library.path parameter. So I try to run as
> follows:
> `
> HADOOP_CONF_DIR=/etc/hadoop/conf
> JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m
> yarn-cluster -yn 1 -yjm 768 -ytm 768 
> `
> It does not work, fails with `java.lang.UnsatisfiedLinkError` when trying
>> to load the native lib. It probably has to do with YARN not passing
> this parameter to task nodes, but my understanding of this mechanism is
> quite limited so far.
>
> I dug up this Jira ticket:
> https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting
> LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem
> either.
>
> Any help or hint where to look is highly appreciated.
>
> Thanks,
> Timur
>


Flink event processing immediate feedback

2016-04-07 Thread igor.berman
Hi,
Suppose I have web facing frontend that gets stream of events(http calls). I
need to process event stream and do some aggregations over those events and
write aggregated statistics to Hbase - so far Flink seems as perfect match.
However in some cases event should trigger some alert and frontend needs to
get this alert in synchronous way - here I'm a bit lost. I thought about
some kind of following flow:
frontend -> queue -> flink -> redis(pub/sub)<- frontend

I.e. I have two major use cases - async computation of aggregated analytics/stats
and a "synchronous" response to the frontend. The frontend might be Node/Play or any
other technology that won't have a problem "waiting" for the response, so
the only question is how to implement this feedback.
Might it be some kind of Sink?
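
For what it's worth, a minimal sketch of what such a sink could look like with the Jedis client (host, port and channel name are placeholders):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import redis.clients.jedis.Jedis;

// Publishes each alert on a Redis channel that the frontend subscribes to.
public class RedisAlertSink extends RichSinkFunction<String> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("redis-host", 6379); // one connection per parallel instance
    }

    @Override
    public void invoke(String alertJson) throws Exception {
        jedis.publish("alerts", alertJson); // frontend listens on the "alerts" channel
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}
```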

Any ideas would be appreciated,
Igor




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-event-processing-immediate-feedback-tp5978.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


RocksDB state checkpointing is expensive?

2016-04-07 Thread Krzysztof Zarzycki
Hi,
I saw the documentation and source code of the state management with
RocksDB and before I use it, I'm concerned of one thing: Am I right that
currently when state is being checkpointed, the whole RocksDB state is
snapshotted? There is no incremental, diff snapshotting, is it? If so, this
seems to be unfeasible for keeping state counted in tens or hundreds of GBs
(and you reach that size of a state, when you want to keep an embedded
state of the streaming application instead of going out to Cassandra/Hbase
or other DB). It will just cost too much to do snapshots of such large
state.

Samza as a good example to compare, writes every state change to Kafka
topic, considering it a snapshot in the shape of changelog. Of course in
the moment of app restart, recovering the state from the changelog would be
too costly, that is why the changelog topic is compacted. Plus, I think
Samza does a state snapshot from time to time anyway (but I'm not sure of
that).

Thanks for answering my doubts,
Krzysztof


Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread norman sp
Hi Till,
I used Flink version 1.0.0 and tried all three TimeCharacteristics. 
Now I tried the new Flink 1.0.1, which gives me the following error.
After detecting an event it processes a few stream tuples but then crashes.
I'm not sure how to solve that part of the error message: "This can indicate
that the element belonging to the previous relation has been already pruned,
even though you expect it to be still there"

4>   ###   Warning! FlowPatternEvent:   ### 
4> (7605e43c-ca20-4524-af5f-23fd8e55d7b9,Intensity - Value,25.0,3.0,75.0)
4> (5025ef64-2b83-4112-b9bd-2d7de46454c9,Pressure - Value,7.0,3.0,21.0)
4> (3d501c5b-b0e2-41e6-bfdc-14c996df6e19,Flow - Value,27.0,3.0,81.0)
04/06/2016 16:41:31 Map -> Sink: Unnamed(4/4) switched to CANCELED 
04/06/2016 16:41:31 AbstractCEPPatternOperator(1/1) switched to FAILED 
java.lang.RuntimeException: Failure happened in filter function.
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:292)
at org.apache.flink.cep.nfa.NFA.process(NFA.java:136)
at
org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
at
org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Could not find previous shared
buffer entry with key: State(start, Normal, [
StateTransition(TAKE, FlowOver10, with filter),
StateTransition(IGNORE, start),
]), value: (83788338-b26c-4538-a437-77d1bbf7b348,Pressure -
Value,7.0,5.0,35.0) and timestamp: 1459953675999. This can indicate that the
element belonging to the previous relation has been already pruned, even
though you expect it to be still there.
at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:104)
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:269)
... 8 more




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5976.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
Hello,

I'm not sure whether it's a Hadoop or Flink-specific question, but since I
ran into this in the context of Flink I'm asking here. I would be glad if
anyone can suggest a more appropriate place.

I have a native library that I need to use in my Flink batch job that I run
on EMR, and I try to point the JVM to the location of the native library. Normally,
I'd do this using the java.library.path parameter. So I try to run as follows:
`
HADOOP_CONF_DIR=/etc/hadoop/conf
JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m
yarn-cluster -yn 1 -yjm 768 -ytm 768 
`
It does not work; it fails with `java.lang.UnsatisfiedLinkError` when trying
to load the native lib. It probably has to do with YARN not passing
this parameter to task nodes, but my understanding of this mechanism is
quite limited so far.

I dug up this Jira ticket:
https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting
LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem
either.

Any help or hint where to look is highly appreciated.

Thanks,
Timur