Re: Flink Streaming ContinuousTimeTriggers

2016-02-24 Thread Gavin Lin
Hi,

how about this video ?
https://www.youtube.com/watch?v=T7hiwcwCXGI

Gavin.Lin

2016-02-25 3:55 GMT+08:00 Ankur Sharma :

> Hey,
>
> Can you guide me to some example of ContinuousProcessingTimeTrigger?
> I want to partition input stream into TimeWindow that should fire at
> continuous time interval on its on without waiting for a new element to
> enter the stream.
> Could you guide me to it?
>
>
> Thanks
>
> Best,
> *Ankur Sharma*
> *Information Systems Group*
> *3.15 E1.1 Universität des Saarlandes*
> *66123, Saarbrücken Germany*
> *Email: ankur.sha...@mpi-inf.mpg.de  *
> *an...@stud.uni-saarland.de *
>


Re: How to use all available task managers

2016-02-24 Thread Matthias J. Sax
Could it be that you need to edit the client-local flink-conf.yaml
instead of the TaskManager config files? (In case you do not want to
specify the parallelism via env.setParallelism(int).)

-Matthias

On 02/24/2016 04:19 PM, Saiph Kappa wrote:
> Thanks! It worked now :-)
> 
> On Wed, Feb 24, 2016 at 2:48 PM, Ufuk Celebi  > wrote:
> 
> You can use the environment to set it the job parallelism to 6 e.g.
> env.setParallelism(6).
> 
> Setting this will override the default behaviour. Maybe that's why the
> default parallelism is not working... you might have it set to 1
> already?
> 
> On Wed, Feb 24, 2016 at 3:41 PM, Saiph Kappa  > wrote:
> > I set "parallelism.default: 6" on flink-conf.yaml of all 6
> machines, and
> > still, my job only uses 1 task manager. Why?
> >
> >
> > On Wed, Feb 24, 2016 at 8:31 AM, Till Rohrmann
> > wrote:
> >>
> >> Hi Saiph,
> >>
> >> I think the configuration value should be parallelism.default: 6.
> That
> >> will execute jobs which have not parallelism defined with a DOP of 6.
> >>
> >> Cheers,
> >> Till
> >>
> >>
> >> On Wed, Feb 24, 2016 at 1:43 AM, Saiph Kappa
> >
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I  am running a flink stream application on a cluster with 6
> slaves/task
> >>> managers. I have set in flink-conf.yaml of every machine
> >>> "parallelization.degree.default: 6". However, when I run my
> application it
> >>> just uses one task slot and not all of them. Am I missing something?
> >>>
> >>> Thanks.
> >>
> >>
> >
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: downloading dependency in apache flink

2016-02-24 Thread Pankaj Kumar
I was using the wrong method to create the fat jar.

After using mvn clean install -Pbuild-jar, I was able to execute the code
through the Flink command line.

On Wed, Feb 24, 2016 at 7:12 PM, Till Rohrmann  wrote:

> What is the error message you receive?
>
> On Wed, Feb 24, 2016 at 1:49 PM, Pankaj Kumar 
> wrote:
>
>> Hi Till ,
>>
>> I was able to make fat jar, but i am not able to execute this jar through
>> flink command line.
>>
>> On Wed, Feb 24, 2016 at 4:31 PM, Till Rohrmann 
>> wrote:
>>
>>> Hi Pankaj,
>>>
>>> are you creating a fat jar when you create your use code jar? This can
>>> be done using maven's shade plugin or the assembly plugin. We provide a
>>> maven archetype to set up a pom file which will make sure that a fat jar is
>>> built [1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Feb 24, 2016 at 11:31 AM, Pankaj Kumar 
>>> wrote:
>>>
 i am trying to write a job, using maven project.

 Job is working fine in my editor , but when i am running that job
 through flink command line its giving ClassNotFoundException exception
 . Its not to find dependency.

 If i will create a jar , will flink download all its dependency before
 executing that jar. if not that how can we run one-jar.


 Thanks

>>>
>>>
>>
>


Flink Streaming ContinuousTimeTriggers

2016-02-24 Thread Ankur Sharma
Hey,

Can you point me to an example of ContinuousProcessingTimeTrigger?
I want to partition the input stream into TimeWindows that fire at a continuous
time interval on their own, without waiting for a new element to enter the stream.
Could you guide me to one?


Thanks

Best,
Ankur Sharma
Information Systems Group
3.15 E1.1 Universität des Saarlandes
66123, Saarbrücken Germany
Email: ankur.sha...@mpi-inf.mpg.de  
an...@stud.uni-saarland.de 

smime.p7s
Description: S/MIME cryptographic signature
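
For reference, a minimal sketch of such a window definition. It assumes the 1.x
DataStream API and an invented socket source, key, and aggregation; the trigger
fires every 5 seconds of processing time even when no new element arrives:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

public class ContinuousTriggerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumed source: one word per line from a local socket.
        DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .map(word -> Tuple2.of(word, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(t -> t.f0)
            // One-minute tumbling processing-time windows ...
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            // ... that emit an early result every 5 seconds, with or without new elements.
            .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
            .sum(1);

        counts.print();
        env.execute("Continuous processing-time trigger sketch");
    }
}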


Flink Streaming - WriteAsText

2016-02-24 Thread Ankur Sharma
Hey,

I am trying to use ContinuousProcessingTimeTrigger, which fires the TimeWindow every
5 seconds. But even though I explicitly state that the output of the apply() method
should be dumped to a file every 10 seconds, I don't see the file being appended.
Only when I cancel the job do I see all the dumps that were not visible earlier.

I also tried writing the result to a socket, but it has the same effect.

Please help.

Best,
Ankur Sharma
Information Systems Group
3.15 E1.1 Universität des Saarlandes
66123, Saarbrücken Germany
Email: ankur.sha...@mpi-inf.mpg.de  
an...@stud.uni-saarland.de 

smime.p7s
Description: S/MIME cryptographic signature
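
The file output here is typically only flushed when the job finishes or is
cancelled, which matches the behaviour described above. A possible workaround is
a sink that flushes after every record — a hedged sketch, with the file path and
element type assumed:

import java.io.FileWriter;
import java.io.PrintWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class FlushingFileSink extends RichSinkFunction<String> {
    private transient PrintWriter writer;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Append mode, so repeated window firings accumulate in the same file.
        writer = new PrintWriter(new FileWriter("/tmp/window-output.txt", true));
    }

    @Override
    public void invoke(String value) throws Exception {
        writer.println(value);
        writer.flush(); // make each fired window result visible immediately
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }
}

It would be attached with stream.addSink(new FlushingFileSink()) instead of
writeAsText().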


Re: Compile issues with Flink 1.0-SNAPSHOT and Scala 2.11

2016-02-24 Thread Till Rohrmann
I just tested building a Flink job using the latest SNAPSHOT version and
the flink-connector-kafka-0.8/flink-connector-kafka-0.9 Kafka connector.
The compilation succeeded with SBT.

Could you maybe share your build.sbt with me? This would help me figure
out the problem you’re experiencing.

Cheers,
Till
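
For comparison, a minimal build.sbt sketch (artifact names, versions, and the
resolver are assumptions, not the actual build in question). Using %% for every
Flink module keeps all artifacts on the _2.11 cross-version; a mixed _2.10/_2.11
resolution like the one in the reported error usually means one artifact resolved
against the wrong suffix:

scalaVersion := "2.11.7"

resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version, e.g. flink-streaming-scala_2.11
  "org.apache.flink" %% "flink-scala"               % "1.0-SNAPSHOT",
  "org.apache.flink" %% "flink-streaming-scala"     % "1.0-SNAPSHOT",
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0-SNAPSHOT"
)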
​

On Wed, Feb 24, 2016 at 6:37 PM, Cory Monty 
wrote:

> What Dan posted on 2/22 is the current error we're seeing. As he stated,
> using the 1.0.0-rc0 version works, but switching back to SNAPSHOT does not
> compile. We can try clearing the ivy cache, but that has had no affect in
> the past.
>
> On Wed, Feb 24, 2016 at 11:34 AM, Till Rohrmann 
> wrote:
>
>> What is currently the error you observe? It might help to clear
>> org.apache.flink in the ivy cache once in a while.
>>
>> Cheers,
>> Till
>>
>> On Wed, Feb 24, 2016 at 6:09 PM, Cory Monty 
>> wrote:
>>
>>> We're still seeing this issue in the latest SNAPSHOT version. Do you
>>> have any suggestions to resolve the error?
>>>
>>> On Mon, Feb 22, 2016 at 3:41 PM, Dan Kee  wrote:
>>>
 Hello,

 I'm not sure if this related, but we recently started seeing this when
 using `1.0-SNAPSHOT` in the `snapshots` repository:

 [error] Modules were resolved with conflicting cross-version suffixes in 
 {file:/home/ubuntu/bt/}flinkproject:
 [error]org.apache.kafka:kafka _2.10, _2.11
 java.lang.RuntimeException: Conflicting cross-version suffixes in: 
 org.apache.kafka:kafka
at scala.sys.package$.error(package.scala:27)
at sbt.ConflictWarning$.processCrossVersioned(ConflictWarning.scala:46)
at sbt.ConflictWarning$.apply(ConflictWarning.scala:32)
at sbt.Classpaths$$anonfun$66.apply(Defaults.scala:1164)
at sbt.Classpaths$$anonfun$66.apply(Defaults.scala:1161)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at 
 sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at 
 sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:235)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at 
 sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



 We switched our project to use `1.0.0` in the `orgapacheflink-1062`
 repository and that works.  Just wanted to let you know about the error we
 seeing with the snapshot version.

 Thanks!

 —Dan

 On Fri, Feb 12, 2016 at 8:41 AM, Cory Monty <
 cory.mo...@getbraintree.com> wrote:

> Thanks, Stephan.
>
> Everything is back to normal for us.
>
> Cheers,
>
> Cory
>
> On Fri, Feb 12, 2016 at 6:54 AM, Stephan Ewen 
> wrote:
>
>> Hi Cory!
>>
>> We found the problem. There is a development fork of Flink for Stream
>> SQL, whose CI infrastructure accidentally also deployed snapshots and
>> overwrote some of the proper master branch snapshots.
>>
>> That's why the snapshots got inconsistent. We fixed that, and newer
>> snapshots should be online.
>> Hope that this is resolved now.
>>
>> Sorry for the inconvenience,
>> Stephan
>>
>>
>> On Fri, Feb 12, 2016 at 12:51 AM, Stephan Ewen 
>> wrote:
>>
>>> Hi!
>>>
>>> The CI system has just finished uploading an new snapshot. In that
>>> one, the scalatest dependency is now correctly at 2.11 again.
>>>
>>>
>>> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.232156-288.pom
>>>
>>> I am very puzzled, we did not touch any parts that seem to affect
>>> this. I am wondering if it is possible that Maven had a hiccup...
>>>
>>> Can you retry (force dependency update), see if the dependencies are
>>> correct again?
>>>
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On 

Re: Compile issues with Flink 1.0-SNAPSHOT and Scala 2.11

2016-02-24 Thread Cory Monty
What Dan posted on 2/22 is the current error we're seeing. As he stated,
using the 1.0.0-rc0 version works, but switching back to SNAPSHOT does not
compile. We can try clearing the Ivy cache, but that has had no effect in
the past.

On Wed, Feb 24, 2016 at 11:34 AM, Till Rohrmann 
wrote:

> What is currently the error you observe? It might help to clear
> org.apache.flink in the ivy cache once in a while.
>
> Cheers,
> Till
>
> On Wed, Feb 24, 2016 at 6:09 PM, Cory Monty 
> wrote:
>
>> We're still seeing this issue in the latest SNAPSHOT version. Do you have
>> any suggestions to resolve the error?
>>
>> On Mon, Feb 22, 2016 at 3:41 PM, Dan Kee  wrote:
>>
>>> Hello,
>>>
>>> I'm not sure if this related, but we recently started seeing this when
>>> using `1.0-SNAPSHOT` in the `snapshots` repository:
>>>
>>> [error] Modules were resolved with conflicting cross-version suffixes in 
>>> {file:/home/ubuntu/bt/}flinkproject:
>>> [error]org.apache.kafka:kafka _2.10, _2.11
>>> java.lang.RuntimeException: Conflicting cross-version suffixes in: 
>>> org.apache.kafka:kafka
>>> at scala.sys.package$.error(package.scala:27)
>>> at sbt.ConflictWarning$.processCrossVersioned(ConflictWarning.scala:46)
>>> at sbt.ConflictWarning$.apply(ConflictWarning.scala:32)
>>> at sbt.Classpaths$$anonfun$66.apply(Defaults.scala:1164)
>>> at sbt.Classpaths$$anonfun$66.apply(Defaults.scala:1161)
>>> at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>>> at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
>>> at sbt.std.Transform$$anon$4.work(System.scala:63)
>>> at 
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
>>> at 
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
>>> at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
>>> at sbt.Execute.work(Execute.scala:235)
>>> at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
>>> at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
>>> at 
>>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
>>> at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> We switched our project to use `1.0.0` in the `orgapacheflink-1062`
>>> repository and that works.  Just wanted to let you know about the error we
>>> seeing with the snapshot version.
>>>
>>> Thanks!
>>>
>>> —Dan
>>>
>>> On Fri, Feb 12, 2016 at 8:41 AM, Cory Monty >> > wrote:
>>>
 Thanks, Stephan.

 Everything is back to normal for us.

 Cheers,

 Cory

 On Fri, Feb 12, 2016 at 6:54 AM, Stephan Ewen  wrote:

> Hi Cory!
>
> We found the problem. There is a development fork of Flink for Stream
> SQL, whose CI infrastructure accidentally also deployed snapshots and
> overwrote some of the proper master branch snapshots.
>
> That's why the snapshots got inconsistent. We fixed that, and newer
> snapshots should be online.
> Hope that this is resolved now.
>
> Sorry for the inconvenience,
> Stephan
>
>
> On Fri, Feb 12, 2016 at 12:51 AM, Stephan Ewen 
> wrote:
>
>> Hi!
>>
>> The CI system has just finished uploading an new snapshot. In that
>> one, the scalatest dependency is now correctly at 2.11 again.
>>
>>
>> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.232156-288.pom
>>
>> I am very puzzled, we did not touch any parts that seem to affect
>> this. I am wondering if it is possible that Maven had a hiccup...
>>
>> Can you retry (force dependency update), see if the dependencies are
>> correct again?
>>
>>
>> Greetings,
>> Stephan
>>
>>
>> On Fri, Feb 12, 2016 at 12:23 AM, Stephan Ewen 
>> wrote:
>>
>>> Hi!
>>>
>>> I examined the Apache Snapshot Repository, and I could see that in
>>> the latest snapshot a "scalatest_2.10" version was introduced. I could 
>>> not
>>> figure out how, yet. I could not find a "flink-core_2.10" or
>>> "flink-annotations_2.10" dependency, yet.
>>>
>>>
>>> Previous snapshot:
>>> 

Re: Compile issues with Flink 1.0-SNAPSHOT and Scala 2.11

2016-02-24 Thread Cory Monty
We're still seeing this issue in the latest SNAPSHOT version. Do you have
any suggestions to resolve the error?

On Mon, Feb 22, 2016 at 3:41 PM, Dan Kee  wrote:

> Hello,
>
> I'm not sure if this related, but we recently started seeing this when
> using `1.0-SNAPSHOT` in the `snapshots` repository:
>
> [error] Modules were resolved with conflicting cross-version suffixes in 
> {file:/home/ubuntu/bt/}flinkproject:
> [error]org.apache.kafka:kafka _2.10, _2.11
> java.lang.RuntimeException: Conflicting cross-version suffixes in: 
> org.apache.kafka:kafka
>   at scala.sys.package$.error(package.scala:27)
>   at sbt.ConflictWarning$.processCrossVersioned(ConflictWarning.scala:46)
>   at sbt.ConflictWarning$.apply(ConflictWarning.scala:32)
>   at sbt.Classpaths$$anonfun$66.apply(Defaults.scala:1164)
>   at sbt.Classpaths$$anonfun$66.apply(Defaults.scala:1161)
>   at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>   at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
>   at sbt.std.Transform$$anon$4.work(System.scala:63)
>   at 
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
>   at 
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
>   at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
>   at sbt.Execute.work(Execute.scala:235)
>   at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
>   at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
>   at 
> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
>   at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
> We switched our project to use `1.0.0` in the `orgapacheflink-1062`
> repository and that works.  Just wanted to let you know about the error we
> seeing with the snapshot version.
>
> Thanks!
>
> —Dan
>
> On Fri, Feb 12, 2016 at 8:41 AM, Cory Monty 
> wrote:
>
>> Thanks, Stephan.
>>
>> Everything is back to normal for us.
>>
>> Cheers,
>>
>> Cory
>>
>> On Fri, Feb 12, 2016 at 6:54 AM, Stephan Ewen  wrote:
>>
>>> Hi Cory!
>>>
>>> We found the problem. There is a development fork of Flink for Stream
>>> SQL, whose CI infrastructure accidentally also deployed snapshots and
>>> overwrote some of the proper master branch snapshots.
>>>
>>> That's why the snapshots got inconsistent. We fixed that, and newer
>>> snapshots should be online.
>>> Hope that this is resolved now.
>>>
>>> Sorry for the inconvenience,
>>> Stephan
>>>
>>>
>>> On Fri, Feb 12, 2016 at 12:51 AM, Stephan Ewen  wrote:
>>>
 Hi!

 The CI system has just finished uploading an new snapshot. In that one,
 the scalatest dependency is now correctly at 2.11 again.


 https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.232156-288.pom

 I am very puzzled, we did not touch any parts that seem to affect this.
 I am wondering if it is possible that Maven had a hiccup...

 Can you retry (force dependency update), see if the dependencies are
 correct again?


 Greetings,
 Stephan


 On Fri, Feb 12, 2016 at 12:23 AM, Stephan Ewen 
 wrote:

> Hi!
>
> I examined the Apache Snapshot Repository, and I could see that in the
> latest snapshot a "scalatest_2.10" version was introduced. I could not
> figure out how, yet. I could not find a "flink-core_2.10" or
> "flink-annotations_2.10" dependency, yet.
>
>
> Previous snapshot:
> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.162913-286.pom
>
> Latest Snapshot:
> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.201205-287.pom
>
>
> We'll try and fix this ASAP. Sorry for that, this is quite a mystery
> right now...
>
> Best,
> Stephan
>
> On Thu, Feb 11, 2016 at 11:56 PM, Cory Monty <
> cory.mo...@getbraintree.com> wrote:
>
>> Ufuk,
>>
>> Thanks for the link. I've double-checked everything in our
>> dependencies list and it's all correct.
>>
>> Stephan,
>>
>> We don't explicitly depend on 

Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-24 Thread Tim Conrad

Dear Till and others.

I solved the issue by using the strategy suggested by Till like this:

List<String> fileListOfSpectra = ...
SplittableList<String> fileListOfSpectraSplitable =
    new SplittableList<String>( fileListOfSpectra );
DataSource<String> fileListOfSpectraDataSource =
    env.fromParallelCollection( fileListOfSpectraSplitable, String.class );


and then - as before -

 DataSet peakLists = fileListOfSpectraDataSource
.flatMap(new ReadDataFromFile())
...

(Find the source for the class "SplittableList" below). Now FLINK 
distributes the tasks to all available FLINK nodes.


Thanks for the help!

Cheers
Tim



On 24.02.2016 16:30, Till Rohrmann wrote:


If I’m not mistaken, then this shouldn’t solve the scheduling 
peculiarity of Flink. Flink will still deploy the tasks of the flat 
map operation to the machine where the source task is running. Only 
after this machine has no more slots left, other machines will be used 
as well.


I think that you don’t need an explicit rebalance() method here.
Flink will automatically insert the PartitionMethod.REBALANCE strategy.


Cheers,
Till

​


import org.apache.flink.util.SplittableIterator;
import java.util.Iterator;
import java.util.List;

public class SplittableList<T> extends SplittableIterator<T> {
    private List<T> list;
    private int cursor;

    public SplittableList(List<T> list) { this.cursor = 0; this.list = list; }

    @Override
    public Iterator<T>[] split(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }
        Iterator<T>[] iters = new Iterator[numPartitions];
        if (numPartitions == 1) { iters[0] = new SplittableList<T>(list); return iters; }
        int partSize = (int) Math.floor((double) list.size() / numPartitions);
        for (int i = 0; i < (numPartitions - 1); i++) {
            List<T> subFileList = list.subList(i * partSize, (i + 1) * partSize);
            iters[i] = new SplittableList<T>(subFileList);
        }
        List<T> subFileList = list.subList((numPartitions - 1) * partSize, list.size());
        iters[numPartitions - 1] = new SplittableList<T>(subFileList);
        return iters;
    }

    @Override
    public int getMaximumNumberOfSplits() { return list.size(); }

    public boolean hasNext() { return (cursor < list.size()); }

    public T next() { T item = list.get(cursor); cursor++; return item; }

    public void remove() { throw new IllegalArgumentException("Remove not implemented yet."); }
}





Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-24 Thread Till Rohrmann
If I’m not mistaken, then this shouldn’t solve the scheduling peculiarity
of Flink. Flink will still deploy the tasks of the flat map operation to
the machine where the source task is running. Only after this machine has
no more slots left, other machines will be used as well.

I think that you don’t need an explicit rebalance() method here. Flink will
automatically insert the PartitionMethod.REBALANCE strategy.

Cheers,
Till
​

On Wed, Feb 24, 2016 at 4:01 PM, Gábor Gévay  wrote:

> Hello,
>
> > // For each "filename" in list do...
> > DataSet featureList = fileList
> > .flatMap(new ReadDataSetFromFile()) // flatMap because
> there
> > might multiple DataSets in a file
>
> What happens if you just insert .rebalance() before the flatMap?
>
> > This kind of DataSource will only be executed
> > with a degree of parallelism of 1. The source will send it’s collection
> > elements in a round robin fashion to the downstream operators which are
> > executed with a higher parallelism. So when Flink schedules the
> downstream
> > operators, it will try to place them close to their inputs. Since all
> flat
> > map operators have the single data source task as an input, they will be
> > deployed on the same machine if possible.
>
> Sorry, I'm a little confused here. Do you mean that the flatMap will
> have a high parallelism, but all instances on a single machine?
> Because I tried to reproduce the situation where I have a non-parallel
> data source and then a flatMap, and the plan shows that the flatMap
> actually has parallelism 1, which would be an alternative explanation
> to the original problem that it gets executed on a single machine.
> Then, if I insert .rebalance() after the source, then a "Partition"
> operation appears between the source and the flatMap, and the flatMap
> has a high parallelism. I think this should also solve the problem,
> without having to write a parallel data source.
>
> Best,
> Gábor
>


Re: How to use all available task managers

2016-02-24 Thread Saiph Kappa
Thanks! It worked now :-)

On Wed, Feb 24, 2016 at 2:48 PM, Ufuk Celebi  wrote:

> You can use the environment to set it the job parallelism to 6 e.g.
> env.setParallelism(6).
>
> Setting this will override the default behaviour. Maybe that's why the
> default parallelism is not working... you might have it set to 1
> already?
>
> On Wed, Feb 24, 2016 at 3:41 PM, Saiph Kappa 
> wrote:
> > I set "parallelism.default: 6" on flink-conf.yaml of all 6 machines, and
> > still, my job only uses 1 task manager. Why?
> >
> >
> > On Wed, Feb 24, 2016 at 8:31 AM, Till Rohrmann 
> wrote:
> >>
> >> Hi Saiph,
> >>
> >> I think the configuration value should be parallelism.default: 6. That
> >> will execute jobs which have not parallelism defined with a DOP of 6.
> >>
> >> Cheers,
> >> Till
> >>
> >>
> >> On Wed, Feb 24, 2016 at 1:43 AM, Saiph Kappa 
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I  am running a flink stream application on a cluster with 6
> slaves/task
> >>> managers. I have set in flink-conf.yaml of every machine
> >>> "parallelization.degree.default: 6". However, when I run my
> application it
> >>> just uses one task slot and not all of them. Am I missing something?
> >>>
> >>> Thanks.
> >>
> >>
> >
>


Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-24 Thread Gábor Gévay
Hello,

> // For each "filename" in list do...
> DataSet featureList = fileList
> .flatMap(new ReadDataSetFromFile()) // flatMap because there
> might multiple DataSets in a file

What happens if you just insert .rebalance() before the flatMap?

> This kind of DataSource will only be executed
> with a degree of parallelism of 1. The source will send it’s collection
> elements in a round robin fashion to the downstream operators which are
> executed with a higher parallelism. So when Flink schedules the downstream
> operators, it will try to place them close to their inputs. Since all flat
> map operators have the single data source task as an input, they will be
> deployed on the same machine if possible.

Sorry, I'm a little confused here. Do you mean that the flatMap will
have a high parallelism, but all instances on a single machine?
Because I tried to reproduce the situation where I have a non-parallel
data source and then a flatMap, and the plan shows that the flatMap
actually has parallelism 1, which would be an alternative explanation
to the original problem that it gets executed on a single machine.
Then, if I insert .rebalance() after the source, then a "Partition"
operation appears between the source and the flatMap, and the flatMap
has a high parallelism. I think this should also solve the problem,
without having to write a parallel data source.

Best,
Gábor
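
A minimal sketch of that alternative (the file list and the ReadDataSetFromFile
function are taken from the thread; everything else is assumed):

// fileNames: the client-side list of input file paths (assumed)
DataSet<String> fileList = env.fromCollection(fileNames);  // non-parallel source, parallelism 1

// rebalance() inserts a round-robin partitioning step between the source and the
// flatMap, so the flatMap runs with the full parallelism across the cluster.
// The element type of the result depends on ReadDataSetFromFile.
DataSet<?> features = fileList
    .rebalance()
    .flatMap(new ReadDataSetFromFile());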


Re: How to use all available task managers

2016-02-24 Thread Ufuk Celebi
You can use the environment to set the job parallelism to 6, e.g.
env.setParallelism(6).

Setting this will override the default behaviour. Maybe that's why the
default parallelism is not working... you might have it set to 1
already?

On Wed, Feb 24, 2016 at 3:41 PM, Saiph Kappa  wrote:
> I set "parallelism.default: 6" on flink-conf.yaml of all 6 machines, and
> still, my job only uses 1 task manager. Why?
>
>
> On Wed, Feb 24, 2016 at 8:31 AM, Till Rohrmann  wrote:
>>
>> Hi Saiph,
>>
>> I think the configuration value should be parallelism.default: 6. That
>> will execute jobs which have not parallelism defined with a DOP of 6.
>>
>> Cheers,
>> Till
>>
>>
>> On Wed, Feb 24, 2016 at 1:43 AM, Saiph Kappa 
>> wrote:
>>>
>>> Hi,
>>>
>>> I  am running a flink stream application on a cluster with 6 slaves/task
>>> managers. I have set in flink-conf.yaml of every machine
>>> "parallelization.degree.default: 6". However, when I run my application it
>>> just uses one task slot and not all of them. Am I missing something?
>>>
>>> Thanks.
>>
>>
>


Re: How to use all available task managers

2016-02-24 Thread Saiph Kappa
I set "parallelism.default: 6" on flink-conf.yaml of all 6 machines, and
still, my job only uses 1 task manager. Why?

On Wed, Feb 24, 2016 at 8:31 AM, Till Rohrmann  wrote:

> Hi Saiph,
>
> I think the configuration value should be parallelism.default: 6. That
> will execute jobs which have not parallelism defined with a DOP of 6.
>
> Cheers,
> Till
> ​
>
> On Wed, Feb 24, 2016 at 1:43 AM, Saiph Kappa 
> wrote:
>
>> Hi,
>>
>> I  am running a flink stream application on a cluster with 6 slaves/task
>> managers. I have set in flink-conf.yaml of every machine
>> "parallelization.degree.default: 6". However, when I run my application it
>> just uses one task slot and not all of them. Am I missing something?
>>
>> Thanks.
>>
>
>


Re: downloading dependency in apache flink

2016-02-24 Thread Till Rohrmann
What is the error message you receive?

On Wed, Feb 24, 2016 at 1:49 PM, Pankaj Kumar  wrote:

> Hi Till ,
>
> I was able to make fat jar, but i am not able to execute this jar through
> flink command line.
>
> On Wed, Feb 24, 2016 at 4:31 PM, Till Rohrmann 
> wrote:
>
>> Hi Pankaj,
>>
>> are you creating a fat jar when you create your use code jar? This can be
>> done using maven's shade plugin or the assembly plugin. We provide a maven
>> archetype to set up a pom file which will make sure that a fat jar is built
>> [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html
>>
>> Cheers,
>> Till
>>
>> On Wed, Feb 24, 2016 at 11:31 AM, Pankaj Kumar 
>> wrote:
>>
>>> i am trying to write a job, using maven project.
>>>
>>> Job is working fine in my editor , but when i am running that job
>>> through flink command line its giving ClassNotFoundException exception
>>> . Its not to find dependency.
>>>
>>> If i will create a jar , will flink download all its dependency before
>>> executing that jar. if not that how can we run one-jar.
>>>
>>>
>>> Thanks
>>>
>>
>>
>


Re: downloading dependency in apache flink

2016-02-24 Thread Pankaj Kumar
Hi Till ,

I was able to make the fat jar, but I am not able to execute this jar through
the Flink command line.

On Wed, Feb 24, 2016 at 4:31 PM, Till Rohrmann  wrote:

> Hi Pankaj,
>
> are you creating a fat jar when you create your use code jar? This can be
> done using maven's shade plugin or the assembly plugin. We provide a maven
> archetype to set up a pom file which will make sure that a fat jar is built
> [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html
>
> Cheers,
> Till
>
> On Wed, Feb 24, 2016 at 11:31 AM, Pankaj Kumar 
> wrote:
>
>> i am trying to write a job, using maven project.
>>
>> Job is working fine in my editor , but when i am running that job through
>> flink command line its giving ClassNotFoundException exception . Its not
>> to find dependency.
>>
>> If i will create a jar , will flink download all its dependency before
>> executing that jar. if not that how can we run one-jar.
>>
>>
>> Thanks
>>
>
>


Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-24 Thread Till Rohrmann
Hi Tim,

unfortunately, this is not documented explicitly as far as I know. For the
InputFormats there is a marker interface called NonParallelInput. The input
formats which implement this interface will be executed with a parallelism
of 1. At the moment this holds true for the CollectionInputFormat,
IteratorInputFormat and the JDBCInputFormat.

I hope this helps.

Cheers,
Till
​

On Tue, Feb 23, 2016 at 3:44 PM, Tim Conrad 
wrote:

> Hi Till (and others).
>
> Thank you very much for your helpful answer.
>
> On 23.02.2016 14:20, Till Rohrmann wrote:
>
> [...] In contrast, if you had a parallel data source which would consist
> of multiple source task, then these tasks would be independent and spread
> out across your cluster [...]
>
>
> Can you please send me a link to an example or to the respective Flink API
> doc, where I can see which is a parallel data source and how to create it
> with multiple source tasks?
>
> A simple Google search did not provide me with an answer (maybe I used the
> wrong key words, though...).
>
>
> Cheers
> Tim
>
>
>
>
> On 23.02.2016 14:20, Till Rohrmann wrote:
>
> Hi Tim,
>
> depending on how you create the DataSource fileList, Flink will
> schedule the downstream operators differently. If you used the
> ExecutionEnvironment.fromCollection method, then it will create a
> DataSource with a CollectionInputFormat. This kind of DataSource will
> only be executed with a degree of parallelism of 1. The source will send
> it’s collection elements in a round robin fashion to the downstream
> operators which are executed with a higher parallelism. So when Flink
> schedules the downstream operators, it will try to place them close to
> their inputs. Since all flat map operators have the single data source task
> as an input, they will be deployed on the same machine if possible.
>
> In contrast, if you had a parallel data source which would consist of
> multiple source task, then these tasks would be independent and spread out
> across your cluster. In this case, every flat map task would have a single
> distinct source task as input. When the flat map tasks are deployed they
> would be deployed on the machine where their corresponding source is
> running. Since the source tasks are spread out across the cluster, the flat
> map tasks would be spread out as well.
>
> What you could do to mitigate your problem is to start the cluster with as
> many slots as your maximum degree of parallelism is. That way, you’ll
> utilize all cluster resources.
>
> I hope this clarifies a bit why you observe that tasks tend to cluster on
> a single machine.
>
> Cheers,
> Till
> ​
>
>
>


Re: Underlying TaskManager's Actor System

2016-02-24 Thread Till Rohrmann
Hi Andrea,

no there isn’t. But you can always start your own ActorSystem in a stateful
operator.

Cheers,
Till
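
A hedged sketch of that approach (Akka on the user classpath, the actor logic,
and all names are assumptions):

import akka.actor.ActorSystem;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ActorSystemMapper extends RichMapFunction<String, String> {
    private transient ActorSystem actorSystem;

    @Override
    public void open(Configuration parameters) {
        // One private actor system per parallel operator instance.
        actorSystem = ActorSystem.create("operator-actor-system");
    }

    @Override
    public String map(String value) {
        // Interact with actors here, e.g. via actorSystem.actorOf(...).
        return value;
    }

    @Override
    public void close() {
        if (actorSystem != null) {
            actorSystem.shutdown(); // terminate() in newer Akka versions
        }
    }
}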
​

On Wed, Feb 24, 2016 at 11:57 AM, Andrea Sella 
wrote:

> Hi,
> There is a way to access to the underlying TaskManager's Actor System?
>
> Thank you in advance,
> Andrea
>


Re: downloading dependency in apache flink

2016-02-24 Thread Till Rohrmann
Hi Pankaj,

are you creating a fat jar when you create your user code jar? This can be
done using Maven's shade plugin or the assembly plugin. We provide a Maven
archetype to set up a pom file which will make sure that a fat jar is built
[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/java_api_quickstart.html

Cheers,
Till
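
A minimal sketch of the shade-plugin section this refers to (the plugin version
is an assumption; the quickstart archetype in [1] generates a more complete
configuration, including a manifest entry for the main class):

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.1</version>
      <executions>
        <execution>
          <!-- bundle the job and its dependencies into one fat jar at package time -->
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>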

On Wed, Feb 24, 2016 at 11:31 AM, Pankaj Kumar  wrote:

> i am trying to write a job, using maven project.
>
> Job is working fine in my editor , but when i am running that job through
> flink command line its giving ClassNotFoundException exception . Its not
> to find dependency.
>
> If i will create a jar , will flink download all its dependency before
> executing that jar. if not that how can we run one-jar.
>
>
> Thanks
>


Underlying TaskManager's Actor System

2016-02-24 Thread Andrea Sella
Hi,
Is there a way to access the underlying TaskManager's Actor System?

Thank you in advance,
Andrea


Re: Error when executing job

2016-02-24 Thread Till Rohrmann
I assume that you included the flink-connector-twitter dependency in your
job jar, right? Alternatively, you might also put the jar in the lib folder
on each of your machines.

Cheers,
Till
​

On Wed, Feb 24, 2016 at 10:38 AM, ram kumar  wrote:

> Hi,
>
>
> getting below error when executing twitter flink job,
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at org.apache.flink.client.program.Client.runBlocking(Client.java:370)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:96)
> at com.flink.TwitterFlink.main(TwitterFlink.java:45)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Call to registerInputOutput() of invokable
> failed
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot load user class:
> org.apache.flink.streaming.connectors.twitter.TwitterFilterSource
> ClassLoader info: URL ClassLoader:
> file:
> '/tmp/blobStore-f32e1c65-7412-4f59-9767-9fd6a1dd41ad/cache/blob_29086aa8f430f5335f527a0fba05b9e6837e0e1b'
> (valid JAR)
> Class not resolvable through given classloader.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
> ... 1 more
>
>
> but i have TwitterFilterSource in pom.xml
>
> <dependency>
>   <groupId>com.twitter</groupId>
>   <artifactId>hbc-core</artifactId>
>   <version>2.2.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java</artifactId>
>   <version>0.10.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-twitter</artifactId>
>   <version>0.10.1</version>
> </dependency>
>
> any inputs?
>
> Thanks
>
>


downloading dependency in apache flink

2016-02-24 Thread Pankaj Kumar
I am trying to write a job using a Maven project.

The job works fine in my editor, but when I run it through the Flink command line
it gives a ClassNotFoundException. It is not able to find the dependency.

If I create a jar, will Flink download all its dependencies before executing that
jar? If not, how can we run a one-jar (fat jar)?


Thanks


Error when executing job

2016-02-24 Thread ram kumar
Hi,


I am getting the below error when executing a Twitter Flink job:

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:370)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:96)
at com.flink.TwitterFlink.main(TwitterFlink.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable
failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class:
org.apache.flink.streaming.connectors.twitter.TwitterFilterSource
ClassLoader info: URL ClassLoader:
file:
'/tmp/blobStore-f32e1c65-7412-4f59-9767-9fd6a1dd41ad/cache/blob_29086aa8f430f5335f527a0fba05b9e6837e0e1b'
(valid JAR)
Class not resolvable through given classloader.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
... 1 more


but I have TwitterFilterSource in pom.xml:

<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>hbc-core</artifactId>
  <version>2.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>0.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-twitter</artifactId>
  <version>0.10.1</version>
</dependency>
any inputs?

Thanks


Re: Dataset filter improvement

2016-02-24 Thread Till Rohrmann
Hi Flavio,

it works the following way: Your data type will be serialized by the
PojoSerializer iff it is a POJO. Iff it is a generic type which cannot be
serialized by any of the other serializers, then Kryo is used.

If it is a POJO type and you have a DataStream which can also contain
subtypes of this type, then registering these subtypes will avoid writing
their complete class name. Instead a shorter ID will be written.

If it is a generic type and your data type is not automatically registered
(e.g. because your data stream also contains subtypes), then registering
these types will have the same effect. Instead of writing the complete
class name every time you serialize your data, you will only write a much
shorter ID.

Cheers,
Till
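
A short sketch of the calls being discussed, with invented class names:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// POJO subtypes: they are serialized correctly even without registration,
// but registering lets Flink write a short ID instead of the full class name.
env.registerType(ExtendedClass1.class);
env.registerType(ExtendedClass2.class);

// Generic (non-POJO) types fall back to Kryo. Registering a custom Kryo
// serializer additionally controls how such a type is serialized, e.g. for
// a third-party class that Kryo cannot handle efficiently on its own.
env.registerTypeWithKryoSerializer(SomeThirdPartyType.class, SomeThirdPartyKryoSerializer.class);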
​

On Wed, Feb 24, 2016 at 9:39 AM, Flavio Pompermaier 
wrote:

> Thanks Max and Till for the answers. However I still didn't understand
> fully the difference...Here are my doubts:
>
>- If I don't register any of my POJO classes, they will be serialized
>with Kryo (black box for Flink)
>- If I register all of my POJO using env.registerType they will be
>serialized as POJO (which is slower than Tuple serialization but much
>faster than Kryo)
>- What if I call env.registerTypeWithKryoSerializer()? Why should I
>specify a serializer for Kryo?
>
> Best,
> Flavio
>
>
> On Tue, Feb 23, 2016 at 4:08 PM, Till Rohrmann 
> wrote:
>
>> Registering a data type is only relevant for the Kryo serializer or if
>> you want to serialize a subclass of a POJO. Registering has the advantage
>> that you assign an id to the class which is written instead of the full
>> class name. The latter is usually much longer than the id.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 23, 2016 at 3:17 PM, Maximilian Michels 
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> I think the point is that Flink can use its serialization tools if you
>>> register the class in advance. If you don't do that, it will use Kryo
>>> as a fall-back which is slightly less efficient.
>>>
>>> Equals and hash code have to be implemented correctly if you compare
>>> Pojos. For standard types like String or Integer, this is done
>>> automatically. For Pojos, Flink doesn't know whether it is implemented
>>> correctly or not. Every object in Java has a default equals and
>>> hashCode implementation.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
>>>  wrote:
>>> > Hi Max,
>>> > why do I need to register them? My job runs without problem also
>>> without
>>> > that.
>>> > The only problem with my POJOs was that I had to implement equals and
>>> hash
>>> > correctly, Flink didn't enforce me to do it but then results were
>>> wrong :(
>>> >
>>> >
>>> >
>>> > On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels 
>>> wrote:
>>> >>
>>> >> Hi Flavio,
>>> >>
>>> >> Stephan was referring to
>>> >>
>>> >> env.registerType(ExtendedClass1.class);
>>> >> env.registerType(ExtendedClass2.class);
>>> >>
>>> >> Cheers,
>>> >> Max
>>> >>
>>> >> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
>>> >>  wrote:
>>> >> > What do you mean exactly..? Probably I'm missing something
>>> >> > here..remember
>>> >> > that I can specify the right subClass only after the last flatMap,
>>> after
>>> >> > the
>>> >> > first map neither me nor Flink can know the exact subclass of
>>> BaseClass
>>> >> >
>>> >> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen 
>>> wrote:
>>> >> >>
>>> >> >> Class hierarchies should definitely work, even if the base class
>>> has no
>>> >> >> fields.
>>> >> >>
>>> >> >> They work more efficiently if you register the subclasses at the
>>> >> >> execution
>>> >> >> environment (Flink cannot infer them from the function signatures
>>> >> >> because
>>> >> >> the function signatures only contain the abstract base class).
>>> >> >>
>>> >> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
>>> >> >>  wrote:
>>> >> >>>
>>> >> >>> Because The classes are not related to each other. Do you think
>>> it's a
>>> >> >>> good idea to have something like this?
>>> >> >>>
>>> >> >>> abstract class BaseClass(){
>>> >> >>>String someField;
>>> >> >>> }
>>> >> >>>
>>> >> >>> class ExtendedClass1 extends BaseClass (){
>>> >> >>>String someOtherField11;
>>> >> >>>String someOtherField12;
>>> >> >>>String someOtherField13;
>>> >> >>>  ...
>>> >> >>> }
>>> >> >>>
>>> >> >>> class ExtendedClass2 extends BaseClass (){
>>> >> >>>Integer someOtherField21;
>>> >> >>>Double someOtherField22;
>>> >> >>>Integer someOtherField23;
>>> >> >>>  ...
>>> >> >>> }
>>> >> >>>
>>> >> >>> and then declare my map as Map. and then apply a
>>> >> >>> flatMap that can be used to generated the specific datasets?
>>> >> >>> Doesn't this cause problem to Flink? Classes can be vrry
>>> different to
>>> >> >>> each 

Re: Dataset filter improvement

2016-02-24 Thread Flavio Pompermaier
Thanks Max and Till for the answers. However, I still haven't fully understood
the difference. Here are my doubts:

   - If I don't register any of my POJO classes, they will be serialized
   with Kryo (black box for Flink)
   - If I register all of my POJO using env.registerType they will be
   serialized as POJO (which is slower than Tuple serialization but much
   faster than Kryo)
   - What if I call env.registerTypeWithKryoSerializer()? Why should I
   specify a serializer for Kryo?

Best,
Flavio

On Tue, Feb 23, 2016 at 4:08 PM, Till Rohrmann  wrote:

> Registering a data type is only relevant for the Kryo serializer or if you
> want to serialize a subclass of a POJO. Registering has the advantage that
> you assign an id to the class which is written instead of the full class
> name. The latter is usually much longer than the id.
>
> Cheers,
> Till
>
> On Tue, Feb 23, 2016 at 3:17 PM, Maximilian Michels 
> wrote:
>
>> Hi Flavio,
>>
>> I think the point is that Flink can use its serialization tools if you
>> register the class in advance. If you don't do that, it will use Kryo
>> as a fall-back which is slightly less efficient.
>>
>> Equals and hash code have to be implemented correctly if you compare
>> Pojos. For standard types like String or Integer, this is done
>> automatically. For Pojos, Flink doesn't know whether it is implemented
>> correctly or not. Every object in Java has a default equals and
>> hashCode implementation.
>>
>> Cheers,
>> Max
>>
>> On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
>>  wrote:
>> > Hi Max,
>> > why do I need to register them? My job runs without problem also without
>> > that.
>> > The only problem with my POJOs was that I had to implement equals and
>> hash
>> > correctly, Flink didn't enforce me to do it but then results were wrong
>> :(
>> >
>> >
>> >
>> > On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels 
>> wrote:
>> >>
>> >> Hi Flavio,
>> >>
>> >> Stephan was referring to
>> >>
>> >> env.registerType(ExtendedClass1.class);
>> >> env.registerType(ExtendedClass2.class);
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
>> >>  wrote:
>> >> > What do you mean exactly..? Probably I'm missing something
>> >> > here..remember
>> >> > that I can specify the right subClass only after the last flatMap,
>> after
>> >> > the
>> >> > first map neither me nor Flink can know the exact subclass of
>> BaseClass
>> >> >
>> >> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen 
>> wrote:
>> >> >>
>> >> >> Class hierarchies should definitely work, even if the base class
>> has no
>> >> >> fields.
>> >> >>
>> >> >> They work more efficiently if you register the subclasses at the
>> >> >> execution
>> >> >> environment (Flink cannot infer them from the function signatures
>> >> >> because
>> >> >> the function signatures only contain the abstract base class).
>> >> >>
>> >> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
>> >> >>  wrote:
>> >> >>>
>> >> >>> Because The classes are not related to each other. Do you think
>> it's a
>> >> >>> good idea to have something like this?
>> >> >>>
>> >> >>> abstract class BaseClass(){
>> >> >>>String someField;
>> >> >>> }
>> >> >>>
>> >> >>> class ExtendedClass1 extends BaseClass (){
>> >> >>>String someOtherField11;
>> >> >>>String someOtherField12;
>> >> >>>String someOtherField13;
>> >> >>>  ...
>> >> >>> }
>> >> >>>
>> >> >>> class ExtendedClass2 extends BaseClass (){
>> >> >>>Integer someOtherField21;
>> >> >>>Double someOtherField22;
>> >> >>>Integer someOtherField23;
>> >> >>>  ...
>> >> >>> }
>> >> >>>
>> >> >>> and then declare my map as Map. and then apply a
>> >> >>> flatMap that can be used to generated the specific datasets?
>> >> >>> Doesn't this cause problem to Flink? Classes can be vrry different
>> to
>> >> >>> each other..maybe this can cause problems with the plan
>> >> >>> generation..isn't
>> >> >>> it?
>> >> >>>
>> >> >>> Thanks Fabian and Stephan for the support!
>> >> >>>
>> >> >>>
>> >> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen 
>> >> >>> wrote:
>> >> 
>> >>  Why not use an abstract base class and N subclasses?
>> >> 
>> >>  On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <
>> fhue...@gmail.com>
>> >>  wrote:
>> >> >
>> >> > Unfortunately, there is no Either<1,...,n>.
>> >> > You could implement something like a Tuple3<Option, Option, Option>. However, Flink does not provide an
>> >> > Option
>> >> > type (comes with Java8). You would need to implement it yourself
>> >> > incl.
>> >> > TypeInfo and Serializer. You can get some inspiration from the
>> >> > Either type
>> >> > info /serializer, if you want to go this way.
>> >> >
>> >> > Using a byte array would also work 

Re: How to use all available task managers

2016-02-24 Thread Till Rohrmann
Hi Saiph,

I think the configuration value should be parallelism.default: 6. That will
execute jobs which have no parallelism defined with a DOP of 6.

Cheers,
Till
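
The two options discussed in this thread, as a short sketch (the six-slot setup
is assumed):

// Option 1: default for all jobs submitted by this client, in the client's
// flink-conf.yaml:
//
//     parallelism.default: 6
//
// Option 2: per-job override in the program itself, which takes precedence:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(6); // run each operator with 6 parallel subtasks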
​

On Wed, Feb 24, 2016 at 1:43 AM, Saiph Kappa  wrote:

> Hi,
>
> I  am running a flink stream application on a cluster with 6 slaves/task
> managers. I have set in flink-conf.yaml of every machine
> "parallelization.degree.default: 6". However, when I run my application it
> just uses one task slot and not all of them. Am I missing something?
>
> Thanks.
>