[jira] [Created] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo

2018-02-13 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-8649:
--

 Summary: Scala StreamExecutionEnvironment.createInput should pass 
on the TypeInfo
 Key: FLINK-8649
 URL: https://issues.apache.org/jira/browse/FLINK-8649
 Project: Flink
  Issue Type: Bug
  Components: Scala API
Affects Versions: 1.4.0
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 1.5.0


This is {{StreamExecutionEnvironment.createInput}} in the Scala API:
{code}
def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
  asScalaStream(javaEnv.createInput(inputFormat))
{code}
It should pass the implicitly obtained {{TypeInformation}} on to the Java API, like this:
{code}
def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
  asScalaStream(javaEnv.createInput(inputFormat, implicitly[TypeInformation[T]]))
{code}
The current situation causes a problem when, for example, the type involves generics, as in the following code, where the Java API can't deduce the 
{{TypeInformation}} on its own:
{code}
StreamExecutionEnvironment.getExecutionEnvironment
  .createInput[Tuple2[Integer, Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null))
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface

2017-11-26 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266003#comment-16266003
 ] 

Gabor Gevay commented on FLINK-8116:


Hi,

The mentions of the Checkpointed interface need to be updated. Here is the 
relevant section of the docs, which explains the change of interfaces between 
1.1 and 1.2:
https://ci.apache.org/projects/flink/flink-docs-master/dev/migration.html#rescaling-and-new-state-abstractions
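
For orientation, here is a minimal sketch of what a source using the newer {{CheckpointedFunction}} interface looks like (my own illustration of a simple counting source; the actual docs/Javadoc example may differ in details):
{code}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction

class CountingSource extends SourceFunction[Long] with CheckpointedFunction {

  @volatile private var isRunning = true
  private var count = 0L
  private var checkpointedCount: ListState[java.lang.Long] = _

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      // emit and update the counter under the checkpoint lock
      ctx.getCheckpointLock.synchronized {
        ctx.collect(count)
        count += 1
      }
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedCount.clear()
    checkpointedCount.add(count)
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    checkpointedCount = context.getOperatorStateStore
      .getListState(new ListStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
    if (context.isRestored) {
      val it = checkpointedCount.get().iterator()
      while (it.hasNext) {
        count = it.next()
      }
    }
  }
}
{code}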

I suggest using Ctrl+Shift+F in IntelliJ to find all stale mentions of the 
Checkpointed interface.

Btw. you can also check out an old commit in git if you would like to see how 
things were in 1.1.

I also suggest reading this contributor guide:
https://flink.apache.org/contribute-code.html

> Stale comments referring to Checkpointed interface
> --
>
> Key: FLINK-8116
> URL: https://issues.apache.org/jira/browse/FLINK-8116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Documentation
>Reporter: Gabor Gevay
>Priority: Trivial
>  Labels: starter
> Fix For: 1.5.0
>
>
> Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by 
> the {{CheckpointedFunction}} interface.
> However, in {{SourceFunction}} there are two comments still referring to the 
> old {{Checkpointed}} interface. (The code examples there also need to be 
> modified.)
> Note that the problem also occurs in {{StreamExecutionEnvironment}}, and 
> possibly other places as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner

2017-11-24 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-8117.

Resolution: Implemented

> Eliminate modulo operation from RoundRobinChannelSelector and 
> RebalancePartitioner
> --
>
> Key: FLINK-8117
> URL: https://issues.apache.org/jira/browse/FLINK-8117
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
> Fix For: 1.5.0
>
>
> {{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and 
> {{RescalePartitioner}} use a modulo operation to wrap around when the current 
> channel counter reaches the number of channels. Using an {{if}} would have 
> better performance.
> A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but 
> the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
> most of the time predict the condition to be false.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8130) Javadocs link for snapshot release is not correct

2017-11-22 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-8130:
---
Fix Version/s: (was: 1.4.0)
   1.5.0

> Javadocs link for snapshot release is not correct
> -
>
> Key: FLINK-8130
> URL: https://issues.apache.org/jira/browse/FLINK-8130
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Gabor Gevay
>Priority: Blocker
> Fix For: 1.5.0
>
>
> See last comments on FLINK-7702.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner

2017-11-21 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-8117:
---
Description: 
{{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and 
{{RescalePartitioner}} use a modulo operation to wrap around when the current 
channel counter reaches the number of channels. Using an {{if}} would have 
better performance.

A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but 
the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
most of the time predict the condition to be false.

\[1\] http://www.agner.org/optimize/instruction_tables.pdf

  was:
Both {{RoundRobinChannelSelector}} and {{RebalancePartitioner}} use a modulo 
operation to wrap around when the current channel counter reaches the number of 
channels. Using an {{if}} would have better performance.

A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but 
the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
most of the time predict the condition to be false.

\[1\] http://www.agner.org/optimize/instruction_tables.pdf


> Eliminate modulo operation from RoundRobinChannelSelector and 
> RebalancePartitioner
> --
>
> Key: FLINK-8117
> URL: https://issues.apache.org/jira/browse/FLINK-8117
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
> Fix For: 1.5.0
>
>
> {{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and 
> {{RescalePartitioner}} use a modulo operation to wrap around when the current 
> channel counter reaches the number of channels. Using an {{if}} would have 
> better performance.
> A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but 
> the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
> most of the time predict the condition to be false.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner

2017-11-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-8117:
--

 Summary: Eliminate modulo operation from RoundRobinChannelSelector 
and RebalancePartitioner
 Key: FLINK-8117
 URL: https://issues.apache.org/jira/browse/FLINK-8117
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
 Fix For: 1.5.0


Both {{RoundRobinChannelSelector}} and {{RebalancePartitioner}} use a modulo 
operation to wrap around when the current channel counter reaches the number of 
channels. Using an {{if}} would have better performance.

A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but 
the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
most of the time predict the condition to be false.

\[1\] http://www.agner.org/optimize/instruction_tables.pdf
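
To make the difference concrete, here is a plain sketch of the two variants (just an illustration, not the actual channel selector code):
{code}
object WrapAroundSketch {
  // wrap-around with modulo: the integer division dominates the cost
  def nextChannelModulo(current: Int, numChannels: Int): Int =
    (current + 1) % numChannels

  // wrap-around with a branch: the branch is almost always correctly
  // predicted, so it costs only a cycle or two on average
  def nextChannelIf(current: Int, numChannels: Int): Int = {
    val next = current + 1
    if (next == numChannels) 0 else next
  }
}
{code}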



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7702) Javadocs are not being built

2017-11-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-7702:
---
Fix Version/s: (was: 1.4.0)
   1.5.0

> Javadocs are not being built
> 
>
> Key: FLINK-7702
> URL: https://issues.apache.org/jira/browse/FLINK-7702
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Gabor Gevay
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The "Javadocs" link in the left side menu of this page doesn't work:
> https://ci.apache.org/projects/flink/flink-docs-master/
> Note that it works in 1.3:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8116) Stale comments referring to Checkpointed interface

2017-11-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-8116:
--

 Summary: Stale comments referring to Checkpointed interface
 Key: FLINK-8116
 URL: https://issues.apache.org/jira/browse/FLINK-8116
 Project: Flink
  Issue Type: Bug
  Components: DataStream API, Documentation
Reporter: Gabor Gevay
Priority: Trivial
 Fix For: 1.4.0, 1.5.0


Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by the 
{{CheckpointedFunction}} interface.

However, in {{SourceFunction}} there are two comments still referring to the 
old {{Checkpointed}} interface. (The code examples there also need to be 
modified.)

Note that the problem also occurs in {{StreamExecutionEnvironment}}, and 
possibly other places as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (FLINK-7702) Javadocs are not being built

2017-11-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reopened FLINK-7702:

  Assignee: (was: Aljoscha Krettek)

I'm reopening this, since it seems that this is again not working: I have 
exactly the same issue as in the original Jira description.

> Javadocs are not being built
> 
>
> Key: FLINK-7702
> URL: https://issues.apache.org/jira/browse/FLINK-7702
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Gabor Gevay
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The "Javadocs" link in the left side menu of this page doesn't work:
> https://ci.apache.org/projects/flink/flink-docs-master/
> Note that it works in 1.3:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4575) DataSet aggregate methods should support POJOs

2017-11-13 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16249444#comment-16249444
 ] 

Gabor Gevay commented on FLINK-4575:


OK, makes sense. Feel free to close this jira, if you think we shouldn't do it.

> DataSet aggregate methods should support POJOs
> --
>
> Key: FLINK-4575
> URL: https://issues.apache.org/jira/browse/FLINK-4575
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: starter
>
> The aggregate methods of DataSets (aggregate, sum, min, max) currently only 
> support Tuples, with the fields specified by indices. With 
> https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for 
> POJOs and field expressions would be easy: {{AggregateOperator}} would create 
> {{FieldAccessors}} instead of just storing field positions, and 
> {{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} 
> instead of the Tuple field access methods.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4575) DataSet aggregate methods should support POJOs

2017-11-12 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248874#comment-16248874
 ] 

Gabor Gevay commented on FLINK-4575:


[~vcycyv], I'm not sure how {{getFlatFields}} would help here. (How would you 
convert back to the POJO at the end?)

But if you would like to work on this jira, then the approach outlined in the 
jira description should work. I think it is the cleanest solution, since 
{{FieldAccessor}} is exactly for situations like the one we have here, where we 
have to get and set a field based on a field expression. However, you would have 
to resolve https://issues.apache.org/jira/browse/FLINK-4578 first; I think that 
could be done with the solution that I wrote in a comment there.
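
To illustrate what I mean, here is a simplified sketch of what a field accessor provides (not the actual {{FieldAccessor}} API, and the types are hypothetical):
{code}
// a paired getter/setter selected by a field expression; this is what the
// aggregating UDF needs in order to read the aggregated field and write the
// result back into the record
trait SimpleFieldAccessor[T, F] {
  def get(record: T): F
  def set(record: T, value: F): T
}

case class Point(label: String, x: Double)

object FieldAccessSketch {
  // accessor for the field expression "x"
  val xAccessor: SimpleFieldAccessor[Point, Double] = new SimpleFieldAccessor[Point, Double] {
    def get(record: Point): Double = record.x
    def set(record: Point, value: Double): Point = record.copy(x = value)
  }
}
{code}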

> DataSet aggregate methods should support POJOs
> --
>
> Key: FLINK-4575
> URL: https://issues.apache.org/jira/browse/FLINK-4575
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: starter
>
> The aggregate methods of DataSets (aggregate, sum, min, max) currently only 
> support Tuples, with the fields specified by indices. With 
> https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for 
> POJOs and field expressions would be easy: {{AggregateOperator}} would create 
> {{FieldAccessors}} instead of just storing field positions, and 
> {{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} 
> instead of the Tuple field access methods.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7916) Remove NetworkStackThroughputITCase

2017-10-25 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218609#comment-16218609
 ] 

Gabor Gevay commented on FLINK-7916:


In {{flink-tests}} there is a package called {{org.apache.flink.test.manual}}, 
which already contains some benchmarks that can be run manually. Maybe this one 
could be moved there too.

> Remove NetworkStackThroughputITCase
> ---
>
> Key: FLINK-7916
> URL: https://issues.apache.org/jira/browse/FLINK-7916
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
> Fix For: 1.5.0
>
>
> Flink's code base contains the {{NetworkStackThroughputITCase}} which is not 
> really a test. Moreover it is marked as {{Ignored}}. I propose to remove this 
> test because it is more of a benchmark. We could think about creating a 
> benchmark project where we move this kind of "tests".
> In general I think we should remove ignored tests if they won't be fixed 
> immediately. The danger is far too high that we forget about them and then we 
> only keep the maintenance burden of it. This is especially true for the above 
> mentioned test case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-1268) FileOutputFormat with overwrite does not clear local output directories

2017-10-10 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198727#comment-16198727
 ] 

Gabor Gevay commented on FLINK-1268:


This issue just happened to me: I ran my job locally with parallelism 8, then 
later with 4, and then spent an hour debugging to figure out what went wrong.

> FileOutputFormat with overwrite does not clear local output directories
> ---
>
> Key: FLINK-1268
> URL: https://issues.apache.org/jira/browse/FLINK-1268
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Till Rohrmann
>Priority: Minor
>
> I noticed that the FileOutputFormat does not clear the output directories if 
> it writes to local disk. This has the consequence that previous partitions 
> are still contained in the directory if one decreases the DOP between 
> subsequent runs. If one reads the data from this directory, then more 
> partitions will be read in than were actually written. This can lead to a 
> wrong user code behaviour which is hard to debug. I'm aware that in case of a 
> distributed execution the TaskManagers or the Tasks have to be responsible 
> for the cleanup and if multiple Tasks are running on a TaskManager, then the 
> cleanup has to be coordinated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7702) Javadocs link broken

2017-09-27 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-7702:
---
Fix Version/s: 1.4.0

> Javadocs link broken
> 
>
> Key: FLINK-7702
> URL: https://issues.apache.org/jira/browse/FLINK-7702
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Gabor Gevay
>Assignee: Timo Walther
>Priority: Minor
> Fix For: 1.4.0
>
>
> The "Javadocs" link in the left side menu of this page doesn't work:
> https://ci.apache.org/projects/flink/flink-docs-master/
> Note that it works in 1.3:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7702) Javadocs link broken

2017-09-27 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7702:
--

 Summary: Javadocs link broken
 Key: FLINK-7702
 URL: https://issues.apache.org/jira/browse/FLINK-7702
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Gabor Gevay
Priority: Minor


The "Javadocs" link in the left side menu of this page doesn't work:
https://ci.apache.org/projects/flink/flink-docs-master/

Note that it works in 1.3:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7685) CompilerException: "Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly"

2017-09-25 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7685:
--

 Summary: CompilerException: "Bug: Logic for branching plans 
(non-tree plans) has an error, and does not track the re-joining of branches 
correctly"
 Key: FLINK-7685
 URL: https://issues.apache.org/jira/browse/FLINK-7685
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Reporter: Gabor Gevay
Priority: Minor


A Flink program which reads an input DataSet, creates 64 new DataSets from it, 
and writes these to separate files throws the following exception:

{code:java}
Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
Logic for branching plans (non-tree plans) has an error, and does not track the 
re-joining of branches correctly.
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:491)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
at 
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:921)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:86)
{code}

Here is some code that reproduces it:
https://github.com/ggevay/flink/tree/compiler-exception-new

Note that it works with less than 64 DataSets.

Also note that with more than 64 DataSets it throws {{CompilerException: Cannot 
currently handle nodes with more than 64 outputs}}, which is at least a clear 
error message that helps the user find a workaround.
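
For reference, a rough sketch of the kind of job described (hypothetical paths and transformations; the linked branch contains the actual reproducer):
{code}
import org.apache.flink.api.scala._

object ManySinksJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.readTextFile("/tmp/input") // one shared source

    // derive 64 DataSets from the same input and write each to its own file
    for (i <- 0 until 64) {
      input.map(line => s"$i: $line").writeAsText(s"/tmp/output-$i")
    }

    env.execute("many sinks from one source")
  }
}
{code}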



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7680) Add "Performance Tuning" section to docs

2017-09-24 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7680:
--

 Summary: Add "Performance Tuning" section to docs
 Key: FLINK-7680
 URL: https://issues.apache.org/jira/browse/FLINK-7680
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Gabor Gevay
Priority: Minor
 Fix For: 1.4.0


We could have a separate section in the docs about performance tuning (maybe 
separately for batch and streaming jobs).

It could include for example:
- object reuse
- serializer issues
- semantic annotations
- optimizer hints
- sorter code generation (FLINK-5734)
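
For example, for the object reuse and semantic annotation items, the section could show snippets along these lines (a rough sketch, not proposed doc text):
{code}
import org.apache.flink.api.scala._

object TuningSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // object reuse: lets the runtime reuse record instances between UDF calls
    env.getConfig.enableObjectReuse()

    // semantic annotation: tell the optimizer that the first tuple field
    // passes through the map unchanged
    env.fromElements((1L, "a"), (2L, "b"))
      .map(t => (t._1, t._2.toUpperCase))
      .withForwardedFields("_1")
      .print()
  }
}
{code}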

See [~fhueske]'s suggestion here:
https://github.com/apache/flink/pull/3511#discussion_r139917275



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7629) Scala stream aggregations should support nested field expressions

2017-09-17 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-7629:
---
Description: 
In the Scala API, {{KeyedStream.maxBy}} and similar methods currently only work 
with a field name, and not with nested field expressions, such as 
"fieldA.fieldX". (Their documentation says this should work.)

The reason for this is that the string overload of {{KeyedStream.aggregate}} 
uses {{fieldNames2Indices}} and then calls the integer overload. Instead, it 
should create a {{SumAggregator}} or {{ComparableAggregator}} directly, as the 
integer overload does (and as the Java API does). The ctors of 
{{SumAggregator}} or {{ComparableAggregator}} will call 
{{FieldAccessorFactory.getAccessor}}, which will correctly handle a nested 
field expression.

  was:
In the Scala API, {{KeyedStream.maxBy}} and similar methods currently only work 
with a field name, and not with nested field expressions, such as 
"fieldA.fieldX". (This contradicts their documentation.)

The reason for this is that the string overload of {{KeyedStream.aggregate}} 
uses {{fieldNames2Indices}} and then calls the integer overload. Instead, it 
should create a {{SumAggregator}} or {{ComparableAggregator}} directly, as the 
integer overload does (and as the Java API does). The ctors of 
{{SumAggregator}} or {{ComparableAggregator}} will call 
{{FieldAccessorFactory.getAccessor}}, which will correctly handle a nested 
field expression.


> Scala stream aggregations should support nested field expressions
> -
>
> Key: FLINK-7629
> URL: https://issues.apache.org/jira/browse/FLINK-7629
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
> Fix For: 1.4.0
>
>
> In the Scala API, {{KeyedStream.maxBy}} and similar methods currently only 
> work with a field name, and not with nested field expressions, such as 
> "fieldA.fieldX". (Their documentation says this should work.)
> The reason for this is that the string overload of {{KeyedStream.aggregate}} 
> uses {{fieldNames2Indices}} and then calls the integer overload. Instead, it 
> should create a {{SumAggregator}} or {{ComparableAggregator}} directly, as 
> the integer overload does (and as the Java API does). The ctors of 
> {{SumAggregator}} or {{ComparableAggregator}} will call 
> {{FieldAccessorFactory.getAccessor}}, which will correctly handle a nested 
> field expression.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7629) Scala stream aggregations should support nested field expressions

2017-09-15 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7629:
--

 Summary: Scala stream aggregations should support nested field 
expressions
 Key: FLINK-7629
 URL: https://issues.apache.org/jira/browse/FLINK-7629
 Project: Flink
  Issue Type: Bug
  Components: Scala API, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
 Fix For: 1.4.0


In the Scala API, {{KeyedStream.maxBy}} and similar methods currently only work 
with a field name, and not with nested field expressions, such as 
"fieldA.fieldX". (This contradicts their documentation.)

The reason for this is that the string overload of {{KeyedStream.aggregate}} 
uses {{fieldNames2Indices}} and then calls the integer overload. Instead, it 
should create a {{SumAggregator}} or {{ComparableAggregator}} directly, as the 
integer overload does (and as the Java API does). The ctors of 
{{SumAggregator}} or {{ComparableAggregator}} will call 
{{FieldAccessorFactory.getAccessor}}, which will correctly handle a nested 
field expression.
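
To make the expected behaviour concrete, here is a small sketch of the kind of program that the documentation promises should work (the case classes are hypothetical):
{code}
import org.apache.flink.streaming.api.scala._

case class Inner(fieldX: Int)
case class Outer(key: String, fieldA: Inner)

object NestedMaxBy {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(Outer("a", Inner(1)), Outer("a", Inner(3)))
      .keyBy("key")
      .maxBy("fieldA.fieldX") // nested field expression
      .print()
    env.execute("nested maxBy")
  }
}
{code}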



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-4867) Investigate code generation for improving sort performance

2017-09-05 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-4867.

Resolution: Done

I'm closing this, as [~heytitle] did the performance investigation and 
concluded that code generation is worth implementing.

The Jira for actually implementing this is here:
https://issues.apache.org/jira/browse/FLINK-5734

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
> Attachments: Evaluation of Code Generation Approach for improving  
> Flinkā€™s NormalizedKeySorter.pdf
>
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-5734) Implement code generation for NormalizedKeySorter

2017-09-05 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-5734:
--

Assignee: Gabor Gevay

> Implement code generation for NormalizedKeySorter
> -
>
> Key: FLINK-5734
> URL: https://issues.apache.org/jira/browse/FLINK-5734
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Pattarawat Chormai
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
> Fix For: 1.4.0
>
>
> Link to design document : 
> https://docs.google.com/document/d/1anGQhBn9qI0yqe7twVvrDIiym4U4gxalJkZzM4Ar4QM/edit?usp=sharing
> See https://issues.apache.org/jira/browse/FLINK-4867 for performance tests.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5734) Implement code generation for NormalizedKeySorter

2017-09-05 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-5734:
---
Description: 
Link to design document : 
https://docs.google.com/document/d/1anGQhBn9qI0yqe7twVvrDIiym4U4gxalJkZzM4Ar4QM/edit?usp=sharing

See https://issues.apache.org/jira/browse/FLINK-4867 for performance tests.

  was:Link to design document : 
https://docs.google.com/document/d/1anGQhBn9qI0yqe7twVvrDIiym4U4gxalJkZzM4Ar4QM/edit?usp=sharing


> Implement code generation for NormalizedKeySorter
> -
>
> Key: FLINK-5734
> URL: https://issues.apache.org/jira/browse/FLINK-5734
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Pattarawat Chormai
>Priority: Minor
>  Labels: performance
> Fix For: 1.4.0
>
>
> Link to design document : 
> https://docs.google.com/document/d/1anGQhBn9qI0yqe7twVvrDIiym4U4gxalJkZzM4Ar4QM/edit?usp=sharing
> See https://issues.apache.org/jira/browse/FLINK-4867 for performance tests.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5734) Implement code generation for NormalizedKeySorter

2017-09-05 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-5734:
---
Fix Version/s: 1.4.0

> Implement code generation for NormalizedKeySorter
> -
>
> Key: FLINK-5734
> URL: https://issues.apache.org/jira/browse/FLINK-5734
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Pattarawat Chormai
>Priority: Minor
>  Labels: performance
> Fix For: 1.4.0
>
>
> Link to design document : 
> https://docs.google.com/document/d/1anGQhBn9qI0yqe7twVvrDIiym4U4gxalJkZzM4Ar4QM/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2017-04-04 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955166#comment-15955166
 ] 

Gabor Gevay commented on FLINK-2147:


Maybe we could over-partition the sketch into maxParallelism parts (similarly 
to how we have more key-groups than actual partitions).
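
For context, here is a minimal Count-Min sketch, just to make the structure that would be partitioned concrete (plain Scala with simplistic hash functions, not Flink code):
{code}
class CountMinSketch(width: Int, depth: Int) {
  private val table = Array.ofDim[Long](depth, width)
  private val seeds = Array.tabulate(depth)(row => 31 * row + 17)

  private def bucket(row: Int, item: String): Int =
    ((item.hashCode * seeds(row)) & Int.MaxValue) % width

  def add(item: String, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(row, item)) += count

  // never underestimates the true frequency; overestimates with small probability
  def estimate(item: String): Long =
    (0 until depth).map(row => table(row)(bucket(row, item))).min
}
{code}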

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Reopened] (FLINK-2144) Incremental count, average, and variance for windows

2017-04-03 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reopened FLINK-2144:


Reopening to change it back to "Won't fix", instead of "Fixed"

> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-2144) Incremental count, average, and variance for windows

2017-04-03 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-2144.

Resolution: Won't Fix

> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)
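
For illustration, the kind of O(1) per-element maintenance the description refers to looks roughly like this (a plain sketch using the numerically naive variance formula, not actual window code):
{code}
class IncrementalStats {
  private var n = 0L
  private var sum = 0.0
  private var sumOfSquares = 0.0

  def store(x: Double): Unit = { n += 1; sum += x; sumOfSquares += x * x } // O(1)
  def evict(x: Double): Unit = { n -= 1; sum -= x; sumOfSquares -= x * x } // O(1)

  def count: Long = n
  def average: Double = sum / n
  def variance: Double = sumOfSquares / n - average * average // O(1)
}
{code}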



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5888) ForwardedFields annotation is not generating optimised execution plan in example KMeans job

2017-02-22 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878779#comment-15878779
 ] 

Gabor Gevay commented on FLINK-5888:


Just a wild guess: is it possible that the shuffle actually ended up between 
the two maps, and is somehow not visible on the graph? I'm thinking this 
because I don't see how the annotation would make the shuffle disappear (but 
it's possible that I'm misunderstanding something).

> ForwardedFields annotation is not generating optimised execution plan in 
> example KMeans job
> ---
>
> Key: FLINK-5888
> URL: https://issues.apache.org/jira/browse/FLINK-5888
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Examples, Java API
>Affects Versions: 1.1.3
>Reporter: Ziyad Muhammed Mohiyudheen
>
> Flink KMeans java example [1] shows the usage of ForwardedFields function 
> annotation. However, the example job was taking more time than expected even on 
> medium-sized data. By merely removing the function annotation from the 
> example code (without any other change), a better execution plan and run 
> time was obtained. The execution plan shows that no combiner is used and the 
> two Map tasks are not chained when ForwardedFields is enabled. The experiment 
> is documented in [2]
> [1] 
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
> [2] https://drive.google.com/open?id=0B0IlZv0uHBuvVEZ5ZmNpN19jVVU



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-1730) Add a FlinkTools.persist style method to the Data Set.

2017-02-13 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863673#comment-15863673
 ] 

Gabor Gevay commented on FLINK-1730:


Hi [~kateri]

I'm also really looking forward to this feature.

Btw. here is some related discussion on the mailing list:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Questions-re-ExecutionGraph-amp-ResultPartitions-for-interactive-use-a-la-Spark-td4154.html#a4286


> Add a FlinkTools.persist style method to the Data Set.
> --
>
> Key: FLINK-1730
> URL: https://issues.apache.org/jira/browse/FLINK-1730
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Reporter: Stephan Ewen
>Assignee: Evgeny Kincharov
>
> I think this is an operation that will be needed more prominently. Defining a 
> point where one long logical program is broken into different executions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592359#comment-15592359
 ] 

Gabor Gevay commented on FLINK-4867:


Hm, this sounds quite interesting. Could you please share the code?

> As it is radix based and not comparison based, it would require some way to 
> expose partial sort keys instead of a compareTo method

Doesn't the normalized key stuff that we already have solve this part?

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-4867:
--

Assignee: Gabor Gevay

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592367#comment-15592367
 ] 

Gabor Gevay commented on FLINK-4867:


That's nice, thanks!

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4869) Store record pointer after record keys

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4869:
---
Labels: performance  (was: )

> Store record pointer after record keys
> --
>
> Key: FLINK-4869
> URL: https://issues.apache.org/jira/browse/FLINK-4869
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> {{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} 
> separate from the memory segments used for the sort keys. By storing the 
> pointer after the sort keys, the addition of the offset is moved from 
> {{NormalizedKeySorter.compare}}, which is O(n log n), to other methods which 
> are O\(n).
> Will run a performance comparison before submitting a PR to see how significant 
> a performance improvement this would yield.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4860) Sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4860:
---
Component/s: Local Runtime

> Sort performance
> 
>
> Key: FLINK-4860
> URL: https://issues.apache.org/jira/browse/FLINK-4860
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Greg Hogan
>  Labels: performance
>
> A super-task for improvements to Flink's sort performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4860) Sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4860:
---
Labels: performance  (was: )

> Sort performance
> 
>
> Key: FLINK-4860
> URL: https://issues.apache.org/jira/browse/FLINK-4860
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Greg Hogan
>  Labels: performance
>
> A super-task for improvements to Flink's sort performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4868) Insertion sort could avoid the swaps

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4868:
---
Description: 
This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is quite hot code, as it runs every time we 
reach the bottom of the quicksort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)

Note that the threshold for switching to the insertion sort could probably be 
increased after this.

  was:
This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is quite hot code, as it runs every time we 
reach the bottom of the quicksort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)


> Insertion sort could avoid the swaps
> 
>
> Key: FLINK-4868
> URL: https://issues.apache.org/jira/browse/FLINK-4868
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This is about the fallback to insertion sort at the beginning of 
> {{QuickSort.sortInternal}}. It is quite hot code, as it runs every time we 
> reach the bottom of the quicksort recursion tree.
> The inner loop does a series of swaps on adjacent elements for moving a block 
> of several elements one slot to the right and inserting the ith element at 
> the hole. However, it would be faster to first copy the ith element to a temp 
> location, and then move the block of elements to the right without swaps, and 
> then copy the original ith element to the hole.
> Moving the block of elements without swaps could be achieved by calling 
> {{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
> calls in {{MemorySegment.swap}} currently being done).
> (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
> memcpy, so I'm not sure if we can do the entire block of elements with maybe 
> even one {{UNSAFE.copyMemory}}.)
> Note that the threshold for switching to the insertion sort could probably be 
> increased after this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4868) Insertion sort could avoid the swaps

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4868:
--

 Summary: Insertion sort could avoid the swaps
 Key: FLINK-4868
 URL: https://issues.apache.org/jira/browse/FLINK-4868
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is quite hot code, as it runs every time we 
reach the bottom of the quicksort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)
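
A plain-array illustration of the two variants (just a sketch, not the actual {{MemorySegment}}-based code):
{code}
object InsertionSketch {
  // current approach: bubble the new element down with adjacent swaps
  def insertWithSwaps(a: Array[Long], i: Int): Unit = {
    var j = i
    while (j > 0 && a(j - 1) > a(j)) {
      val tmp = a(j); a(j) = a(j - 1); a(j - 1) = tmp
      j -= 1
    }
  }

  // proposed approach: copy the new element aside, shift the block to the
  // right with single copies, then drop the element into the hole
  def insertWithBlockMove(a: Array[Long], i: Int): Unit = {
    val elem = a(i)
    var j = i
    while (j > 0 && a(j - 1) > elem) {
      a(j) = a(j - 1) // one copy instead of a three-step swap
      j -= 1
    }
    a(j) = elem
  }
}
{code}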



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4867:
--

 Summary: Investigate code generation for improving sort performance
 Key: FLINK-4867
 URL: https://issues.apache.org/jira/browse/FLINK-4867
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This issue is for investigating whether code generation could speed up sorting. 
We should make some performance measurements on hand-written code that is 
similar to what we could generate, to see whether investing more time into this 
is worth it. If we find that it is worth it, we can open a second Jira for the 
actual implementation of the code generation.

I think we could generate one class at places where we currently instantiate 
{{QuickSort}}. This generated class would include the functionality of 
{{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
{{MemorySegment.compare}}, and {{MemorySegment.swap}}.

Btw. I'm planning to give this as a student project at a TU Berlin course in 
the next few months.

Some concrete ideas about how a generated sorter could be faster than the 
current sorting code:
* {{MemorySegment.compare}} could be specialized for
** Length: for small records, the loop could be unrolled
** Endianness (currently it is optimized for big endian; and in the little 
endian case (e.g. x86) it does a Long.reverseBytes for each long read)
* {{MemorySegment.swapBytes}}
** In case of small records, using three {{UNSAFE.copyMemory}} is probably not 
as efficient as a specialized swap, because
*** We could use total loop unrolling in generated code (because we know the 
exact record size)
*** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
*** We will only need 2/3 the memory bandwidth, because the temporary storage 
could be a register if we swap one byte (or one {{long}}) at a time
** several checks might be eliminated
* Better inlining behaviour could be achieved 
** Virtual function calls to the methods of {{InMemorySorter}} could be 
eliminated. (Note, that these are problematic to devirtualize by the JVM if 
there are different derived classes used in a single Flink job (see \[8,7\]).)
** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
excessive checks make it too large
** {{MemorySegment.compare}} is probably also not inlined currently, because 
those two while loops are too large

\[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
long, Object, long, long)
\[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
\[8\] 
http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
\[9\] 
http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409
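
To illustrate the compare specialization above, here is a minimal sketch for a 
hypothetical fixed 16-byte key with the loop fully unrolled (the key length and 
the helper are only assumptions for illustration; the generated code would read 
from MemorySegments instead of a plain byte array):
{code}
// Sketch only: compare specialized for records with a fixed 16-byte key,
// unrolled into two unsigned long comparisons. On a little-endian machine a
// generated version could read the longs directly (e.g. via Unsafe) and apply
// Long.reverseBytes once per read; this sketch uses an explicit big-endian read
// to stay platform-independent.
static int compareFixed16(byte[] buffer, int offA, int offB) {
    long a0 = readLongBigEndian(buffer, offA);
    long b0 = readLongBigEndian(buffer, offB);
    if (a0 != b0) {
        // unsigned comparison of big-endian longs == lexicographic byte order
        return Long.compareUnsigned(a0, b0);
    }
    long a1 = readLongBigEndian(buffer, offA + 8);
    long b1 = readLongBigEndian(buffer, offB + 8);
    return Long.compareUnsigned(a1, b1);
}

static long readLongBigEndian(byte[] buffer, int offset) {
    long v = 0;
    for (int i = 0; i < 8; i++) {
        v = (v << 8) | (buffer[offset + i] & 0xFFL);
    }
    return v;
}
{code}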



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-18 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587764#comment-15587764
 ] 

Gabor Gevay edited comment on FLINK-3999 at 10/19/16 5:47 AM:
--

Won't Fix; See comment here:
https://github.com/apache/flink/pull/2642#issuecomment-254628846


was (Author: ggevay):
See comment here:
https://github.com/apache/flink/pull/2642#issuecomment-254628846

> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, then it is not running anymore, but 
> the {{running}}  flag will still be true, since the {{running}} flag is only 
> set when cancelling.
> It should be renamed, and the value inverted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-18 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-3999.

Resolution: Won't Fix

See comment here:
https://github.com/apache/flink/pull/2642#issuecomment-254628846

> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, then it is not running anymore, but 
> the {{running}}  flag will still be true, since the {{running}} flag is only 
> set when cancelling.
> It should be renamed, and the value inverted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-11 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15482191#comment-15482191
 ] 

Gabor Gevay commented on FLINK-3322:


[~ram_krish] You are right, making the sorters resettable will be more 
involved. I would say let's just concentrate for now on making the drivers 
resettable, and then later move on to {{UnilateralSortMerger}} and 
{{CombiningUnilateralSortMerger}}.

Even in the drivers there will be some complications, for example with the 
joins, as the memory is allocated in {{HashJoinIteratorBase.getHashJoin}} and 
not in the drivers themselves. So {{JoinTaskIterator}} may also need to be made 
resettable.

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-08 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474575#comment-15474575
 ] 

Gabor Gevay commented on FLINK-3322:


I've just realized that what I wrote above doesn't cover the resetting of the 
local strategies. For that, we should maybe add a {{ResettableInputProvider}} 
interface that extends {{CloseableInputProvider}} (or even just add the reset 
method directly to {{CloseableInputProvider}}), make the sorters implement it, 
and call this new {{reset}} method in {{resetAllInputs}} (instead of calling 
{{close}} and then recreating them).

(But this can go to a separate pull request later.)
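
A rough sketch of what such an interface could look like (the name and the 
exact method set are only assumptions for illustration, not the actual API):
{code}
// Sketch only: hypothetical interface; CloseableInputProvider already exists in Flink.
public interface ResettableInputProvider<E> extends CloseableInputProvider<E> {

    /**
     * Puts the provider (e.g. a sorter) back into its initial state while keeping
     * the memory it has already acquired, so that the next iteration step can
     * reuse it instead of closing it and recreating it.
     */
    void reset() throws Exception;
}
{code}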

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-08 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474531#comment-15474531
 ] 

Gabor Gevay commented on FLINK-3322:


I had an offline chat with [~StephanEwen], and an alternative design came up, 
which we should also consider:

There is the {{ResettableDriver}} interface, which is implemented by those 
drivers that need to retain some state between iteration steps. The way 
{{AbstractIterativeTask}} uses this interface is that it checks whether the 
driver is an instance of this interface, and then doesn't destroy and recreate 
the driver between iteration steps, but instead just calls {{reset}} on it. We 
could make every driver implement this interface, and in their {{reset}} method 
they would just hold on to the memory that they already have.

When this is done, it would also allow some simplification by eliminating the 
special-case handling that distinguishes between resettable and non-resettable 
operators in lots of different places.

Since touching every driver probably can't be avoided anyway, this looks like 
the cleanest solution.
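
As a rough illustration of the idea (a sketch with assumed names, not the 
actual {{ResettableDriver}} interface or any real driver), a driver's {{reset}} 
would hold on to its memory roughly like this:
{code}
import org.apache.flink.runtime.operators.sort.InMemorySorter;

// Sketch only: a driver that keeps its already-acquired sorter (and thus its
// memory segments) across iteration steps. Names are assumptions for
// illustration; the real ResettableDriver interface and the drivers' internals
// may differ.
class SomeResettableDriver {

    private InMemorySorter<?> sorter;   // backed by managed memory, acquired once

    public void reset() throws Exception {
        // keep the allocated memory segments, only clear the logical contents
        sorter.reset();
    }

    public void teardown() throws Exception {
        // memory is released only once, when the iteration as a whole finishes
        sorter.dispose();
    }
}
{code}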

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4578) AggregateOperator incorrectly sets ForwardedField with nested composite types

2016-09-04 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463363#comment-15463363
 ] 

Gabor Gevay commented on FLINK-4578:


A hacky solution might be to create some dummy {{Keys}} object from the 
aggregate field, and call {{computeLogicalKeyPositions}} on it, so that we know 
the flat position of the aggregated field as well.
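
A minimal sketch of that idea (it assumes the {{ExpressionKeys}} constructor 
that takes field positions and a {{TypeInformation}}; this is an illustration, 
not a tested fix):
{code}
import org.apache.flink.api.common.operators.Keys;            // package assumed
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Sketch only: wrap the aggregated field in a dummy Keys object to obtain its
// flat position(s), so they can be compared against the grouping's
// logicalKeyPositions on equal terms.
class FlatPositionCheck {

    static <T> boolean keyFieldUsedInAgg(
            int aggField, TypeInformation<T> inputType, int[] logicalKeyPositions) {

        Keys.ExpressionKeys<T> aggKey =
                new Keys.ExpressionKeys<>(new int[] { aggField }, inputType);

        for (int flatAggPos : aggKey.computeLogicalKeyPositions()) {
            for (int keyPos : logicalKeyPositions) {
                if (flatAggPos == keyPos) {
                    return true;
                }
            }
        }
        return false;
    }
}
{code}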

> AggregateOperator incorrectly sets ForwardedField with nested composite types
> -
>
> Key: FLINK-4578
> URL: https://issues.apache.org/jira/browse/FLINK-4578
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Reporter: Gabor Gevay
>
> When an aggregation is called on a grouped DataSet, 
> {{AggregateOperator.translateToDataFlow}} tries to determine whether the 
> field that is being aggregated is the same field that the grouping is based 
> on. If this is not the case, then it adds the ForwardedField property for the 
> key field.
> However, the mechanism that makes this decision breaks when there are nested 
> composite types involved, because it gets the key positions with 
> {{getKeys().computeLogicalKeyPositions()}}, which returns the _flat_ 
> positions, whereas the position of the field to aggregate is counted only on 
> the outer type.
> Example code: https://github.com/ggevay/flink/tree/agg-bad-forwarded-fields
> Here, I have changed the WordCount example to have the type 
> {{Tuple3, String, Integer>}}, and do {{.groupBy(1).sum(2)}} 
> (which groups by the String field and sums the Integer field). If you set a 
> breakpoint into {{AggregateOperator.translateToDataFlow}}, you can see that 
> {{logicalKeyPositions}} contains 2, and {{fields}} also contains 2, which 
> causes {{keyFieldUsedInAgg}} to be erroneously set to true. The problem is 
> caused by the Tuple2 being counted as 2 fields in {{logicalKeyPositions}}, 
> but only 1 field in {{fields}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4575) DataSet aggregate methods should support POJOs

2016-09-04 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463360#comment-15463360
 ] 

Gabor Gevay commented on FLINK-4575:


This is a bit harder than I thought, because of adding the ForwardedFields 
property when the aggregation field is not the same as the key field. 
Note that the old logic for determining this has a bug: 
https://issues.apache.org/jira/browse/FLINK-4578

> DataSet aggregate methods should support POJOs
> --
>
> Key: FLINK-4575
> URL: https://issues.apache.org/jira/browse/FLINK-4575
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: starter
>
> The aggregate methods of DataSets (aggregate, sum, min, max) currently only 
> support Tuples, with the fields specified by indices. With 
> https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for 
> POJOs and field expressions would be easy: {{AggregateOperator}} would create 
> {{FieldAccessors}} instead of just storing field positions, and 
> {{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} 
> instead of the Tuple field access methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4575) DataSet aggregate methods should support POJOs

2016-09-04 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4575:
---
Assignee: (was: Gabor Gevay)

> DataSet aggregate methods should support POJOs
> --
>
> Key: FLINK-4575
> URL: https://issues.apache.org/jira/browse/FLINK-4575
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: starter
>
> The aggregate methods of DataSets (aggregate, sum, min, max) currently only 
> support Tuples, with the fields specified by indices. With 
> https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for 
> POJOs and field expressions would be easy: {{AggregateOperator}} would create 
> {{FieldAccessors}} instead of just storing field positions, and 
> {{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} 
> instead of the Tuple field access methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4578) AggregateOperator incorrectly sets ForwardedField with nested composite types

2016-09-04 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4578:
--

 Summary: AggregateOperator incorrectly sets ForwardedField with 
nested composite types
 Key: FLINK-4578
 URL: https://issues.apache.org/jira/browse/FLINK-4578
 Project: Flink
  Issue Type: Bug
  Components: DataSet API
Reporter: Gabor Gevay


When an aggregation is called on a grouped DataSet, 
{{AggregateOperator.translateToDataFlow}} tries to determine whether the field 
that is being aggregated is the same field that the grouping is based on. If 
this is not the case, then it adds the ForwardedField property for the key 
field.

However, the mechanism that makes this decision breaks when there are nested 
composite types involved, because it gets the key positions with 
{{getKeys().computeLogicalKeyPositions()}}, which returns the _flat_ positions, 
whereas the position of the field to aggregate is counted only on the outer 
type.

Example code: https://github.com/ggevay/flink/tree/agg-bad-forwarded-fields
Here, I have changed the WordCount example to have the type 
{{Tuple3, String, Integer>}}, and do {{.groupBy(1).sum(2)}} 
(which groups by the String field and sums the Integer field). If you set a 
breakpoint into {{AggregateOperator.translateToDataFlow}}, you can see that 
{{logicalKeyPositions}} contains 2, and {{fields}} also contains 2, which 
causes {{keyFieldUsedInAgg}} to be erroneously set to true. The problem is 
caused by the Tuple2 being counted as 2 fields in {{logicalKeyPositions}}, but 
only 1 field in {{fields}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4575) DataSet aggregate methods should support POJOs

2016-09-04 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-4575:
--

Assignee: Gabor Gevay

> DataSet aggregate methods should support POJOs
> --
>
> Key: FLINK-4575
> URL: https://issues.apache.org/jira/browse/FLINK-4575
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: starter
>
> The aggregate methods of DataSets (aggregate, sum, min, max) currently only 
> support Tuples, with the fields specified by indices. With 
> https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for 
> POJOs and field expressions would be easy: {{AggregateOperator}} would create 
> {{FieldAccessors}} instead of just storing field positions, and 
> {{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} 
> instead of the Tuple field access methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4575) DataSet aggregate methods should support POJOs

2016-09-04 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4575:
--

 Summary: DataSet aggregate methods should support POJOs
 Key: FLINK-4575
 URL: https://issues.apache.org/jira/browse/FLINK-4575
 Project: Flink
  Issue Type: Improvement
  Components: DataSet API
Reporter: Gabor Gevay
Priority: Minor


The aggregate methods of DataSets (aggregate, sum, min, max) currently only 
support Tuples, with the fields specified by indices. With 
https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for 
POJOs and field expressions would be easy: {{AggregateOperator}} would create 
{{FieldAccessors}} instead of just storing field positions, and 
{{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} instead 
of the Tuple field access methods.
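
As a rough sketch of how the aggregating UDF could then look (the 
{{FieldAccessor}} get/set signatures and package are assumed from the 
DataStream-side work; this is not the final implementation):
{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.util.typeutils.FieldAccessor;  // package assumed

// Sketch only: a summing function that reads and writes the aggregated field
// through a FieldAccessor instead of Tuple.getField/setField, so that POJO
// field expressions work as well as tuple positions.
class SumFunctionSketch<T> implements ReduceFunction<T> {

    private final FieldAccessor<T, Double> fieldAccessor;

    SumFunctionSketch(FieldAccessor<T, Double> fieldAccessor) {
        this.fieldAccessor = fieldAccessor;
    }

    @Override
    public T reduce(T a, T b) throws Exception {
        double sum = fieldAccessor.get(a) + fieldAccessor.get(b);
        return fieldAccessor.set(a, sum);   // set(...) returns the updated record
    }
}
{code}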



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2016-08-31 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2662:
---
Priority: Critical  (was: Major)

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-26 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438599#comment-15438599
 ] 

Gabor Gevay commented on FLINK-3322:


Yes, I think Stephan meant that you would be mainly working on this, and I 
would help by shepherding the pull request, discussing in the design doc, etc.

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-25 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437410#comment-15437410
 ] 

Gabor Gevay commented on FLINK-3322:


Yes, I volunteer to help :)

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-25 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436605#comment-15436605
 ] 

Gabor Gevay edited comment on FLINK-3322 at 8/25/16 10:05 AM:
--

I think it would be very good to finally solve this issue! Iteration is 
supposed to be a strong point of Flink, but this issue can seriously hurt the 
performance of some iterative jobs, where there are many iteration steps, but 
little work in a single step (which can easily happen towards the end of delta 
iterations).


was (Author: ggevay):
I think it would be very good to finally solve this issue! Iteration is 
supposed to be a strong point of Flink, but this issue can seriously hurt the 
performance of some iterative jobs, where there are many iterations, but little 
work in each iteration (which can easily happen towards the end of delta 
iterations).

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-25 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436605#comment-15436605
 ] 

Gabor Gevay commented on FLINK-3322:


I think it would be very good to finally solve this issue! Iteration is 
supposed to be a strong point of Flink, but this issue can seriously hurt the 
performance of some iterative jobs, where there are many iterations, but little 
work in each iteration (which can easily happen towards the end of delta 
iterations).

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-08-24 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3999:
---
Assignee: Neelesh Srinivas Salian

> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, then it is not running anymore, but 
> the {{running}}  flag will still be true, since the {{running}} flag is only 
> set when cancelling.
> It should be renamed, and the value inverted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-08-24 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435446#comment-15435446
 ] 

Gabor Gevay commented on FLINK-3999:


Yes, thank you for working on it!

> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, then it is not running anymore, but 
> the {{running}}  flag will still be true, since the {{running}} flag is only 
> set when cancelling.
> It should be renamed, and the value inverted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-08-21 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3722:
---
Assignee: (was: Gabor Gevay)

> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
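
A minimal sketch of the incremental index bookkeeping described above (plain 
Java, independent of the actual sorter code; the names are illustrative):
{code}
// Sketch only: replaces "segment = index / recordsPerSegment" and
// "record = index % recordsPerSegment" with incremental updates when the
// index is stepped by one.
final class SortIndex {

    private final int recordsPerSegment;
    private int segment;   // quotient
    private int record;    // modulo (record within the current segment)

    SortIndex(int recordsPerSegment) {
        this.recordsPerSegment = recordsPerSegment;
    }

    void increment() {
        record++;
        if (record == recordsPerSegment) {  // wrap around instead of dividing
            record = 0;
            segment++;
        }
    }

    void decrement() {
        if (record == 0) {
            record = recordsPerSegment - 1;
            segment--;
        } else {
            record--;
        }
    }

    int segment()         { return segment; }
    int recordInSegment() { return record; }
}
{code}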



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-08-21 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-3291.

Resolution: Fixed

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.
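
A rough sketch of the shape of that fix (only an approximation of the 
MergeIterator internals, not the actual code):
{code}
import java.io.IOException;
import org.apache.flink.util.MutableObjectIterator;

// Sketch only: HeadStream owns its reuse object, so the caller's reuse object
// is never stored in 'head'. This approximates the idea; the actual
// MergeIterator internals differ in the details.
final class HeadStreamSketch<E> {

    private final MutableObjectIterator<E> iterator;
    private E reuse;   // owned by this stream, never taken from the caller
    private E head;

    HeadStreamSketch(MutableObjectIterator<E> iterator, E initialReuse) throws IOException {
        this.iterator = iterator;
        this.reuse = initialReuse;
        nextHead();
    }

    // No 'reuse' parameter anymore: we always read into our own object.
    boolean nextHead() throws IOException {
        this.head = this.iterator.next(this.reuse);
        return this.head != null;
    }

    E getHead() {
        return this.head;
    }
}
{code}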



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-21 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3322:
---
Assignee: (was: Gabor Horvath)

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-21 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3322:
---
Component/s: (was: Distributed Coordination)
 Local Runtime

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Horvath
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3394) Clear up the contract of MutableObjectIterator.next(reuse)

2016-08-21 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3394:
---
Component/s: (was: Distributed Coordination)
 Local Runtime

> Clear up the contract of MutableObjectIterator.next(reuse)
> --
>
> Key: FLINK-3394
> URL: https://issues.apache.org/jira/browse/FLINK-3394
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> {{MutableObjectIterator.next(reuse)}} has the following contract (according 
> to [~StephanEwen]'s comment \[1\]):
> 1. The caller may not hold onto {{reuse}} any more
> 2. The iterator implementor may not hold onto the returned object any more.
> This should be documented in its javadoc (with "WARNING" so that people don't 
> overlook it).
> Additionally, since this was a "secret contract" up to now, all the 270 
> usages of {{MutableObjectIterator.next(reuse)}} should be checked for 
> violations. A few that are suspicious at first glance, are in 
> {{CrossDriver}}, {{UnionWithTempOperator}}, 
> {{MutableHashTable.ProbeIterator.next}}, 
> {{ReusingBuildFirstHashJoinIterator.callWithNextKey}}. (The violating calls 
> in the reduce drivers are being fixed by 
> https://github.com/apache/flink/pull/1626 )
> \[1\] 
> https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15144654



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3279) Optionally disable DistinctOperator combiner

2016-07-15 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15379599#comment-15379599
 ] 

Gabor Gevay edited comment on FLINK-3279 at 7/15/16 4:00 PM:
-

I think no. And a Jira is also needed for the sum, max, etc. aggregations. 
(Maybe these two things can be in one Jira.)


was (Author: ggevay):
https://issues.apache.org/jira/browse/FLINK-3479?

> Optionally disable DistinctOperator combiner
> 
>
> Key: FLINK-3279
> URL: https://issues.apache.org/jira/browse/FLINK-3279
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Calling {{DataSet.distinct()}} executes {{DistinctOperator.DistinctFunction}} 
> which is a combinable {{RichGroupReduceFunction}}. Sometimes we know that 
> there will be few duplicate records and disabling the combine would improve 
> performance.
> I propose adding {{public DistinctOperator setCombinable(boolean 
> combinable)}} to {{DistinctOperator}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3279) Optionally disable DistinctOperator combiner

2016-07-15 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15379599#comment-15379599
 ] 

Gabor Gevay commented on FLINK-3279:


https://issues.apache.org/jira/browse/FLINK-3479?

> Optionally disable DistinctOperator combiner
> 
>
> Key: FLINK-3279
> URL: https://issues.apache.org/jira/browse/FLINK-3279
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Calling {{DataSet.distinct()}} executes {{DistinctOperator.DistinctFunction}} 
> which is a combinable {{RichGroupReduceFunction}}. Sometimes we know that 
> there will be few duplicate records and disabling the combine would improve 
> performance.
> I propose adding {{public DistinctOperator setCombinable(boolean 
> combinable)}} to {{DistinctOperator}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4029) Multi-field "sum" function just like "keyBy"

2016-07-08 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367631#comment-15367631
 ] 

Gabor Gevay commented on FLINK-4029:


You might want to reuse the FieldAccessor infrastructure that is in this PR:
https://github.com/apache/flink/pull/2094

> Multi-field "sum" function just like "keyBy"
> 
>
> Key: FLINK-4029
> URL: https://issues.apache.org/jira/browse/FLINK-4029
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Rami
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> I can use keyBy as follows:
> stream.keyBy("pojo.field1", "pojo.field2", ...)
> Would make sense that I can use sum for example, to do its job for more than 
> one field:
> stream.sum("pojo.field1", "pojo.field2", ...)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1730) Add a FlinkTools.persist style method to the Data Set.

2016-06-03 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-1730:
---
Priority: Major  (was: Minor)

> Add a FlinkTools.persist style method to the Data Set.
> --
>
> Key: FLINK-1730
> URL: https://issues.apache.org/jira/browse/FLINK-1730
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stephan Ewen
>
> I think this is an operation that will be needed more prominently. Defining a 
> point where one long logical program is broken into different executions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-05-31 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3999:
--

 Summary: Rename the `running` flag in the drivers to `canceled`
 Key: FLINK-3999
 URL: https://issues.apache.org/jira/browse/FLINK-3999
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Trivial


The name of the {{running}} flag in the drivers doesn't reflect its usage: when 
the operator just stops normally, then it is not running anymore, but the 
{{running}}  flag will still be true, since the {{running}} flag is only set 
when cancelling.

It should be renamed, and the value inverted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3291) Object reuse bug in MergeIterator.HeadStream.nextHead

2016-05-27 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303994#comment-15303994
 ] 

Gabor Gevay commented on FLINK-3291:


This bug occurs when object reuse is enabled. Object reuse is disabled by 
default, and can be enabled by calling ExecutionConfig.enableObjectReuse. I 
don't know Beam, but I would guess that this option is not enabled there by 
default either.

> Object reuse bug in MergeIterator.HeadStream.nextHead
> -
>
> Key: FLINK-3291
> URL: https://issues.apache.org/jira/browse/FLINK-3291
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Critical
>
> MergeIterator.HeadStream.nextHead saves a reference into `this.head` of the 
> `reuse` object that it got as an argument. This object might be modified 
> later by the caller.
> This actually happens when ReduceDriver.run calls input.next (which will 
> actually be MergeIterator.next(E reuse)) in the inner while loop of the 
> objectReuseEnabled branch, and that calls top.nextHead with the reference 
> that it got from ReduceDriver, which erroneously saves the reference, and 
> then ReduceDriver later uses that same object for doing the reduce.
> Another way in which this fails is when MergeIterator.next(E reuse) gives 
> `reuse` to different `top`s in different calls, and then the heads end up 
> being the same object.
> You can observe the latter situation in action by running ReducePerformance 
> here:
> https://github.com/ggevay/flink/tree/merge-iterator-object-reuse-bug
> Set memory to -Xmx200m (so that the MergeIterator actually has merging to 
> do), put a breakpoint at the beginning of MergeIterator.next(reuse), and then 
> watch `reuse`, and the heads of the first two elements of `this.heap` in the 
> debugger. They will get to be the same object after hitting continue about 6 
> times.
> You can also look at the count that is printed at the end, which shouldn't be 
> larger than the key range. Also, if you look into the output file 
> /tmp/xxxobjectreusebug, for example the key 77 appears twice.
> The good news is that I think I can see an easy fix that doesn't affect 
> performance: MergeIterator.HeadStream could have a reuse object of its own as 
> a member, and give that to iterator.next in nextHead(E reuse). And then we 
> wouldn't need the overload of nextHead that has the reuse parameter, and 
> MergeIterator.next(E reuse) could just call its other overload.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-05-22 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295698#comment-15295698
 ] 

Gabor Gevay commented on FLINK-3702:


This turned out to be more complicated than I thought, but I'm 80% done. I'll 
try to wrap it up in the next few days.

I'm refactoring FieldAccessor, so that it will be created from a method of the 
TypeInfos, instead of the static method FieldAccessor.create. This makes it 
possible to process the outermost part of a field expression and then recurse 
into the specified field's TypeInfo with the rest of the field expression.
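
As a toy illustration of that recursion, here is a sketch that uses plain 
reflection instead of the actual TypeInformation/FieldAccessor machinery (so it 
is only an analogy of the real approach):
{code}
import java.lang.reflect.Field;

// Sketch only: illustrates the recursive idea behind nested field expressions
// ("outer.inner.count"): handle the outermost part, then recurse with the rest.
// The real FieldAccessors are created from the TypeInformation of each level
// instead of using reflection.
class NestedFieldReader {

    static Object get(Object record, String fieldExpression) throws Exception {
        int dot = fieldExpression.indexOf('.');
        String head = dot < 0 ? fieldExpression : fieldExpression.substring(0, dot);

        Field f = record.getClass().getDeclaredField(head);
        f.setAccessible(true);
        Object value = f.get(record);

        if (dot < 0) {
            return value;                                        // base case: simple field
        }
        return get(value, fieldExpression.substring(dot + 1));   // recurse into the nested type
    }
}
{code}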

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-05-22 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-3702:
--

Assignee: Gabor Gevay

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-05-22 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15295621#comment-15295621
 ] 

Gabor Gevay commented on FLINK-3702:


I'll try to quickly do it now.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2144) Incremental count, average, and variance for windows

2016-05-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15293629#comment-15293629
 ] 

Gabor Gevay commented on FLINK-2144:


But why couldn't we provide FoldFunctions for these in the API?
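
For illustration, a minimal sketch of maintaining these three statistics with 
O(1) work per stored and per evicted element (variance via a sum of squares; a 
production version would probably prefer a numerically more stable 
formulation):
{code}
// Sketch only: incremental count / average / variance for a window, with O(1)
// store and O(1) evict, by keeping count, sum, and sum of squares.
class WindowStats {

    private long count;
    private double sum;
    private double sumOfSquares;

    void store(double x) {
        count++;
        sum += x;
        sumOfSquares += x * x;
    }

    void evict(double x) {
        count--;
        sum -= x;
        sumOfSquares -= x * x;
    }

    long count()     { return count; }
    double average() { return sum / count; }

    double variance() {
        double mean = sum / count;
        return sumOfSquares / count - mean * mean;   // population variance
    }
}
{code}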

> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-19 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291006#comment-15291006
 ] 

Gabor Gevay commented on FLINK-2147:


> From a first look, something like StreamGroupedFold would be enough right?

Sorry, I'm not sure. I suggest you ask on the mailing list, and then probably 
someone who knows streaming better than me will respond. Unfortunately I don't 
have enough time now to delve deep into this.

By the way, maybe you could start with this Jira: 
https://issues.apache.org/jira/browse/FLINK-2144
There are some similarities to this one, but it is more straightforward to 
implement.

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.
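
For illustration, here is a minimal single-node sketch of the Count-Min 
structure described above (the hashing and sizing are simplified; the paper 
gives the proper choices):
{code}
import java.util.Random;

// Sketch only: a minimal Count-Min sketch. Width and depth control the error
// bound and failure probability, as described in the Cormode et al. paper.
class CountMinSketch {

    private final long[][] counts;
    private final int[] hashA;      // per-row hash coefficients
    private final int width;
    private final int depth;

    CountMinSketch(int width, int depth, long seed) {
        this.width = width;
        this.depth = depth;
        this.counts = new long[depth][width];
        this.hashA = new int[depth];
        Random rnd = new Random(seed);
        for (int i = 0; i < depth; i++) {
            hashA[i] = rnd.nextInt() | 1;   // odd multiplier for a simple multiplicative hash
        }
    }

    private int bucket(int row, long item) {
        long h = item * hashA[row];
        return (int) ((h ^ (h >>> 32)) & 0x7FFFFFFF) % width;
    }

    void add(long item) {
        for (int i = 0; i < depth; i++) {
            counts[i][bucket(i, item)]++;
        }
    }

    long estimateFrequency(long item) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < depth; i++) {
            min = Math.min(min, counts[i][bucket(i, item)]);
        }
        return min;
    }
}
{code}
Two sketches built with the same width, depth, and hash coefficients can be 
merged by element-wise addition of their count arrays, which is what makes the 
distributed implementation mentioned in the paper straightforward.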



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284479#comment-15284479
 ] 

Gabor Gevay commented on FLINK-2147:


Yes. We should probably look in the direction of FoldFunction, since Flink does 
preaggregation for these, if I'm not mistaken.

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2144) Incremental count, average, and variance for windows

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2144:
---
Summary: Incremental count, average, and variance for windows  (was: 
Implement incremental count, average, and variance for windows)

> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2142) GSoC project: Exact and Approximate Statistics for Data Streams and Windows

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284474#comment-15284474
 ] 

Gabor Gevay commented on FLINK-2142:


(I've also broken off FLINK-2144.)

> GSoC project: Exact and Approximate Statistics for Data Streams and Windows
> ---
>
> Key: FLINK-2142
> URL: https://issues.apache.org/jira/browse/FLINK-2142
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: gsoc2015, statistics, streaming
>
> The goal of this project is to implement basic statistics of data streams and 
> windows (like average, median, variance, correlation, etc.) in a 
> computationally efficient manner. This involves designing custom PreReducers.
> The exact calculation of some statistics (eg. frequencies, or the number of 
> distinct elements) would require memory proportional to the number of 
> elements in the input (the window or the entire stream). However, there are 
> efficient algorithms and data structures using less memory for calculating 
> the same statistics only approximately, with user-specified error bounds.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2144) Implement incremental count, average, and variance for windows

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2144:
---
Summary: Implement incremental count, average, and variance for windows  
(was: Implement count, average, and variance for windows)

> Implement incremental count, average, and variance for windows
> --
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2144) Implement incremental count, average, and variance for windows

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2144:
---
Assignee: (was: Gabor Gevay)

> Implement incremental count, average, and variance for windows
> --
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2144) Implement count, average, and variance for windows

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2144:
---
Issue Type: New Feature  (was: Sub-task)
Parent: (was: FLINK-2142)

> Implement count, average, and variance for windows
> --
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2144) Implement count, average, and variance for windows

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284473#comment-15284473
 ] 

Gabor Gevay commented on FLINK-2144:


Originally, I wanted to make this calculation incremental both on new elements 
and on evictions. With the new windowing API, the eviction part lost its 
importance. However, making it incremental for new elements is still valid, as 
this would allow making these calculations without buffering up the window. I 
think this essentially means that these should be implemented in FoldFunctions, 
which should be automatically pre-aggregated by Flink.
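
(For illustration only: a minimal, non-Flink sketch of the kind of O(1)-per-element accumulator such a fold could maintain. The class and method names are made up, and a real implementation might prefer Welford's algorithm for better numerical stability.)

{code:java}
// Illustrative accumulator for incremental count / average / variance.
// Each new element is folded in with O(1) work and O(1) state, so the
// window does not need to be buffered.
public class StatsAccumulator {

    public long count;
    public double sum;
    public double sumOfSquares;

    // The fold step: incorporate one new element.
    public StatsAccumulator add(double x) {
        count++;
        sum += x;
        sumOfSquares += x * x;
        return this;
    }

    public double average() {
        return count == 0 ? Double.NaN : sum / count;
    }

    // Population variance; returns NaN for an empty window.
    public double variance() {
        if (count == 0) {
            return Double.NaN;
        }
        double avg = average();
        return sumOfSquares / count - avg * avg;
    }
}
{code}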

> Implement count, average, and variance for windows
> --
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284468#comment-15284468
 ] 

Gabor Gevay commented on FLINK-2147:


(Note that the out-of-order problem that I mentioned for the incremental median 
stuff was about the situation where I wanted to re-use some part of the 
calculations across different windows (hence the word "incremental"). This is 
probably not necessary here; we can just work on each window separately.)

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284467#comment-15284467
 ] 

Gabor Gevay commented on FLINK-2147:


In my opinion, the semantics would be to calculate the statistic for each 
window separately. When to emit is handled by the triggers (as with other 
windowing calculations in Flink). (Note that the windows can be quite large, 
like weekly or monthly.)

I think that having a statistic over the entire stream is rarely what the user 
actually wants. Flink programs are designed to run indefinitely, and the 
starting point of a stream is just when the user happened to start the Flink 
program, which might have no real semantic meaning if the Flink program is 
analyzing some external system.

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284437#comment-15284437
 ] 

Gabor Gevay commented on FLINK-2147:


I'm actually now closing the incremental median and similar stuff, as it was 
based on the assumption of the old (pre-0.10) windowing API that events come in 
ordered by time, so it doesn't fit into the new API. (See 
https://github.com/apache/flink/pull/684#issuecomment-195402038)

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2142) GSoC project: Exact and Approximate Statistics for Data Streams and Windows

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284435#comment-15284435
 ] 

Gabor Gevay commented on FLINK-2142:


This proposal was based on the old (pre-0.10) windowing API. I'm now taking it 
apart, by converting sub-tasks to stand-alone issues (FLINK-2148, FLINK-2147) 
and/or modifying/closing those sub-tasks that don't make sense in the current 
streaming API. I will add the label `approximate` to those issues that are 
about approximate calculations.

Note: The main reason I abandoned this project last summer is that the 
streaming API was changing a lot at that time, so it seemed better to postpone 
these things.

> GSoC project: Exact and Approximate Statistics for Data Streams and Windows
> ---
>
> Key: FLINK-2142
> URL: https://issues.apache.org/jira/browse/FLINK-2142
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: gsoc2015, statistics, streaming
>
> The goal of this project is to implement basic statistics of data streams and 
> windows (like average, median, variance, correlation, etc.) in a 
> computationally efficient manner. This involves designing custom PreReducers.
> The exact calculation of some statistics (eg. frequencies, or the number of 
> distinct elements) would require memory proportional to the number of 
> elements in the input (the window or the entire stream). However, there are 
> efficient algorithms and data structures using less memory for calculating 
> the same statistics only approximately, with user-specified error bounds.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2145) Median calculation for windows

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-2145.

Resolution: Won't Fix

I'm closing this, as it was based on assumptions of the old (pre-0.10) 
windowing. See https://github.com/apache/flink/pull/684#issuecomment-195402038

> Median calculation for windows
> --
>
> Key: FLINK-2145
> URL: https://issues.apache.org/jira/browse/FLINK-2145
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> The PreReducer for this has the following algorithm: We maintain two 
> multisets (as, for example, balanced binary search trees), that always 
> partition the elements of the current window to smaller-than-median and 
> larger-than-median elements. At each store and evict, we can maintain this 
> invariant with only O(1) multiset operations.
> Store: O(log N)
> Evict: O(log N)
> emitWindow: O(1)
> memory: O(N)
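
(For illustration only: a standalone sketch of the data structure described above, independent of any Flink API. TreeMaps with counts stand in for the balanced-BST multisets; it assumes evicted elements are actually present in the window.)

{code:java}
import java.util.TreeMap;

// "low" holds the smaller half of the window, "high" the larger half.
// store/evict are O(log N), reading the median is O(1).
public class MedianOfWindow {

    private final TreeMap<Double, Integer> low = new TreeMap<>();
    private final TreeMap<Double, Integer> high = new TreeMap<>();
    private long lowSize = 0;
    private long highSize = 0;

    public void store(double x) {
        if (lowSize == 0 || x <= low.lastKey()) {
            add(low, x);
            lowSize++;
        } else {
            add(high, x);
            highSize++;
        }
        rebalance();
    }

    public void evict(double x) {
        if (lowSize > 0 && x <= low.lastKey()) {
            remove(low, x);
            lowSize--;
        } else {
            remove(high, x);
            highSize--;
        }
        rebalance();
    }

    // The (lower) median is always the largest element of "low".
    public double median() {
        return low.lastKey();
    }

    // Invariant: "low" is either the same size as "high" or one element larger.
    // A single store/evict changes the imbalance by at most one, so moving a
    // single element restores it, as claimed in the description above.
    private void rebalance() {
        if (lowSize > highSize + 1) {
            double x = low.lastKey();
            remove(low, x);
            lowSize--;
            add(high, x);
            highSize++;
        } else if (highSize > lowSize) {
            double x = high.firstKey();
            remove(high, x);
            highSize--;
            add(low, x);
            lowSize++;
        }
    }

    private static void add(TreeMap<Double, Integer> multiset, double x) {
        multiset.merge(x, 1, Integer::sum);
    }

    private static void remove(TreeMap<Double, Integer> multiset, double x) {
        multiset.computeIfPresent(x, (k, c) -> c == 1 ? null : c - 1);
    }
}
{code}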



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284427#comment-15284427
 ] 

Gabor Gevay commented on FLINK-2147:


OK, we can certainly think about it.

It would probably make sense to apply this to windows, and not the entire 
stream. In this case, we have to figure out how to apply the algorithm already 
when Flink is building the windows. (I mean we can't just stuff the algorithm 
into a WindowFunction, because then Flink would buffer up all the elements of a 
window, which defeats the purpose.) Maybe it can be done with a fold, since I 
think that Flink is already doing preaggregation for folds.
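
(For reference, the Count-Min sketch data structure itself is small, and two sketches of the same shape merge by adding their counter arrays, which is what makes pre-aggregation and a distributed implementation straightforward. The following is a minimal, non-Flink illustration; the width/depth sizing and the hash family are placeholders, see the Cormode et al. paper for how to choose them from the desired error bound.)

{code:java}
import java.util.Random;

// Minimal Count-Min sketch, independent of any Flink API.
public class CountMinSketch {

    private final int depth;
    private final int width;
    private final long[][] counts;
    private final int[] hashSeeds;

    public CountMinSketch(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;
        this.counts = new long[depth][width];
        this.hashSeeds = new Random(seed).ints(depth).toArray();
    }

    public void add(Object item) {
        for (int i = 0; i < depth; i++) {
            counts[i][bucket(item, i)]++;
        }
    }

    // Point query: an overestimate of the true frequency, within the error bound.
    public long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < depth; i++) {
            min = Math.min(min, counts[i][bucket(item, i)]);
        }
        return min;
    }

    // Sketches of the same shape can be merged by adding the counter arrays.
    public void mergeInPlace(CountMinSketch other) {
        for (int i = 0; i < depth; i++) {
            for (int j = 0; j < width; j++) {
                counts[i][j] += other.counts[i][j];
            }
        }
    }

    // Placeholder hash family; a real implementation would use pairwise
    // independent hash functions as required by the analysis in the paper.
    private int bucket(Object item, int row) {
        int h = item.hashCode() * 31 + hashSeeds[row];
        return Math.floorMod(h, width);
    }
}
{code}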

Another issue is how the user should specify the key. It would probably not be 
elegant to apply this to entire elements, so maybe the user should specify a 
field on which to apply the algorithm. But then it might be good if we could 
fit this in with keyBy, perhaps by adding this as a method of KeyedStream? I'm 
not sure.

Note that [~aljoscha], for example, might have a more informed opinion on all 
this.

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2147:
---
Labels: approximate statistics  (was: statistics)

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2148) Approximately calculate the number of distinct elements of a stream

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2148:
---
Labels: approximate statistics  (was: statistics)

> Approximately calculate the number of distinct elements of a stream
> ---
>
> Key: FLINK-2148
> URL: https://issues.apache.org/jira/browse/FLINK-2148
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: approximate, statistics
>
> In the paper
> http://people.seas.harvard.edu/~minilek/papers/f0.pdf
> Kane et al. describes an optimal algorithm for estimating the number of 
> distinct elements in a data stream.
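
(For illustration only, and explicitly not the algorithm from the paper above: the following is a much simpler "k minimum values" estimator, shown just to illustrate the small, bounded state such an operator keeps. The constant k trades accuracy for memory.)

{code:java}
import java.util.TreeSet;

// Keeps only the k smallest normalized hash values seen so far; duplicates
// hash to the same value, so they do not inflate the estimate.
public class KMinValuesEstimator {

    private final int k;
    private final TreeSet<Double> smallest = new TreeSet<>();

    public KMinValuesEstimator(int k) {
        this.k = k;
    }

    public void add(Object item) {
        // Map the item's hash to a pseudo-uniform value in [0, 1].
        double h = (item.hashCode() & 0x7fffffffL) / (double) Integer.MAX_VALUE;
        smallest.add(h);
        if (smallest.size() > k) {
            smallest.pollLast(); // keep only the k smallest hash values
        }
    }

    public double estimate() {
        if (smallest.size() < k) {
            return smallest.size(); // fewer than k distinct hashes seen: exact count
        }
        return (k - 1) / smallest.last(); // standard KMV estimate
    }
}
{code}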



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2146) Fast calculation of min/max with arbitrary eviction and triggers

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-2146.

Resolution: Not A Problem

I'm closing this, since with the redesigned windowing API (since 0.10) this 
became much less important. The previous windowing worked by always keeping 
track of one window only, by adding newly arriving elements and evicting old 
elements, and it fitted all kinds of windowing strategies into this model. This 
issue concerned the updating of min/max when the window is updated this way. 
However, the new windowing API (mostly to handle out of order events) keeps 
track of several windows at the same time, so solving this issue wouldn't 
affect sliding windows, for example.

> Fast calculation of min/max with arbitrary eviction and triggers
> 
>
> Key: FLINK-2146
> URL: https://issues.apache.org/jira/browse/FLINK-2146
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>
> The last algorithm described here could be used:
> http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
> It is based on a double-ended queue which maintains a sorted list of elements 
> of the current window that have the possibility of being the maximal element 
> in the future.
> Store: O(1) amortized
> Evict: O(1)
> emitWindow: O(1)
> memory: O(N)
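
(For illustration only: a minimal, non-Flink sketch of the deque-based algorithm referenced above. The front of the deque is always the maximum of the current window.)

{code:java}
import java.util.ArrayDeque;

// The deque holds, front to back, a decreasing sequence of candidates that
// could still become the window maximum.
public class SlidingMax {

    private final ArrayDeque<Double> candidates = new ArrayDeque<>();

    // Called when an element enters the window: O(1) amortized.
    public void store(double x) {
        // Elements smaller than x can never be the maximum again.
        while (!candidates.isEmpty() && candidates.peekLast() < x) {
            candidates.pollLast();
        }
        candidates.addLast(x);
    }

    // Called when an element leaves the window (oldest first): O(1).
    public void evict(double x) {
        if (!candidates.isEmpty() && candidates.peekFirst() == x) {
            candidates.pollFirst();
        }
    }

    // The maximum of the current (non-empty) window: O(1).
    public double max() {
        return candidates.peekFirst();
    }
}
{code}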



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2148) Approximately calculate the number of distinct elements of a stream

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2148:
---
Issue Type: New Feature  (was: Sub-task)
Parent: (was: FLINK-2142)

> Approximately calculate the number of distinct elements of a stream
> ---
>
> Key: FLINK-2148
> URL: https://issues.apache.org/jira/browse/FLINK-2148
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> In the paper
> http://people.seas.harvard.edu/~minilek/papers/f0.pdf
> Kane et al. describes an optimal algorithm for estimating the number of 
> distinct elements in a data stream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2147:
---
Priority: Major  (was: Minor)

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2147:
---
Issue Type: New Feature  (was: Sub-task)
Parent: (was: FLINK-2142)

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284406#comment-15284406
 ] 

Gabor Gevay commented on FLINK-2147:


Yes, I think it would be very good if you worked on this, and starting with a 
design document is a good idea. However, if you haven't yet contributed to 
Flink, then I would suggest starting with some simpler tasks first. Note that 
the hard part here will not be implementing the algorithm that is described in 
the linked paper, but figuring out how it fits into the API and internals of 
Flink.

(This was a sub-task of my Google Summer of Code project last summer, but I 
ended up not really working on it, because the streaming API was changing a lot 
at that time. I will convert this sub-task to a stand-alone issue now.)

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-16 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15284312#comment-15284312
 ] 

Gabor Gevay commented on FLINK-2147:


Unfortunately, no.

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3788) Programs extending App trait cause problems because of DelayedInit

2016-05-14 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3788:
---
Summary: Programs extending App trait cause problems because of DelayedInit 
 (was: Local variable values are not distributed to job runners)

> Programs extending App trait cause problems because of DelayedInit
> --
>
> Key: FLINK-3788
> URL: https://issues.apache.org/jira/browse/FLINK-3788
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.0.0, 1.0.1
> Environment: Scala 2.11.8
> Sun JDK 1.8.0_65 or OpenJDK 1.8.0_77
> Fedora 25, 4.6.0-0.rc2.git3.1.fc25.x86_64
>Reporter: Andreas C. Osowski
> Attachments: FLINK-3788.tgz
>
>
> Variable values of non-elementary types aren't captured and distributed to job 
> runners, causing them to remain 'null' and causing NPEs upon access when 
> running on a cluster. Running locally through `flink-clients` works fine.
> Changing parallelism or disabling the closure cleaner don't seem to have any 
> effect.
> Minimal example, also see the attached archive.
> {code:java}
> case class IntWrapper(a1: Int)
> val wrapped = IntWrapper(42)
> env.readTextFile("myTextFile.txt").map(line => wrapped.toString).collect
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3788) Local variable values are not distributed to job runners

2016-04-19 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247896#comment-15247896
 ] 

Gabor Gevay commented on FLINK-3788:


It seems that the `extends App` stuff is causing this somehow. If I copy this 
code to a regular main method, then it works.

> Local variable values are not distributed to job runners
> 
>
> Key: FLINK-3788
> URL: https://issues.apache.org/jira/browse/FLINK-3788
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.0.0, 1.0.1
> Environment: Scala 2.11.8
> Sun JDK 1.8.0_65 or OpenJDK 1.8.0_77
> Fedora 25, 4.6.0-0.rc2.git3.1.fc25.x86_64
>Reporter: Andreas C. Osowski
> Attachments: FLINK-3788.tgz
>
>
> Variable values of non-elementary types aren't captured and distributed to job 
> runners, causing them to remain 'null' and causing NPEs upon access when 
> running on a cluster. Running locally through `flink-clients` works fine.
> Changing parallelism or disabling the closure cleaner don't seem to have any 
> effect.
> Minimal example, also see the attached archive.
> {code:java}
> case class IntWrapper(a1: Int)
> val wrapped = IntWrapper(42)
> env.readTextFile("myTextFile.txt").map(line => wrapped.toString).collect
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3479) Add hash-based strategy for CombineFunction

2016-04-15 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3479:
---
Priority: Minor  (was: Major)

> Add hash-based strategy for CombineFunction
> ---
>
> Key: FLINK-3479
> URL: https://issues.apache.org/jira/browse/FLINK-3479
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Priority: Minor
>
> This issue is similar to FLINK-3477 but adds a hash-based strategy for 
> {{CombineFunction}} instead of {{ReduceFunction}}.
> The interface of {{CombineFunction}} differs from {{ReduceFunction}} by 
> providing an {{Iterable}} instead of two {{T}} values. Hence, if the 
> {{Iterable}} provides two values, we can do the same as with a 
> {{ReduceFunction}}.
> At the moment, {{CombineFunction}} is wrapped in a {{GroupCombineFunction}} 
> and hence executed using the {{GroupReduceCombineDriver}}. 
> We should add two dedicated drivers: {{CombineDriver}} and 
> {{ChainedCombineDriver}} and two driver strategies: {{HASH_COMBINE}} and 
> {{SORT_COMBINE}}. 
> If FLINK-3477 is resolved, we can reuse the hash-table.
> We should also add compiler hints to `DataSet.reduceGroup()` and 
> `Grouping.reduceGroup()` to allow users to select between {{SORT}}- and 
> {{HASH}}-based combine strategies ({{HASH}} will only be applicable to 
> {{CombineFunction}} and not {{GroupCombineFunction}}).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-04-09 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3722:
--

 Summary: The divisions in the InMemorySorters' swap/compare 
methods hurt performance
 Key: FLINK-3722
 URL: https://issues.apache.org/jira/browse/FLINK-3722
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Gabor Gevay
Priority: Minor


NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
use divisions (which take a lot of time \[1\]) to calculate the index of the 
MemorySegment and the offset inside the segment. [~greghogan] reported on the 
mailing list \[2\] measuring a ~12-14% performance impact in one case.

A possibility to improve the situation is the following:
The way that QuickSort mostly uses these compare and swap methods is that it 
maintains two indices, and uses them to call compare and swap. The key 
observation is that these indices are mostly stepped by one, and 
_incrementally_ calculating the quotient and modulo is actually easy when the 
index changes only by one: increment/decrement the modulo, and check whether 
the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
modulo and increment/decrement the quotient.

To implement this, InMemorySorter would have to expose an iterator that would 
have the divisor and the current modulo and quotient as state, and have 
increment/decrement methods. Compare and swap could then have overloads that 
take these iterators as arguments.

\[1\] http://www.agner.org/optimize/instruction_tables.pdf
\[2\] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
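
(For illustration only: a minimal sketch of the incremental quotient/modulo idea. The class and method names are made up and do not correspond to the actual InMemorySorter API.)

{code:java}
// Instead of computing
//   segment = index / recordsPerSegment
//   offset  = (index % recordsPerSegment) * recordSize
// with a division on every access, the cursor keeps both parts as state and
// updates them when it is stepped by one.
public final class RecordCursor {

    private final int recordsPerSegment;
    private final int recordSize;

    private int segment;        // the quotient
    private int indexInSegment; // the modulo

    public RecordCursor(int recordsPerSegment, int recordSize) {
        this.recordsPerSegment = recordsPerSegment;
        this.recordSize = recordSize;
    }

    public void increment() {
        if (++indexInSegment == recordsPerSegment) { // wrap around instead of dividing
            indexInSegment = 0;
            segment++;
        }
    }

    public void decrement() {
        if (indexInSegment-- == 0) {
            indexInSegment = recordsPerSegment - 1;
            segment--;
        }
    }

    public int segmentIndex() {
        return segment;
    }

    public int offsetInSegment() {
        return indexInSegment * recordSize;
    }
}
{code}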



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-04-08 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-3477:
--

Assignee: Gabor Gevay

> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows holding the pre-aggregated value in a hash table 
> and updating it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the 
> new value has the same size as the old value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
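
(For illustration only: a heap-based toy version of the hash combine idea, using a plain HashMap in place of Flink's managed-memory hash table. The reduce function is applied eagerly, so at most one value per key is held, and everything is emitted when the table gets "full".)

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class HashCombiner<K, T> {

    private final Function<T, K> keyExtractor;
    private final BiFunction<T, T, T> reduceFunction;
    private final int maxEntries; // stand-in for "runs out of managed memory"
    private final Map<K, T> table = new HashMap<>();

    public HashCombiner(Function<T, K> keyExtractor,
                        BiFunction<T, T, T> reduceFunction,
                        int maxEntries) {
        this.keyExtractor = keyExtractor;
        this.reduceFunction = reduceFunction;
        this.maxEntries = maxEntries;
    }

    // Pre-aggregate one record: reduce it into the existing value for its key.
    public void combine(T record, Consumer<T> output) {
        table.merge(keyExtractor.apply(record), record, reduceFunction);
        if (table.size() >= maxEntries) {
            flush(output); // evict and emit everything, like the out-of-memory case
        }
    }

    public void flush(Consumer<T> output) {
        table.values().forEach(output);
        table.clear();
    }
}
{code}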



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-02-26 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15168845#comment-15168845
 ] 

Gabor Gevay commented on FLINK-3322:


Actually, these are separate runs, so all of them are initial runs.

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
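
(For illustration only: a toy sketch of why pooling helps here; this is not the actual MemoryManager code. With preallocation, released segments go back to a free list and are reused in the next superstep instead of becoming garbage that the GC has to reclaim.)

{code:java}
import java.util.ArrayDeque;

public class SegmentPool {

    private final int segmentSize;
    private final ArrayDeque<byte[]> freeSegments = new ArrayDeque<>();

    public SegmentPool(int segmentSize, int numSegments) {
        this.segmentSize = segmentSize;
        for (int i = 0; i < numSegments; i++) {
            freeSegments.push(new byte[segmentSize]); // allocate once, up front
        }
    }

    public byte[] allocate() {
        byte[] segment = freeSegments.poll();
        return segment != null ? segment : new byte[segmentSize];
    }

    public void release(byte[] segment) {
        freeSegments.push(segment); // reuse instead of leaving it to the GC
    }
}
{code}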



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant

2016-02-26 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15168836#comment-15168836
 ] 

Gabor Gevay commented on FLINK-3519:


OK, I have found a good reason for the Gelly style subclassing: the 
getField(int pos), setField(int pos) methods are actually handy sometimes.

Then I'll create a PR with (2).

> Subclasses of Tuples don't work if the declared type of a DataSet is not the 
> descendant
> ---
>
> Key: FLINK-3519
> URL: https://issues.apache.org/jira/browse/FLINK-3519
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Minor
>
> If I have a subclass of TupleN, then objects of this type will turn into 
> TupleNs when I try to use them in a DataSet.
> For example, if I have a class like this:
> {code}
> public static class Foo extends Tuple1<Integer> {
>   public short a;
>   public Foo() {}
>   public Foo(int f0, int a) {
>   this.f0 = f0;
>   this.a = (short)a;
>   }
>   @Override
>   public String toString() {
>   return "(" + f0 + ", " + a + ")";
>   }
> }
> {code}
> And then I do this:
> {code}
> env.fromElements(0,0,0).map(new MapFunction<Integer, Tuple1<Integer>>() {
>   @Override
>   public Tuple1<Integer> map(Integer value) throws Exception {
>   return new Foo(5, 6);
>   }
> }).print();
> {code}
> Then I don't have Foos in the output, but only Tuples:
> {code}
> (5)
> (5)
> (5)
> {code}
> The problem is caused by the TupleSerializer not caring about subclasses at 
> all. I guess the reason for this is performance: we don't want to deal with 
> writing and reading subclass tags when we have Tuples.
> I see three options for solving this:
> 1. Add subclass tags to the TupleSerializer: This is not really an option, 
> because we don't want to lose performance.
> 2. Document this behavior in the javadoc of the Tuple classes.
> 3. Make the Tuple types final: this would be the clean solution, but it is 
> API breaking, and the first victim would be Gelly: the Vertex and Edge types 
> extend from tuples. (Note that the issue doesn't appear there, because the 
> DataSets there always have the type of the descendant class.)
> When deciding between 2. and 3., an important point to note is that if you 
> have your class extend from a Tuple type instead of just adding the f0, f1, 
> ... fields manually in the hopes of getting the performance boost associated 
> with Tuples, then you are out of luck: the PojoSerializer will kick in anyway 
> when the declared types of your DataSets are the descendant type.
> If someone knows about a good reason to extend from a Tuple class, then 
> please comment.
> For 2., this is a suggested wording for the javadoc of the Tuple classes:
> Warning: Please don't subclass Tuple classes, but if you do, then be sure to 
> always declare the element type of your DataSets to your descendant type. 
> (That is, if you have a "class A extends Tuple2<...>", then don't use instances of 
> A in a DataSet<Tuple2<...>>, but use DataSet<A>.)
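
(For illustration only, following the suggested wording above: the same example with the element type declared as the descendant, so the map produces a DataSet of Foo rather than of Tuple1.)

{code:java}
env.fromElements(0,0,0).map(new MapFunction<Integer, Foo>() {
	@Override
	public Foo map(Integer value) throws Exception {
		// The declared output type is now the subclass Foo itself.
		return new Foo(5, 6);
	}
}).print();
{code}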



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3519) Subclasses of Tuples don't work if the declared type of a DataSet is not the descendant

2016-02-26 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15168812#comment-15168812
 ] 

Gabor Gevay commented on FLINK-3519:


But if you were aware of this problem, then what was the original reason for 
not making them final?

> Subclasses of Tuples don't work if the declared type of a DataSet is not the 
> descendant
> ---
>
> Key: FLINK-3519
> URL: https://issues.apache.org/jira/browse/FLINK-3519
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Minor
>
> If I have a subclass of TupleN, then objects of this type will turn into 
> TupleNs when I try to use them in a DataSet.
> For example, if I have a class like this:
> {code}
> public static class Foo extends Tuple1<Integer> {
>   public short a;
>   public Foo() {}
>   public Foo(int f0, int a) {
>   this.f0 = f0;
>   this.a = (short)a;
>   }
>   @Override
>   public String toString() {
>   return "(" + f0 + ", " + a + ")";
>   }
> }
> {code}
> And then I do this:
> {code}
> env.fromElements(0,0,0).map(new MapFunction<Integer, Tuple1<Integer>>() {
>   @Override
>   public Tuple1<Integer> map(Integer value) throws Exception {
>   return new Foo(5, 6);
>   }
> }).print();
> {code}
> Then I don't have Foos in the output, but only Tuples:
> {code}
> (5)
> (5)
> (5)
> {code}
> The problem is caused by the TupleSerializer not caring about subclasses at 
> all. I guess the reason for this is performance: we don't want to deal with 
> writing and reading subclass tags when we have Tuples.
> I see three options for solving this:
> 1. Add subclass tags to the TupleSerializer: This is not really an option, 
> because we don't want to lose performance.
> 2. Document this behavior in the javadoc of the Tuple classes.
> 3. Make the Tuple types final: this would be the clean solution, but it is 
> API breaking, and the first victim would be Gelly: the Vertex and Edge types 
> extend from tuples. (Note that the issue doesn't appear there, because the 
> DataSets there always have the type of the descendant class.)
> When deciding between 2. and 3., an important point to note is that if you 
> have your class extend from a Tuple type instead of just adding the f0, f1, 
> ... fields manually in the hopes of getting the performance boost associated 
> with Tuples, then you are out of luck: the PojoSerializer will kick in anyway 
> when the declared types of your DataSets are the descendant type.
> If someone knows about a good reason to extend from a Tuple class, then 
> please comment.
> For 2., this is a suggested wording for the javadoc of the Tuple classes:
> Warning: Please don't subclass Tuple classes, but if you do, then be sure to 
> always declare the element type of your DataSets to your descendant type. 
> (That is, if you have a "class A extends Tuple2<...>", then don't use instances of 
> A in a DataSet<Tuple2<...>>, but use DataSet<A>.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

