[jira] [Commented] (FLINK-3420) Remove "ReadTextFileWithValue" from StreamExecutionEnvironment

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150060#comment-15150060
 ] 

ASF GitHub Bot commented on FLINK-3420:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1648#issuecomment-185083603
  
+1


> Remove "ReadTextFileWithValue" from StreamExecutionEnvironment
> --
>
> Key: FLINK-3420
> URL: https://issues.apache.org/jira/browse/FLINK-3420
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> These methods are too specific of very limited use in streaming (highly 
> special case of a bounded data set).
> They bloat the API, and lock the development into keeping supporting them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3379) Refactor TimestampExtractor

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150059#comment-15150059
 ] 

ASF GitHub Bot commented on FLINK-3379:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1646#issuecomment-185083361
  
+1 This should go into 1.0


> Refactor TimestampExtractor
> ---
>
> Key: FLINK-3379
> URL: https://issues.apache.org/jira/browse/FLINK-3379
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.0.0
>
>
> Based on a lot of user feedback, the current {{TimestampExtractor}} seems 
> very confusing. It implements simultaneously two modes of generating 
> watermarks:
>   - Each record that passes through can decide to cause a watermark.
>   - The timestamp extractor can define a certain watermark timestamp which is 
> periodically picked up by the system and triggers a watermark (if larger than 
> the previous watermark).
> Figuring out how these modes interplay, and how to define the methods to only 
> use one mode has been quite an obstacle for several users. We should break 
> this class into two different classes, one per mode of generating watermarks, 
> to make it easier to understand.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3422) Scramble HashPartitioner hashes

2016-02-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150058#comment-15150058
 ] 

Aljoscha Krettek commented on FLINK-3422:
-

This is a breaking change, though. Some users might have already developed 
stuff that depends on the hash function to be as it is. For example, there is 
this custom akka state query thingy.

> Scramble HashPartitioner hashes
> ---
>
> Key: FLINK-3422
> URL: https://issues.apache.org/jira/browse/FLINK-3422
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
>Priority: Critical
> Fix For: 1.0.0
>
>
> The {{HashPartitioner}} used by the streaming API does not apply any hash 
> scrambling against bad user hash functions.
> We should apply a murmor or jenkins hash on top of the hash code, similar as 
> in the {{DataSet}} API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream

2016-02-16 Thread Chiwan Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149948#comment-15149948
 ] 

Chiwan Park commented on FLINK-3412:


+1 for [~till.rohrmann]'s idea.

> Remove implicit conversions JavaStream / ScalaStream
> 
>
> Key: FLINK-3412
> URL: https://issues.apache.org/jira/browse/FLINK-3412
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> I think the implicit conversions between the Java DataStream and the Scala 
> DataStream are dangerous.
> Because conversions exist in both directions, it is possible to write methods 
> that look like calling functions on the JavaStream, but instead convert it to 
> a Scala stream and call a different method.
> I just accidentally implemented an infinite recursion that way (via two 
> hidden implicit conversions).
> Making the conversions explicit (with a {{wrap()}} function like in the batch 
> API, we add minimally more code internally (nothing is different for users), 
> but avoid such accidental errors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3179) Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149868#comment-15149868
 ] 

ASF GitHub Bot commented on FLINK-3179:
---

Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-185025591
  
A new push request has been submitted. JYFI @fhueske .


> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> -
>
> Key: FLINK-3179
> URL: https://issues.apache.org/jira/browse/FLINK-3179
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.1
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned as in the following example
> {code}
> DataSet> words = ...
> DataSet> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should only happen, if the 
> Reducer is the only successor of the Partition operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3179 Combiner is not injected if Reduce ...

2016-02-16 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1553#issuecomment-185025591
  
A new push request has been submitted. JYFI @fhueske .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3419) Drop partitionByHash from DataStream

2016-02-16 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149411#comment-15149411
 ] 

Vasia Kalavri commented on FLINK-3419:
--

Hey,
is it really necessary to drop this one? We are using it in gelly streaming and 
I think it's quite useful for certain streaming graph algorithms.
The thing is we want to keep state per partition, not by key. For example, we 
partition by vertexID and thus we can merge the state of several vertices into 
one within the same partition. If we do this with a keyBy, then the state grows 
too big for each vertex (we end up storing duplicates). Can we reconsider 
dropping it or is there any other way we can have the same behavior with keyBy?

> Drop partitionByHash from DataStream
> 
>
> Key: FLINK-3419
> URL: https://issues.apache.org/jira/browse/FLINK-3419
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Stephan Ewen
>Priority: Blocker
>
> The behavior is no different from {{keyBy}}, except that you cannot use keyed 
> state and windows if you use {{partitionByHash}} so I suggest to drop it.
> We might also want to think about dropping {{shuffle}} and {{global}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3400) RocksDB Backend does not work when not in Flink lib folder

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149394#comment-15149394
 ] 

ASF GitHub Bot commented on FLINK-3400:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1644#issuecomment-184891874
  
Looks good, +1 to merge this


> RocksDB Backend does not work when not in Flink lib folder
> --
>
> Key: FLINK-3400
> URL: https://issues.apache.org/jira/browse/FLINK-3400
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The problem is that the external process runner cannot load the 
> {{HDFSCopyFromLocal}} and {{HDFSCopyToLocal}} classes when the rocksdb 
> backend jar is not in the Flink lib folder. If rocksdb is only in the 
> user-code jar the code cannot be loaded when trying to start the external 
> copy runners.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3420) Remove "ReadTextFileWithValue" from StreamExecutionEnvironment

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149391#comment-15149391
 ] 

ASF GitHub Bot commented on FLINK-3420:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1648

[FLINK-3420] [api-breaking] Remove utility functions from 
StreamExecutionEnvironment

This removes the functions  `readTextFileWithValue(...)` and 
`readFileOfPrimitives(...)` from `StreamExecutionEnvironment`.

These methods are highly specific for very niche cases of bounded data 
stream processing.
As such, the disadvantages (bloat and lock the API, lock the development 
into support) outweigh the benefit.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
remove_value_primitives

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1648


commit 152b1187ccfa343da9e2b299931f4d79404cb137
Author: Stephan Ewen 
Date:   2016-02-16T21:54:53Z

[FLINK-3420] [api-breaking] Remove utility functions 
'readTextFileWithValue' and 'readFileOfPrimitives' from 
StreamExecutionEnvironment

These methods are highly specific for very niche cases of bounded data 
stream processing.
As such, the disadvantages (bloat and lock the API, lock the development 
into support) outweigh the benefit.




> Remove "ReadTextFileWithValue" from StreamExecutionEnvironment
> --
>
> Key: FLINK-3420
> URL: https://issues.apache.org/jira/browse/FLINK-3420
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> These methods are too specific of very limited use in streaming (highly 
> special case of a bounded data set).
> They bloat the API, and lock the development into keeping supporting them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3400] Move RocksDB Copy Utils to flink-...

2016-02-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1644#issuecomment-184891874
  
Looks good, +1 to merge this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3420] [api-breaking] Remove utility fun...

2016-02-16 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1648

[FLINK-3420] [api-breaking] Remove utility functions from 
StreamExecutionEnvironment

This removes the functions  `readTextFileWithValue(...)` and 
`readFileOfPrimitives(...)` from `StreamExecutionEnvironment`.

These methods are highly specific for very niche cases of bounded data 
stream processing.
As such, the disadvantages (bloat and lock the API, lock the development 
into support) outweigh the benefit.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
remove_value_primitives

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1648


commit 152b1187ccfa343da9e2b299931f4d79404cb137
Author: Stephan Ewen 
Date:   2016-02-16T21:54:53Z

[FLINK-3420] [api-breaking] Remove utility functions 
'readTextFileWithValue' and 'readFileOfPrimitives' from 
StreamExecutionEnvironment

These methods are highly specific for very niche cases of bounded data 
stream processing.
As such, the disadvantages (bloat and lock the API, lock the development 
into support) outweigh the benefit.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3395) Polishing the web UI

2016-02-16 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3395:
--
Description: 
On the job properties page one must select an operator from the plan. Elsewhere 
in the UI a list of operators is displayed and clicking the table or the plan 
will reveal the requested information.

A list of operators could likewise be added to the timeline page.

The job exceptions page should display a "No exceptions" notification as done 
elsewhere for when there is nothing to display.

In the job plan page, selecting accumulators, checkpoints, or back pressure 
deselects "Plan" from the upper tab, removing the underline and emphasis.

The job plan is not redrawn when the browser window is resized.

  was:
On the job properties page one must select an operator from the plan. Elsewhere 
in the UI a list of operators is displayed and clicking the table or the plan 
will reveal the requested information.

A list of operators could likewise be added to the timeline page. Also, when 
selecting an operator the subtask timelines zoom out to show "000" for 
milliseconds. It would be nice to back off to seconds, minutes, etc.

The job exceptions page should display a "No exceptions" notification as done 
elsewhere for when there is nothing to display.

In the job plan page, selecting accumulators, checkpoints, or back pressure 
deselects "Plan" from the upper tab, removing the underline and emphasis.

The job plan is not redrawn when the browser window is resized.


> Polishing the web UI
> 
>
> Key: FLINK-3395
> URL: https://issues.apache.org/jira/browse/FLINK-3395
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.0.0
>
>
> On the job properties page one must select an operator from the plan. 
> Elsewhere in the UI a list of operators is displayed and clicking the table 
> or the plan will reveal the requested information.
> A list of operators could likewise be added to the timeline page.
> The job exceptions page should display a "No exceptions" notification as done 
> elsewhere for when there is nothing to display.
> In the job plan page, selecting accumulators, checkpoints, or back pressure 
> deselects "Plan" from the upper tab, removing the underline and emphasis.
> The job plan is not redrawn when the browser window is resized.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3395) Polishing the web UI

2016-02-16 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan reassigned FLINK-3395:
-

Assignee: Greg Hogan

> Polishing the web UI
> 
>
> Key: FLINK-3395
> URL: https://issues.apache.org/jira/browse/FLINK-3395
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.0.0
>
>
> On the job properties page one must select an operator from the plan. 
> Elsewhere in the UI a list of operators is displayed and clicking the table 
> or the plan will reveal the requested information.
> A list of operators could likewise be added to the timeline page. Also, when 
> selecting an operator the subtask timelines zoom out to show "000" for 
> milliseconds. It would be nice to back off to seconds, minutes, etc.
> The job exceptions page should display a "No exceptions" notification as done 
> elsewhere for when there is nothing to display.
> In the job plan page, selecting accumulators, checkpoints, or back pressure 
> deselects "Plan" from the upper tab, removing the underline and emphasis.
> The job plan is not redrawn when the browser window is resized.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3419) Drop partitionByHash from DataStream

2016-02-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reassigned FLINK-3419:
---

Assignee: Stephan Ewen

> Drop partitionByHash from DataStream
> 
>
> Key: FLINK-3419
> URL: https://issues.apache.org/jira/browse/FLINK-3419
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Stephan Ewen
>Priority: Blocker
>
> The behavior is no different from {{keyBy}}, except that you cannot use keyed 
> state and windows if you use {{partitionByHash}} so I suggest to drop it.
> We might also want to think about dropping {{shuffle}} and {{global}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3422) Scramble HashPartitioner hashes

2016-02-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3422:
---

 Summary: Scramble HashPartitioner hashes
 Key: FLINK-3422
 URL: https://issues.apache.org/jira/browse/FLINK-3422
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10.2
Reporter: Stephan Ewen
Priority: Critical
 Fix For: 1.0.0


The {{HashPartitioner}} used by the streaming API does not apply any hash 
scrambling against bad user hash functions.

We should apply a murmor or jenkins hash on top of the hash code, similar as in 
the {{DataSet}} API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Refactor Common Parts of Stream/Batch Document...

2016-02-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1645#issuecomment-184876499
  
+1

I like this to get the redundancy out of the batch and streaming guides.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3243) Fix Interplay of TimeCharacteristic and Time Windows

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149295#comment-15149295
 ] 

ASF GitHub Bot commented on FLINK-3243:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1513#issuecomment-184875475
  
Looks good to me, +1 to merge this


> Fix Interplay of TimeCharacteristic and Time Windows
> 
>
> Key: FLINK-3243
> URL: https://issues.apache.org/jira/browse/FLINK-3243
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
>
> As per the discussion on the Dev ML: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Time-Behavior-in-Streaming-Jobs-Event-time-processing-time-td9616.html.
> The discussion seems to have converged on option 2):
> - Add dedicated WindowAssigners for processing time and event time
> - {{timeWindow()}} and {{timeWindowAll()}} respect the set 
> {{TimeCharacteristic}}. 
> This will make the easy stuff easy, i.e. using time windows and quickly 
> switching the time characteristic. Users will then have the flexibility to 
> mix different kinds of window assigners in their job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3243] Fix Interplay of TimeCharacterist...

2016-02-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1513#issuecomment-184875475
  
Looks good to me, +1 to merge this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3401) AscendingTimestampExtractor should not fail on order violation

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149274#comment-15149274
 ] 

ASF GitHub Bot commented on FLINK-3401:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1647

[FLINK-3401] [api breaking] AscendingTimestampExtractor only logs violations

This pull requests is based on #1646 

The AscendingTimestampExtractor only logs violations of ascending timestamp 
order. 

Previously it failed hard on a violation. Since that would be in many cases 
irrecoverable as soon as one order violation happens in the stream, it it 
changed to by default only log violations.

The user can still explicitly set an 'IgnoringHandler' or a 
'FailingHandler', which do nothing on violations,
respectively fail hard as before.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
ascending_extractor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1647


commit bfe22cf7ea72941056fbbb657094ec75e58c74c4
Author: Stephan Ewen 
Date:   2016-02-16T09:37:59Z

[FLINK-3379] [FLINK-3415] [streaming] Refactor TimestampExtractor into two 
separate classes

 - one class handled periodic watermarks
 - the other class handled watermarks triggered by elements

This also makes sure that any timestamp assigner / watermark generators 
cannot generate
negative watermarks

commit 6dcdc3e3a82024c066d73b2d25b27a723bbccdd3
Author: Stephan Ewen 
Date:   2016-02-15T17:41:15Z

[FLINK-3401] [streaming] [api breaking] AscendingTimestampExtractor only 
logs violations of ascending timestamp order.

The user can also explicitly set an 'IgnoringHandler' or a 
'FailingHandler', which do nothing on violations,
respectively fail hard.




> AscendingTimestampExtractor should not fail on order violation
> --
>
> Key: FLINK-3401
> URL: https://issues.apache.org/jira/browse/FLINK-3401
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> Currently, the {{AscendingTimestampExtractor}} fails hard when the order of 
> timestamps is violated.
> In the spirit of "streaming jobs never fail", I propose to change the default 
> behavior such that violations are only logged (later collected in metrics) 
> and that hard failures have to be explicitly activated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3401] [api breaking] AscendingTimestamp...

2016-02-16 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1647

[FLINK-3401] [api breaking] AscendingTimestampExtractor only logs violations

This pull requests is based on #1646 

The AscendingTimestampExtractor only logs violations of ascending timestamp 
order. 

Previously it failed hard on a violation. Since that would be in many cases 
irrecoverable as soon as one order violation happens in the stream, it it 
changed to by default only log violations.

The user can still explicitly set an 'IgnoringHandler' or a 
'FailingHandler', which do nothing on violations,
respectively fail hard as before.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
ascending_extractor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1647


commit bfe22cf7ea72941056fbbb657094ec75e58c74c4
Author: Stephan Ewen 
Date:   2016-02-16T09:37:59Z

[FLINK-3379] [FLINK-3415] [streaming] Refactor TimestampExtractor into two 
separate classes

 - one class handled periodic watermarks
 - the other class handled watermarks triggered by elements

This also makes sure that any timestamp assigner / watermark generators 
cannot generate
negative watermarks

commit 6dcdc3e3a82024c066d73b2d25b27a723bbccdd3
Author: Stephan Ewen 
Date:   2016-02-15T17:41:15Z

[FLINK-3401] [streaming] [api breaking] AscendingTimestampExtractor only 
logs violations of ascending timestamp order.

The user can also explicitly set an 'IgnoringHandler' or a 
'FailingHandler', which do nothing on violations,
respectively fail hard.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3419) Drop partitionByHash from DataStream

2016-02-16 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149257#comment-15149257
 ] 

Stephan Ewen commented on FLINK-3419:
-

+1 from my side

It seems it can do nothing that {{keyBy()}} would not do either, and it adds 
clutter to the API.

> Drop partitionByHash from DataStream
> 
>
> Key: FLINK-3419
> URL: https://issues.apache.org/jira/browse/FLINK-3419
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> The behavior is no different from {{keyBy}}, except that you cannot use keyed 
> state and windows if you use {{partitionByHash}} so I suggest to drop it.
> We might also want to think about dropping {{shuffle}} and {{global}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149198#comment-15149198
 ] 

ASF GitHub Bot commented on FLINK-1745:
---

Github user danielblazevski commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-184853357
  
P.S. about rebasing, need to be careful, something went wrong the first 
time around.  I actually just started working on a new laptop, and started the 
git repo "from scratch" as follows:
```
clone the master and FLINK-1745 branches of my fork of Flink
checkout FLINK-1745, commit and push to origin (origin = my fork)
```

I  set upstream to `origin`, is that a mistake?  Namely, when I push 
locally to GitHub, I set `upstream` to `origin`, namely I ran:
```
git push --set-upstream origin FLINK-1745
```
`origin` is my fork.  Should I re-do this by adding a new `remote` called 
`apache` and run
```
git push --set-upstream apache FLINK-1745
```
and then run the git commands you mentioned to rebase?  Want to be careful, 
making a re-basing mistake can be a nightmare to fix :-)  





> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a mean to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-02-16 Thread danielblazevski
Github user danielblazevski commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-184853357
  
P.S. about rebasing, need to be careful, something went wrong the first 
time around.  I actually just started working on a new laptop, and started the 
git repo "from scratch" as follows:
```
clone the master and FLINK-1745 branches of my fork of Flink
checkout FLINK-1745, commit and push to origin (origin = my fork)
```

I  set upstream to `origin`, is that a mistake?  Namely, when I push 
locally to GitHub, I set `upstream` to `origin`, namely I ran:
```
git push --set-upstream origin FLINK-1745
```
`origin` is my fork.  Should I re-do this by adding a new `remote` called 
`apache` and run
```
git push --set-upstream apache FLINK-1745
```
and then run the git commands you mentioned to rebase?  Want to be careful, 
making a re-basing mistake can be a nightmare to fix :-)  





---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2021) Rework examples to use ParameterTool

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149186#comment-15149186
 ] 

ASF GitHub Bot commented on FLINK-2021:
---

Github user stefanobaghino commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184850520
  
Thanks for the tip, I'll try to replicate the S3 setup on our fork as well, 
it'll probably come in handy for similar cases.


> Rework examples to use ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Stefano Baghino
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2021] Rework examples to use ParameterT...

2016-02-16 Thread stefanobaghino
Github user stefanobaghino commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184850520
  
Thanks for the tip, I'll try to replicate the S3 setup on our fork as well, 
it'll probably come in handy for similar cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149173#comment-15149173
 ] 

ASF GitHub Bot commented on FLINK-1745:
---

Github user danielblazevski commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-184847163
  
Hi @chiwanpark sorry for the delay!  I will now have more time to wrap this 
PR up.  I added a check just before calling `knn`:
```scala
  if (useQuadTree) {
if (metric.isInstanceOf[EuclideanDistanceMetric] ||
  metric.isInstanceOf[SquaredEuclideanDistanceMetric]){
  knnQueryWithQuadTree(training.values, testing.values, 
k, metric, queue, out)
} else {
  throw new IllegalArgumentException(s" Error: metric 
must be" +
s" Euclidean or SquaredEuclidean!")
}
  } else {
knnQueryBasic(training.values, testing.values, k, 
metric, queue, out)
  }
}
  }
}
```
Does that work?  The commit includes the hint for the cross operation as 
well. 


> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a mean to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-02-16 Thread danielblazevski
Github user danielblazevski commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-184847163
  
Hi @chiwanpark sorry for the delay!  I will now have more time to wrap this 
PR up.  I added a check just before calling `knn`:
```scala
  if (useQuadTree) {
if (metric.isInstanceOf[EuclideanDistanceMetric] ||
  metric.isInstanceOf[SquaredEuclideanDistanceMetric]){
  knnQueryWithQuadTree(training.values, testing.values, 
k, metric, queue, out)
} else {
  throw new IllegalArgumentException(s" Error: metric 
must be" +
s" Euclidean or SquaredEuclidean!")
}
  } else {
knnQueryBasic(training.values, testing.values, k, 
metric, queue, out)
  }
}
  }
}
```
Does that work?  The commit includes the hint for the cross operation as 
well. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149121#comment-15149121
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1639


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149123#comment-15149123
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1634


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3226] Casting support for arithmetic op...

2016-02-16 Thread twalthr
Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1634


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3226] Translation of explicit casting

2016-02-16 Thread twalthr
Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1639


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-16 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149109#comment-15149109
 ] 

Jamie Grier edited comment on FLINK-1502 at 2/16/16 7:09 PM:
-

Is there no way to refer to a TaskManager by index in order to solve this 
problem?  It would be nice if we didn't have to send all the metrics through 
the JobManager but rather just report them via JMX locally on each host.  I 
think I understand the problem you are describing but wouldn't just having a 
logical index for each TaskManager solve this problem.  I would like to avoid 
having to send the metrics through a central node if possible as I would like 
to see the # of total metrics go up dramatically as we instrument the code more 
and more and give users more insight into how Flink is running.

Maybe we can collaborate on this.  I want a general way to instrument both 
Flink code and user code and make those metrics available easily via JMX at a 
minimum and maybe directly in Graphite and Ganglia.  Once available in JMX 
there are many tools to integrate with other metrics and alerting systems.


was (Author: jgrier):
Is there no way to refer to a TaskManager by index in order to solve this 
problem.  It would be nice if we didn't have to send all the metrics through 
the JobManager but rather just report them via JMX locally on each host.  I 
think I understand the problem you are describing but would just having a 
logical index for each TaskManager solve this problem.  I would like to avoid 
having to send the metrics through a central node if possible as I would like 
to see the # of total metrics go up dramatically as we instrument the code more 
and more.

> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-16 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149109#comment-15149109
 ] 

Jamie Grier commented on FLINK-1502:


Is there no way to refer to a TaskManager by index in order to solve this 
problem.  It would be nice if we didn't have to send all the metrics through 
the JobManager but rather just report them via JMX locally on each host.  I 
think I understand the problem you are describing but would just having a 
logical index for each TaskManager solve this problem.  I would like to avoid 
having to send the metrics through a central node if possible as I would like 
to see the # of total metrics go up dramatically as we instrument the code more 
and more.

> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3379) Refactor TimestampExtractor

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149103#comment-15149103
 ] 

ASF GitHub Bot commented on FLINK-3379:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1646

[FLINK-3379] [FLINK-3415] [streaming] Refactor TimestampExtractor

This pull request divides the TimestampExtractor into two separate classes
 - one class handled periodic watermarks
 - the other class handled watermarks triggered by individual elements

This also makes sure that any timestamp assigner / watermark generators 
cannot generate
negative watermarks.

This also adds tests for the operators that run the timestamp extractors.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
refactor_extractors

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1646.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1646


commit 1e3c4597dd1785430735950eef2b7b8c2154faaf
Author: Stephan Ewen 
Date:   2016-02-16T09:37:59Z

[FLINK-3379] [FLINK-3415] [streaming] Refactor TimestampExtractor into two 
separate classes

 - one class handled periodic watermarks
 - the other class handled watermarks triggered by elements

This also makes sure that any timestamp assigner / watermark generators 
cannot generate
negative watermarks




> Refactor TimestampExtractor
> ---
>
> Key: FLINK-3379
> URL: https://issues.apache.org/jira/browse/FLINK-3379
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.0.0
>
>
> Based on a lot of user feedback, the current {{TimestampExtractor}} seems 
> very confusing. It implements simultaneously two modes of generating 
> watermarks:
>   - Each record that passes through can decide to cause a watermark.
>   - The timestamp extractor can define a certain watermark timestamp which is 
> periodically picked up by the system and triggers a watermark (if larger than 
> the previous watermark).
> Figuring out how these modes interplay, and how to define the methods to only 
> use one mode has been quite an obstacle for several users. We should break 
> this class into two different classes, one per mode of generating watermarks, 
> to make it easier to understand.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3379] [FLINK-3415] [streaming] Refactor...

2016-02-16 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1646

[FLINK-3379] [FLINK-3415] [streaming] Refactor TimestampExtractor

This pull request divides the TimestampExtractor into two separate classes
 - one class handled periodic watermarks
 - the other class handled watermarks triggered by individual elements

This also makes sure that any timestamp assigner / watermark generators 
cannot generate
negative watermarks.

This also adds tests for the operators that run the timestamp extractors.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
refactor_extractors

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1646.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1646


commit 1e3c4597dd1785430735950eef2b7b8c2154faaf
Author: Stephan Ewen 
Date:   2016-02-16T09:37:59Z

[FLINK-3379] [FLINK-3415] [streaming] Refactor TimestampExtractor into two 
separate classes

 - one class handled periodic watermarks
 - the other class handled watermarks triggered by elements

This also makes sure that any timestamp assigner / watermark generators 
cannot generate
negative watermarks




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-02-16 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149099#comment-15149099
 ] 

Jamie Grier commented on FLINK-1502:


[~eastcirclek] You shouldn't need to do this with counters.  Typically you just 
want to report the value of the counter as is to the metrics system.  The 
metrics system (e.g. Graphite or Ganglia) should have built-in tools for 
turning counters into other types of graphs.  For example, what you really want 
here is a "rate", how many GC invocations per second for example (1st 
derivative of counter).  Ganglia and any decent metrics tools should already 
have this function builtin.  I think we should just report the raw counters.

> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Dongwon Kim
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3421) Remove all unused ClassTag context bounds in the Streaming Scala API

2016-02-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3421:
---

 Summary: Remove all unused ClassTag context bounds in the 
Streaming Scala API
 Key: FLINK-3421
 URL: https://issues.apache.org/jira/browse/FLINK-3421
 Project: Flink
  Issue Type: Improvement
  Components: Scala API
Affects Versions: 0.10.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.0.0


Many methods have a {{ClassTag}} as a context bound, but they are never used / 
needed.

The same information as can be drawn from a {{ClassTag}} should also be 
available in the {{TypeInformation}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3413) Remove implicit Seq to DataStream conversion

2016-02-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-3413.
---

> Remove implicit Seq to DataStream conversion
> 
>
> Key: FLINK-3413
> URL: https://issues.apache.org/jira/browse/FLINK-3413
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The implicit conversion from {{Seq}} to Flink DataStream needs to create 
> internally a new execution environment.
> This method is confusing to use. If one uses the Seq in a program that uses a 
> different execution environment, then different streams run on different 
> execution environments.
> The overhead of manually calling {{env.fromElements(seq)}} is quite low.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3413) Remove implicit Seq to DataStream conversion

2016-02-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-3413.
-
Resolution: Fixed

Fixed via 8cce136bf89fa2a399db8b25a2045e57d5cc4f61

> Remove implicit Seq to DataStream conversion
> 
>
> Key: FLINK-3413
> URL: https://issues.apache.org/jira/browse/FLINK-3413
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The implicit conversion from {{Seq}} to Flink DataStream needs to create 
> internally a new execution environment.
> This method is confusing to use. If one uses the Seq in a program that uses a 
> different execution environment, then different streams run on different 
> execution environments.
> The overhead of manually calling {{env.fromElements(seq)}} is quite low.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3420) Remove "ReadTextFileWithValue" from StreamExecutionEnvironment

2016-02-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3420:
---

 Summary: Remove "ReadTextFileWithValue" from 
StreamExecutionEnvironment
 Key: FLINK-3420
 URL: https://issues.apache.org/jira/browse/FLINK-3420
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10.2
Reporter: Stephan Ewen
 Fix For: 1.0.0


These methods are too specific of very limited use in streaming (highly special 
case of a bounded data set).

They bloat the API, and lock the development into keeping supporting them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-02-16 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3322:
---
Priority: Critical  (was: Major)

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3418) RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security configuration

2016-02-16 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148906#comment-15148906
 ] 

Stephan Ewen commented on FLINK-3418:
-

Something like this needs to be added:

https://github.com/apache/flink/blob/master/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1370

This runs code as privileged code under the authenticated user .

> RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security 
> configuration
> 
>
> Key: FLINK-3418
> URL: https://issues.apache.org/jira/browse/FLINK-3418
> Project: Flink
>  Issue Type: Bug
>  Components: state backends
>Reporter: Robert Metzger
>Priority: Critical
>
> As you can see for example in the {{YARNTaskManagerRunner}}, our TaskManagers 
> are running in a special UserGroupInformation.doAs(); call. 
> With that call, we are manually changing the user from the user starting the 
> YARN NodeManager (our containers are part of that process tree) to the user 
> who submitted the job.
> For example on my cluster, the NodeManager runs as "yarn", but "robert" 
> submits the job. For regular file access, "robert" is accessing the files in 
> HDFS, even though "yarn" runs the process.
> The {{HDFSCopyFromLocal}} does not properly initialize these settings, hence 
> "yarn" tries to access the files, leading to the following exception:
> {code}
> Caused by: java.lang.RuntimeException: Error while copying to remote 
> FileSystem: SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/yarn/nm/usercache/robert/appcache/application_1455632128025_0010/filecache/17/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p0.7/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: 
> Permission denied: user=yarn, access=WRITE, 
> inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>   at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>   at 

[GitHub] flink pull request: Refactor Common Parts of Stream/Batch Document...

2016-02-16 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1645

Refactor Common Parts of Stream/Batch Documentation

The main bit is the refactoring, I also added a basic RocksDB documentation 
and created a new "Working with Time" section for the streaming guide.

I put this on top of my TimeCharacteristic pull request because I already 
had some doc changes there with regards to time and time characteristic.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink doc-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1645.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1645


commit dba30ffec81b42b5fb962ad15f95f13341373c5c
Author: Aljoscha Krettek 
Date:   2016-01-15T16:05:12Z

[FLINK-3243] Fix Interplay of TimeCharacteristic and Time Windows

This adds dedicated WindowAssigners for processing time and event time.
timeWindow() and timeWindowAll() respect the TimeCharacteristic set
on the StreamExecutionEnvironment.

This will make the easy stuff easy, i.e. using time windows and quickly
switching the time characteristic. Users will then have the flexibility
to mix different kinds of window assigners in their job.

This also expands the translation tests to verify that the correct
window operators are instantiated.

commit cdf976c6125b3370386b13aa751eabb80d9c4ff4
Author: Aljoscha Krettek 
Date:   2016-02-15T18:30:55Z

[FLINK-3403] Create Section "Working with Time" in Streaming Guide

commit bf837e815252c49869d0a10c80bbfaf7dc866914
Author: Aljoscha Krettek 
Date:   2016-02-16T13:19:51Z

[FLINK-3402] Refactor Common Parts of Stream/Batch Documentation

commit ba782733869b82a62bdc5b3404ad51878b468048
Author: Aljoscha Krettek 
Date:   2016-02-16T16:39:57Z

Add (basic) RocksDB State Backend Documentation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3418) RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security configuration

2016-02-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148896#comment-15148896
 ] 

Aljoscha Krettek commented on FLINK-3418:
-

Where is the logic that does the user change? Could this simply be added to the 
copy utilities?

> RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security 
> configuration
> 
>
> Key: FLINK-3418
> URL: https://issues.apache.org/jira/browse/FLINK-3418
> Project: Flink
>  Issue Type: Bug
>  Components: state backends
>Reporter: Robert Metzger
>Priority: Critical
>
> As you can see for example in the {{YARNTaskManagerRunner}}, our TaskManagers 
> are running in a special UserGroupInformation.doAs(); call. 
> With that call, we are manually changing the user from the user starting the 
> YARN NodeManager (our containers are part of that process tree) to the user 
> who submitted the job.
> For example on my cluster, the NodeManager runs as "yarn", but "robert" 
> submits the job. For regular file access, "robert" is accessing the files in 
> HDFS, even though "yarn" runs the process.
> The {{HDFSCopyFromLocal}} does not properly initialize these settings, hence 
> "yarn" tries to access the files, leading to the following exception:
> {code}
> Caused by: java.lang.RuntimeException: Error while copying to remote 
> FileSystem: SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/yarn/nm/usercache/robert/appcache/application_1455632128025_0010/filecache/17/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p0.7/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: 
> Permission denied: user=yarn, access=WRITE, 
> inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>   at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 

[jira] [Commented] (FLINK-3375) Allow Watermark Generation in the Kafka Source

2016-02-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148889#comment-15148889
 ] 

Aljoscha Krettek commented on FLINK-3375:
-

Yes, this makes sense.

> Allow Watermark Generation in the Kafka Source
> --
>
> Key: FLINK-3375
> URL: https://issues.apache.org/jira/browse/FLINK-3375
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> It is a common case that event timestamps are ascending inside one Kafka 
> Partition. Ascending timestamps are easy for users, because they are handles 
> by ascending timestamp extraction.
> If the Kafka source has multiple partitions per source task, then the records 
> become out of order before timestamps can be extracted and watermarks can be 
> generated.
> If we make the FlinkKafkaConsumer an event time source function, it can 
> generate watermarks itself. It would internally implement the same logic as 
> the regular operators that merge streams, keeping track of event time 
> progress per partition and generating watermarks based on the current 
> guaranteed event time progress.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3419) Drop partitionByHash from DataStream

2016-02-16 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-3419:
---

 Summary: Drop partitionByHash from DataStream
 Key: FLINK-3419
 URL: https://issues.apache.org/jira/browse/FLINK-3419
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.0.0
Reporter: Aljoscha Krettek
Priority: Blocker


The behavior is no different from {{keyBy}}, except that you cannot use keyed 
state and windows if you use {{partitionByHash}} so I suggest to drop it.

We might also want to think about dropping {{shuffle}} and {{global}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3418) RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security configuration

2016-02-16 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-3418:
--
Component/s: state backends

> RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security 
> configuration
> 
>
> Key: FLINK-3418
> URL: https://issues.apache.org/jira/browse/FLINK-3418
> Project: Flink
>  Issue Type: Bug
>  Components: state backends
>Reporter: Robert Metzger
>Priority: Critical
>
> As you can see for example in the {{YARNTaskManagerRunner}}, our TaskManagers 
> are running in a special UserGroupInformation.doAs(); call. 
> With that call, we are manually changing the user from the user starting the 
> YARN NodeManager (our containers are part of that process tree) to the user 
> who submitted the job.
> For example on my cluster, the NodeManager runs as "yarn", but "robert" 
> submits the job. For regular file access, "robert" is accessing the files in 
> HDFS, even though "yarn" runs the process.
> The {{HDFSCopyFromLocal}} does not properly initialize these settings, hence 
> "yarn" tries to access the files, leading to the following exception:
> {code}
> Caused by: java.lang.RuntimeException: Error while copying to remote 
> FileSystem: SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/yarn/nm/usercache/robert/appcache/application_1455632128025_0010/filecache/17/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p0.7/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: 
> Permission denied: user=yarn, access=WRITE, 
> inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>   at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  

[jira] [Commented] (FLINK-3418) RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security configuration

2016-02-16 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148878#comment-15148878
 ] 

Robert Metzger commented on FLINK-3418:
---

Currently, even making the directory accessible for the user running the NM 
("yarn") doesn't solve the problem because {{initializeForJob()}} creates a 
directory with the user who submitted the job "robert".

> RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security 
> configuration
> 
>
> Key: FLINK-3418
> URL: https://issues.apache.org/jira/browse/FLINK-3418
> Project: Flink
>  Issue Type: Bug
>  Components: state backends
>Reporter: Robert Metzger
>Priority: Critical
>
> As you can see for example in the {{YARNTaskManagerRunner}}, our TaskManagers 
> are running in a special UserGroupInformation.doAs(); call. 
> With that call, we are manually changing the user from the user starting the 
> YARN NodeManager (our containers are part of that process tree) to the user 
> who submitted the job.
> For example on my cluster, the NodeManager runs as "yarn", but "robert" 
> submits the job. For regular file access, "robert" is accessing the files in 
> HDFS, even though "yarn" runs the process.
> The {{HDFSCopyFromLocal}} does not properly initialize these settings, hence 
> "yarn" tries to access the files, leading to the following exception:
> {code}
> Caused by: java.lang.RuntimeException: Error while copying to remote 
> FileSystem: SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/yarn/nm/usercache/robert/appcache/application_1455632128025_0010/filecache/17/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p0.7/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: 
> Permission denied: user=yarn, access=WRITE, 
> inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>   at 
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>   at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   

[jira] [Created] (FLINK-3418) RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security configuration

2016-02-16 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-3418:
-

 Summary: RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop 
security configuration
 Key: FLINK-3418
 URL: https://issues.apache.org/jira/browse/FLINK-3418
 Project: Flink
  Issue Type: Bug
Reporter: Robert Metzger
Priority: Critical


As you can see for example in the {{YARNTaskManagerRunner}}, our TaskManagers 
are running in a special UserGroupInformation.doAs(); call. 

With that call, we are manually changing the user from the user starting the 
YARN NodeManager (our containers are part of that process tree) to the user who 
submitted the job.

For example on my cluster, the NodeManager runs as "yarn", but "robert" submits 
the job. For regular file access, "robert" is accessing the files in HDFS, even 
though "yarn" runs the process.

The {{HDFSCopyFromLocal}} does not properly initialize these settings, hence 
"yarn" tries to access the files, leading to the following exception:

{code}
Caused by: java.lang.RuntimeException: Error while copying to remote 
FileSystem: SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/yarn/nm/usercache/robert/appcache/application_1455632128025_0010/filecache/17/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p0.7/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" org.apache.hadoop.security.AccessControlException: 
Permission denied: user=yarn, access=WRITE, 
inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2755)
  

[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148868#comment-15148868
 ] 

ASF GitHub Bot commented on FLINK-3299:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1642#issuecomment-184755784
  
The `CompletedCheckpoint` comes from the `CheckpointCoordinator`. 
Savepoints are the same thing. Therefore I would keep it as it is.


> Remove ApplicationID from Environment
> -
>
> Key: FLINK-3299
> URL: https://issues.apache.org/jira/browse/FLINK-3299
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> {{ApplicationID}} is used to identify an application across many job 
> submissions (for example after restoring from a savepoint). This is currently 
> exposed in the {{Environment}}, which might be unnecessary.
> State backends, which need the ID can generate it themselves and store it as 
> part of their state handle.
> This has to be checked with the DB state backend, which currently uses this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...

2016-02-16 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1642#issuecomment-184755784
  
The `CompletedCheckpoint` comes from the `CheckpointCoordinator`. 
Savepoints are the same thing. Therefore I would keep it as it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2021) Rework examples to use ParameterTool

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148863#comment-15148863
 ] 

ASF GitHub Bot commented on FLINK-2021:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184754971
  
To get to the logs, I executed your branch in my travis account. I have a 
S3 bucket configured where we upload the full logs to (travis has a log limit 
of ~5 mb): https://travis-ci.org/rmetzger/flink/builds/109557167

Then, in the log of one build: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/109557168/log.txt
you'll find the link to the s3 bucket: 
https://s3.amazonaws.com/flink-logs-us/travis-artifacts/rmetzger/flink/1419/1419.1.tar.gz

unpacking that archive, you'll find the output in the `1.log` file. The 
YARN test is executing the WordCount example, and I guess the arguments passed 
are wrong.


> Rework examples to use ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Stefano Baghino
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2021] Rework examples to use ParameterT...

2016-02-16 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184754971
  
To get to the logs, I executed your branch in my travis account. I have a 
S3 bucket configured where we upload the full logs to (travis has a log limit 
of ~5 mb): https://travis-ci.org/rmetzger/flink/builds/109557167

Then, in the log of one build: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/109557168/log.txt
you'll find the link to the s3 bucket: 
https://s3.amazonaws.com/flink-logs-us/travis-artifacts/rmetzger/flink/1419/1419.1.tar.gz

unpacking that archive, you'll find the output in the `1.log` file. The 
YARN test is executing the WordCount example, and I guess the arguments passed 
are wrong.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3417) Add RocksDB StateBackendFactory and integrate with Flink Config

2016-02-16 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-3417:
---

 Summary: Add RocksDB StateBackendFactory and integrate with Flink 
Config
 Key: FLINK-3417
 URL: https://issues.apache.org/jira/browse/FLINK-3417
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.0.0
Reporter: Aljoscha Krettek
Priority: Blocker


Currently, only memory and file backends can be configured as default backends 
in {{flink-conf.yaml}}. To change that we need to add a StateBackendFactory.

Also, there cannot be a short name for this such as how you can now specify 
"jobmanager" or "filesystem" in the config because RocksDB is not in the 
standard dist.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2021) Rework examples to use ParameterTool

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148860#comment-15148860
 ] 

ASF GitHub Bot commented on FLINK-2021:
---

Github user stefanobaghino commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184753539
  
@rmetzger I'm looking at the logs as well but I couldn't find that 
stacktrace in particular. I've looked for `ParameterTool.fromArgs` (that is 
present in the stacktrace you extracted) in the raw logs of the three failing 
builds but found nothing...


> Rework examples to use ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Stefano Baghino
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2021] Rework examples to use ParameterT...

2016-02-16 Thread stefanobaghino
Github user stefanobaghino commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184753539
  
@rmetzger I'm looking at the logs as well but I couldn't find that 
stacktrace in particular. I've looked for `ParameterTool.fromArgs` (that is 
present in the stacktrace you extracted) in the raw logs of the three failing 
builds but found nothing...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2021] Rework examples to use ParameterT...

2016-02-16 Thread stefanobaghino
Github user stefanobaghino commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184747590
  
Thanks for digging up the tests, I missed that exception in particular, 
I'll fix it right away.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2021) Rework examples to use ParameterTool

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148843#comment-15148843
 ] 

ASF GitHub Bot commented on FLINK-2021:
---

Github user stefanobaghino commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184747590
  
Thanks for digging up the tests, I missed that exception in particular, 
I'll fix it right away.


> Rework examples to use ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Stefano Baghino
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3400) RocksDB Backend does not work when not in Flink lib folder

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148827#comment-15148827
 ] 

ASF GitHub Bot commented on FLINK-3400:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1644

[FLINK-3400] Move RocksDB Copy Utils to flink-streaming-java

They are not specific to RocksDB, just utilities for copying local
folders to/from HDFS. Moving them to flink-streaming-java means that
they are always in the classpath of the TaskManager, not only in the
user-code jar when using RocksDB. If they are only in the user-code jar
the external process runner cannot find the class files, leading to
ClassNotFoundExceptions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink rocksdb-fix-copy-utils

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1644.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1644


commit 6dfdf5d643d9df2598b25f61134e24ab37ecb7df
Author: Aljoscha Krettek 
Date:   2016-02-15T12:57:15Z

[FLINK-3400] Move RocksDB Copy Utils to flink-streaming-java

They are not specific to RocksDB, just utilities for copying local
folders to/from HDFS. Moving them to flink-streaming-java means that
they are always in the classpath of the TaskManager, not only in the
user-code jar when using RocksDB. If they are only in the user-code jar
the external process runner cannot find the class files, leading to
ClassNotFoundExceptions.




> RocksDB Backend does not work when not in Flink lib folder
> --
>
> Key: FLINK-3400
> URL: https://issues.apache.org/jira/browse/FLINK-3400
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The problem is that the external process runner cannot load the 
> {{HDFSCopyFromLocal}} and {{HDFSCopyToLocal}} classes when the rocksdb 
> backend jar is not in the Flink lib folder. If rocksdb is only in the 
> user-code jar the code cannot be loaded when trying to start the external 
> copy runners.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3400] Move RocksDB Copy Utils to flink-...

2016-02-16 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1644

[FLINK-3400] Move RocksDB Copy Utils to flink-streaming-java

They are not specific to RocksDB, just utilities for copying local
folders to/from HDFS. Moving them to flink-streaming-java means that
they are always in the classpath of the TaskManager, not only in the
user-code jar when using RocksDB. If they are only in the user-code jar
the external process runner cannot find the class files, leading to
ClassNotFoundExceptions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink rocksdb-fix-copy-utils

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1644.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1644


commit 6dfdf5d643d9df2598b25f61134e24ab37ecb7df
Author: Aljoscha Krettek 
Date:   2016-02-15T12:57:15Z

[FLINK-3400] Move RocksDB Copy Utils to flink-streaming-java

They are not specific to RocksDB, just utilities for copying local
folders to/from HDFS. Moving them to flink-streaming-java means that
they are always in the classpath of the TaskManager, not only in the
user-code jar when using RocksDB. If they are only in the user-code jar
the external process runner cannot find the class files, leading to
ClassNotFoundExceptions.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3375) Allow Watermark Generation in the Kafka Source

2016-02-16 Thread Zach Cox (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148821#comment-15148821
 ] 

Zach Cox commented on FLINK-3375:
-

Sure [~StephanEwen] - although after thinking it through, it's probably 
unnecessary. If the FlinkKafkaConsumer tracks a watermark per topic partition, 
as described in this jira ticket, that should be good enough.

My thinking went something like this: imagine n web servers sending page view 
events to a Kafka topic with m partitions. Each event has a UUID field which is 
used as the message key. So each web server is spreading its events across all 
m partitions approximately uniformly, and each partition contains events from 
all n web servers.

Each event also has a timestamp field generated by the web server using its 
system clock. Each web server will (likely) be generating events with ascending 
timestamps, but the system clocks across web servers are independent, so each 
topic partition will contain events that are not strictly ascending. In general 
the events in a partition will be slightly out-of-order.

An alternative to inferring watermarks from the events in the Kafka topic 
partitions (i.e. the current `TimestampExtractor`) could be to have the event 
generators (e.g. web servers) also emit watermarks directly into the topic 
partitions. So the partitions would contain two types of messages: either an 
event or a watermark, just like how the internal Flink streams contain 
`StreamElement` which is either a `StreamRecord` or `Watermark`. Each Kafka 
topic partition then contains both events and watermarks from multiple sources. 
The watermark for each partition would then be the min of watermarks from all 
sources - so I guess the watermarks would need to contain some kind of sourceId 
and then you either need to know all sourceIds, or use heuristics to infer all 
sourceIds to track the min. At that point you could just include a sourceId in 
the event itself and just treat each event as a watermark, and skip the 
explicit watermarks altogether. 

So instead of building anything new into Flink to support this, I think I would 
just include a sourceId (e.g. host+port of web server) along with the timestamp 
in the events, and then use that sourceId in my `TimestampExtractor` 
implementation to determine the watermark of the Kafka topic partition. Perhaps 
a generic `TimestampExtractor` implementation that tracks watermarks per event 
source could be created, but including both events and watermarks in the input 
Kafka topic partitions is probably not a good idea.

> Allow Watermark Generation in the Kafka Source
> --
>
> Key: FLINK-3375
> URL: https://issues.apache.org/jira/browse/FLINK-3375
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> It is a common case that event timestamps are ascending inside one Kafka 
> Partition. Ascending timestamps are easy for users, because they are handles 
> by ascending timestamp extraction.
> If the Kafka source has multiple partitions per source task, then the records 
> become out of order before timestamps can be extracted and watermarks can be 
> generated.
> If we make the FlinkKafkaConsumer an event time source function, it can 
> generate watermarks itself. It would internally implement the same logic as 
> the regular operators that merge streams, keeping track of event time 
> progress per partition and generating watermarks based on the current 
> guaranteed event time progress.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...

2016-02-16 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1642#issuecomment-184728030
  
should we maybe rename the CompletedCheckpoint class to Savepoint? the 
naming appears a bit incoonsistent now that the Savepoint class is removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148804#comment-15148804
 ] 

ASF GitHub Bot commented on FLINK-3299:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1642#issuecomment-184728030
  
should we maybe rename the CompletedCheckpoint class to Savepoint? the 
naming appears a bit incoonsistent now that the Savepoint class is removed.


> Remove ApplicationID from Environment
> -
>
> Key: FLINK-3299
> URL: https://issues.apache.org/jira/browse/FLINK-3299
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> {{ApplicationID}} is used to identify an application across many job 
> submissions (for example after restoring from a savepoint). This is currently 
> exposed in the {{Environment}}, which might be unnecessary.
> State backends, which need the ID can generate it themselves and store it as 
> part of their state handle.
> This has to be checked with the DB state backend, which currently uses this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1642#discussion_r53023536
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
 ---
@@ -337,19 +326,14 @@ public void testRollbackStateStoreFailure() throws 
Exception {
 
@Test
public void testRollbackUpdatesApplicationID() throws Exception {
--- End diff --

test method should renamed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148781#comment-15148781
 ] 

ASF GitHub Bot commented on FLINK-3299:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1642#discussion_r53023536
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
 ---
@@ -337,19 +326,14 @@ public void testRollbackStateStoreFailure() throws 
Exception {
 
@Test
public void testRollbackUpdatesApplicationID() throws Exception {
--- End diff --

test method should renamed


> Remove ApplicationID from Environment
> -
>
> Key: FLINK-3299
> URL: https://issues.apache.org/jira/browse/FLINK-3299
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> {{ApplicationID}} is used to identify an application across many job 
> submissions (for example after restoring from a savepoint). This is currently 
> exposed in the {{Environment}}, which might be unnecessary.
> State backends, which need the ID can generate it themselves and store it as 
> part of their state handle.
> This has to be checked with the DB state backend, which currently uses this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184719993
  
Closing this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148774#comment-15148774
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184719993
  
Closing this...


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148775#comment-15148775
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user uce closed the pull request at:

https://github.com/apache/flink/pull/1633


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/1633


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3410) setting setNumberOfExecutionRetries to 0 still leads to RESTARTs.

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148768#comment-15148768
 ] 

ASF GitHub Bot commented on FLINK-3410:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1643#discussion_r53022123
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -354,9 +358,9 @@ public ExecutionConfig setNumberOfExecutionRetries(int 
numberOfExecutionRetries)
 */
@Deprecated
public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) 
{
-   if (executionRetryDelay < -1 ) {
+   if (executionRetryDelay < 0 ) {
throw new IllegalArgumentException(
-   "The delay between reties must be non-negative, 
or -1 (use system default)");
+   "The delay between reties must be 
non-negative.");
--- End diff --

Good catch. Overlooked the typo. Will fix it.


> setting setNumberOfExecutionRetries to 0 still leads to RESTARTs.
> -
>
> Key: FLINK-3410
> URL: https://issues.apache.org/jira/browse/FLINK-3410
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>
> While testing the RC0 for 1.0.0 I found the following issue:
> Setting the number of retries to 0 still leads to the job being restarted:
> {code}
> final StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> see.setNumberOfExecutionRetries(0);
> {code}
> {code}
> 21:19:50,677 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 0e78d0825da485167aabee7e63c8e913 (Data Generator) changed 
> to RESTARTING.
> 21:19:50,678 INFO  
> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy  - 
> Delaying retry of job execution for 1 ms ...
> {code}
> While looking through the code, it seems that the execution config is 
> returning {{null}} when the number of retries is set to 0. With {{null}} the 
> jobManager picks the default restart strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3410] [restart] Choose NoRestart strate...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1643#discussion_r53022123
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -354,9 +358,9 @@ public ExecutionConfig setNumberOfExecutionRetries(int 
numberOfExecutionRetries)
 */
@Deprecated
public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) 
{
-   if (executionRetryDelay < -1 ) {
+   if (executionRetryDelay < 0 ) {
throw new IllegalArgumentException(
-   "The delay between reties must be non-negative, 
or -1 (use system default)");
+   "The delay between reties must be 
non-negative.");
--- End diff --

Good catch. Overlooked the typo. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3243] Fix Interplay of TimeCharacterist...

2016-02-16 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1513#issuecomment-184715239
  
@rmetzger @StephanEwen Do you still have objections? I think this should go 
in ASAP.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3243) Fix Interplay of TimeCharacteristic and Time Windows

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148710#comment-15148710
 ] 

ASF GitHub Bot commented on FLINK-3243:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1513#issuecomment-184715239
  
@rmetzger @StephanEwen Do you still have objections? I think this should go 
in ASAP.


> Fix Interplay of TimeCharacteristic and Time Windows
> 
>
> Key: FLINK-3243
> URL: https://issues.apache.org/jira/browse/FLINK-3243
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
>
> As per the discussion on the Dev ML: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Time-Behavior-in-Streaming-Jobs-Event-time-processing-time-td9616.html.
> The discussion seems to have converged on option 2):
> - Add dedicated WindowAssigners for processing time and event time
> - {{timeWindow()}} and {{timeWindowAll()}} respect the set 
> {{TimeCharacteristic}}. 
> This will make the easy stuff easy, i.e. using time windows and quickly 
> switching the time characteristic. Users will then have the flexibility to 
> mix different kinds of window assigners in their job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3296) DataStream.write*() methods are not flushing properly

2016-02-16 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-3296.
---
   Resolution: Fixed
Fix Version/s: 1.0.0

Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/2714aaf3

> DataStream.write*() methods are not flushing properly
> -
>
> Key: FLINK-3296
> URL: https://issues.apache.org/jira/browse/FLINK-3296
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> The DataStream.write() methods rely on the {{FileSinkFunctionByMillis}} 
> class, which has a logic for flushing records, even though the underlying 
> stream is never flushed. This is misleading for users as files are not 
> written as they would expect it.
> The code was initial written with FileOutputFormats in mind, but the types 
> were not set correctly. This PR opened the write() method to any output 
> format: https://github.com/apache/flink/pull/706/files



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53019468
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1079,6 +1079,9 @@ class JobManager(
   executionGraph.registerExecutionListener(gateway)
   executionGraph.registerJobStatusListener(gateway)
 }
+
+// All good. Submission succeeded!
+jobInfo.client ! 
decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Hmm but now we have the problem that the user might see a 
`JobSubmitSuccess` without the job being stored in the 
`SubmittedJobGraphStore`, right? This means that if the `JobManager` dies 
before the job is persisted, it will never be recovered. I think this violates 
the `JobSubmitSuccess` contract.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3296) DataStream.write*() methods are not flushing properly

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148707#comment-15148707
 ] 

ASF GitHub Bot commented on FLINK-3296:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1563


> DataStream.write*() methods are not flushing properly
> -
>
> Key: FLINK-3296
> URL: https://issues.apache.org/jira/browse/FLINK-3296
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> The DataStream.write() methods rely on the {{FileSinkFunctionByMillis}} 
> class, which has a logic for flushing records, even though the underlying 
> stream is never flushed. This is misleading for users as files are not 
> written as they would expect it.
> The code was initial written with FileOutputFormats in mind, but the types 
> were not set correctly. This PR opened the write() method to any output 
> format: https://github.com/apache/flink/pull/706/files



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3296] Remove 'flushing' behavior of the...

2016-02-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1563


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3410] [restart] Choose NoRestart strate...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1643#discussion_r53020089
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -354,9 +358,9 @@ public ExecutionConfig setNumberOfExecutionRetries(int 
numberOfExecutionRetries)
 */
@Deprecated
public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) 
{
-   if (executionRetryDelay < -1 ) {
+   if (executionRetryDelay < 0 ) {
throw new IllegalArgumentException(
-   "The delay between reties must be non-negative, 
or -1 (use system default)");
+   "The delay between reties must be 
non-negative.");
--- End diff --

we can fix this type while we're at it: reties:retries


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3410) setting setNumberOfExecutionRetries to 0 still leads to RESTARTs.

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148703#comment-15148703
 ] 

ASF GitHub Bot commented on FLINK-3410:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1643#discussion_r53020089
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -354,9 +358,9 @@ public ExecutionConfig setNumberOfExecutionRetries(int 
numberOfExecutionRetries)
 */
@Deprecated
public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) 
{
-   if (executionRetryDelay < -1 ) {
+   if (executionRetryDelay < 0 ) {
throw new IllegalArgumentException(
-   "The delay between reties must be non-negative, 
or -1 (use system default)");
+   "The delay between reties must be 
non-negative.");
--- End diff --

we can fix this type while we're at it: reties:retries


> setting setNumberOfExecutionRetries to 0 still leads to RESTARTs.
> -
>
> Key: FLINK-3410
> URL: https://issues.apache.org/jira/browse/FLINK-3410
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>
> While testing the RC0 for 1.0.0 I found the following issue:
> Setting the number of retries to 0 still leads to the job being restarted:
> {code}
> final StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> see.setNumberOfExecutionRetries(0);
> {code}
> {code}
> 21:19:50,677 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 0e78d0825da485167aabee7e63c8e913 (Data Generator) changed 
> to RESTARTING.
> 21:19:50,678 INFO  
> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy  - 
> Delaying retry of job execution for 1 ms ...
> {code}
> While looking through the code, it seems that the execution config is 
> returning {{null}} when the number of retries is set to 0. With {{null}} the 
> jobManager picks the default restart strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148700#comment-15148700
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53019879
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1079,6 +1079,9 @@ class JobManager(
   executionGraph.registerExecutionListener(gateway)
   executionGraph.registerJobStatusListener(gateway)
 }
+
+// All good. Submission succeeded!
+jobInfo.client ! 
decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Oh boy... not my day today. Thanks for catching that. This was not expected.


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53019879
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1079,6 +1079,9 @@ class JobManager(
   executionGraph.registerExecutionListener(gateway)
   executionGraph.registerJobStatusListener(gateway)
 }
+
+// All good. Submission succeeded!
+jobInfo.client ! 
decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Oh boy... not my day today. Thanks for catching that. This was not expected.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148695#comment-15148695
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53019468
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1079,6 +1079,9 @@ class JobManager(
   executionGraph.registerExecutionListener(gateway)
   executionGraph.registerJobStatusListener(gateway)
 }
+
+// All good. Submission succeeded!
+jobInfo.client ! 
decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Hmm but now we have the problem that the user might see a 
`JobSubmitSuccess` without the job being stored in the 
`SubmittedJobGraphStore`, right? This means that if the `JobManager` dies 
before the job is persisted, it will never be recovered. I think this violates 
the `JobSubmitSuccess` contract.


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-16 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-2380.
---
Resolution: Fixed

Merged in http://git-wip-us.apache.org/repos/asf/flink/commit/76d3a635

> Allow to configure default FS for file inputs
> -
>
> Key: FLINK-2380
> URL: https://issues.apache.org/jira/browse/FLINK-2380
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Klou
>Priority: Minor
>  Labels: starter
> Fix For: 1.0.0
>
>
> File inputs use "file://" as default prefix. A user asked to make this 
> configurable, e.g. "hdfs://" as default.
> (I'm not sure whether this is already possible or not. I will check and 
> either close or implement this for the user.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

2016-02-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1524


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148665#comment-15148665
 ] 

ASF GitHub Bot commented on FLINK-2380:
---

Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-184703065
  
Perfect! Thanks a lot @rmetzger 


> Allow to configure default FS for file inputs
> -
>
> Key: FLINK-2380
> URL: https://issues.apache.org/jira/browse/FLINK-2380
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Klou
>Priority: Minor
>  Labels: starter
> Fix For: 1.0.0
>
>
> File inputs use "file://" as default prefix. A user asked to make this 
> configurable, e.g. "hdfs://" as default.
> (I'm not sure whether this is already possible or not. I will check and 
> either close or implement this for the user.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

2016-02-16 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-184703065
  
Perfect! Thanks a lot @rmetzger 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148667#comment-15148667
 ] 

ASF GitHub Bot commented on FLINK-2380:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1524


> Allow to configure default FS for file inputs
> -
>
> Key: FLINK-2380
> URL: https://issues.apache.org/jira/browse/FLINK-2380
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Klou
>Priority: Minor
>  Labels: starter
> Fix For: 1.0.0
>
>
> File inputs use "file://" as default prefix. A user asked to make this 
> configurable, e.g. "hdfs://" as default.
> (I'm not sure whether this is already possible or not. I will check and 
> either close or implement this for the user.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3410) setting setNumberOfExecutionRetries to 0 still leads to RESTARTs.

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148664#comment-15148664
 ] 

ASF GitHub Bot commented on FLINK-3410:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1643

[FLINK-3410] [restart] Choose NoRestart strategy if the number of retries 
is set to 0

This fixes the problem that when checkpointing is enabled and the number of 
execution retries is set to `0` that the automatic restarting should be 
deactivated. This is consistent with the semantics before the restart 
strategies where introduced.

Be aware, though, that whenever you enable checkpointing for streaming 
jobs, the cluster wide default restart strategy which is set in the 
configuration will always be overwritten. Either by manually setting a restart 
strategy or by automatically setting a 
`FixedDelayRestartStrategy(Integer.MAX_VALUE, 1)` strategy in the 
`StreamingJobGraphGenerator` if nothing was specified. That is consistent with 
the previous behaviour where all default execution retry attempts set in the 
configuration where overwritten in case of a checkpointed streaming job.

Additionally, this PR sets the default retry delay to 1 ms and 
disallows to set it to negative values. Before, the default execution retry 
delay would have been used if the delay in the `ExecutionConfig` was set to 
`-1`. However, with the new `RestartStrategies` there is no longer the 
possibility to set an explicit execution retry delay independent of the number 
of retries in the configuration file. Therefore it is no longer possible to set 
the number of execution retries in the configuration file and to specify the 
execution retry delay in the `ExecutionConfig` or vice versa. Both have to be 
defined either in the `ExecutionConfig` or the configuration file.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixRestartStrategy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1643.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1643


commit 36ef22bc3d5491c67165a3ee1d4e08663723260b
Author: Till Rohrmann 
Date:   2016-02-16T00:15:39Z

[FLINK-3410] [restart] Choose NoRestart strategy if the number of retries 
is set to 0

Add test case




> setting setNumberOfExecutionRetries to 0 still leads to RESTARTs.
> -
>
> Key: FLINK-3410
> URL: https://issues.apache.org/jira/browse/FLINK-3410
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>
> While testing the RC0 for 1.0.0 I found the following issue:
> Setting the number of retries to 0 still leads to the job being restarted:
> {code}
> final StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> see.setNumberOfExecutionRetries(0);
> {code}
> {code}
> 21:19:50,677 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 0e78d0825da485167aabee7e63c8e913 (Data Generator) changed 
> to RESTARTING.
> 21:19:50,678 INFO  
> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy  - 
> Delaying retry of job execution for 1 ms ...
> {code}
> While looking through the code, it seems that the execution config is 
> returning {{null}} when the number of retries is set to 0. With {{null}} the 
> jobManager picks the default restart strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3410] [restart] Choose NoRestart strate...

2016-02-16 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1643

[FLINK-3410] [restart] Choose NoRestart strategy if the number of retries 
is set to 0

This fixes the problem that when checkpointing is enabled and the number of 
execution retries is set to `0` that the automatic restarting should be 
deactivated. This is consistent with the semantics before the restart 
strategies where introduced.

Be aware, though, that whenever you enable checkpointing for streaming 
jobs, the cluster wide default restart strategy which is set in the 
configuration will always be overwritten. Either by manually setting a restart 
strategy or by automatically setting a 
`FixedDelayRestartStrategy(Integer.MAX_VALUE, 1)` strategy in the 
`StreamingJobGraphGenerator` if nothing was specified. That is consistent with 
the previous behaviour where all default execution retry attempts set in the 
configuration where overwritten in case of a checkpointed streaming job.

Additionally, this PR sets the default retry delay to 1 ms and 
disallows to set it to negative values. Before, the default execution retry 
delay would have been used if the delay in the `ExecutionConfig` was set to 
`-1`. However, with the new `RestartStrategies` there is no longer the 
possibility to set an explicit execution retry delay independent of the number 
of retries in the configuration file. Therefore it is no longer possible to set 
the number of execution retries in the configuration file and to specify the 
execution retry delay in the `ExecutionConfig` or vice versa. Both have to be 
defined either in the `ExecutionConfig` or the configuration file.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixRestartStrategy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1643.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1643


commit 36ef22bc3d5491c67165a3ee1d4e08663723260b
Author: Till Rohrmann 
Date:   2016-02-16T00:15:39Z

[FLINK-3410] [restart] Choose NoRestart strategy if the number of retries 
is set to 0

Add test case




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

2016-02-16 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-184702790
  
I tested the change again on a cluster. Everything is working nicely with 
YARN.

I'll merge the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148663#comment-15148663
 ] 

ASF GitHub Bot commented on FLINK-2380:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-184702790
  
I tested the change again on a cluster. Everything is working nicely with 
YARN.

I'll merge the PR.


> Allow to configure default FS for file inputs
> -
>
> Key: FLINK-2380
> URL: https://issues.apache.org/jira/browse/FLINK-2380
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Klou
>Priority: Minor
>  Labels: starter
> Fix For: 1.0.0
>
>
> File inputs use "file://" as default prefix. A user asked to make this 
> configurable, e.g. "hdfs://" as default.
> (I'm not sure whether this is already possible or not. I will check and 
> either close or implement this for the user.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3315] Fix Slot Sharing in Streaming API

2016-02-16 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53017220
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ---
@@ -202,6 +203,29 @@ public String getUid() {
}
 
/**
+* Returns the slot sharing group of this transformation.
+*
+* @see #setSlotSharingGroup(String)
+*/
+   public String getSlotSharingGroup() {
+   return slotSharingGroup;
+   }
+
+   /**
+* Sets the slot sharing group of this transformation. Parallels 
instances of operations that
--- End diff --

Fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148658#comment-15148658
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53017220
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ---
@@ -202,6 +203,29 @@ public String getUid() {
}
 
/**
+* Returns the slot sharing group of this transformation.
+*
+* @see #setSlotSharingGroup(String)
+*/
+   public String getSlotSharingGroup() {
+   return slotSharingGroup;
+   }
+
+   /**
+* Sets the slot sharing group of this transformation. Parallels 
instances of operations that
--- End diff --

Fixed


> Fix Slot Sharing in Streaming API
> -
>
> Key: FLINK-3315
> URL: https://issues.apache.org/jira/browse/FLINK-3315
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The 
> slot sharing group that operators are put in depends on the order in which 
> operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink() 
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when transforming the 
> {{startNewResouceGroup()}} call and all operators that are transformed 
> afterwards in graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}} which can be used to isolate an 
> operator.)
> What I propose is to remove {{startNewResourceGroup()}} and 
> {{isolateResouces()}} and replace it with {{slotSharingGroup(String)}}. By 
> default, operations would be in slot sharing group "default". This allows 
> very fine grained control over what operators end up in which slot sharing 
> group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks") 
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> Which would isolate the lightweight sources and sinks in a group and put 
> heavy operations inside their own slot groups.
> This is a bit more low level than the previous API and requires more calls 
> than a simple {{startNewResourceGroup()}} but I think not many people would 
> use this feature and this design makes it very clear what operations end up 
> in the same group.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3296) DataStream.write*() methods are not flushing properly

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148652#comment-15148652
 ] 

ASF GitHub Bot commented on FLINK-3296:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1563#issuecomment-184700077
  
I'll merge the PR.


> DataStream.write*() methods are not flushing properly
> -
>
> Key: FLINK-3296
> URL: https://issues.apache.org/jira/browse/FLINK-3296
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>
> The DataStream.write() methods rely on the {{FileSinkFunctionByMillis}} 
> class, which has a logic for flushing records, even though the underlying 
> stream is never flushed. This is misleading for users as files are not 
> written as they would expect it.
> The code was initial written with FileOutputFormats in mind, but the types 
> were not set correctly. This PR opened the write() method to any output 
> format: https://github.com/apache/flink/pull/706/files



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3296] Remove 'flushing' behavior of the...

2016-02-16 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1563#issuecomment-184700077
  
I'll merge the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2237) Add hash-based Aggregation

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148641#comment-15148641
 ] 

ASF GitHub Bot commented on FLINK-2237:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-184696741
  
Hello Fabian,

I did 3. 
(https://github.com/apache/flink/commit/ef644821d5417b992cfda366225bd5622faa9b2b),
 because the machinery for that was already in place (see the condition in 
`compactOrThrow`). I chose the threshold to be 5%. (This can probably be the 
same with the solution set case, because if lengths change a lot then we get 
very slow as memory load gets near the total memory, so it is probably better 
to indicate the memory problem to the user with an exception than to silently 
be very slow.)

I also did some changes to the tests.

For 2., the situation doesn't seem straightforward to me. For example, if 
there are not many length changes, then exactly the opposite should be done: we 
should emit from the end of the record area (rather than the beginning), 
because if there is skew in the data, then the more common keys will appear 
sooner, so they tend to appear near the beginning of the record area.

The other ideas are also interesting, and I would love to experiment with 
them, but unfortunately I don't really have that much time for this at the 
moment. So I would suggest to merge the non-partitioned version, and then the 
partitioned version can be implemented later when I or someone else has a lot 
of free time on their hands.

(Btw., it would be very interesting to try machine learning techniques for 
dynamically making these decisions that involve complicated trade-offs, based 
on the actual data:
- Have some switches which control these things like
  - what part of the record area to emit (begin or end; how much)
  - at how much fragmentation should we do compacting instead of emitting
  - what load factor should trigger a resize
  - size of bucket area
  - how to choose which partition to emit
  - maybe even do spilling also in the combiner
  - whether to insert prefetch instructions for the random memory accesses 
that will probably involve a CPU cache miss (the trade-off here is that then 
you have to work with multiple consecutive input records at the same time, so 
you have to do extra copies if object reuse is enabled, which might cost a lot) 
(I have actually experimented with this a little, and there were 20-35% 
speedups, if copies are cheap)
  - ... (it's easy to come up with many more)
- Gather some statistics about what is happening, and turn them into 
features
  - avg. record size
  - #keys / #elements ratio
  - skew
  - time it takes to serialize a record
  - time it takes to run the ReduceFunction
  - ratio of updates that involve size changes
  - size is changing up or down on average
  - backpressure
- that we are generating
- that we get from our outputs (if this is large (eg. because of a 
saturated network), then we should set the switches to do more aggressive 
combining)
  - how many CPU cache misses occur while looking up keys (eg. for 
recognizing the situation where records with matching keys are often close to 
each other for some reason)
  - hash collisions (so that we can start with a more simple hash function 
(few percent speedup), and change it, if it is bad)
  - ... (it's easy to come up with many more)
- Train some machine learning model which will figure out how to set the 
switches based on the features

I think a pretty good speedup could result from tuning all these things to 
the actual data at hand.
Maybe in a few years, when data flow systems get more mature, then this can 
become a reality.)


> Add hash-based Aggregation
> --
>
> Key: FLINK-2237
> URL: https://issues.apache.org/jira/browse/FLINK-2237
> Project: Flink
>  Issue Type: New Feature
>Reporter: Rafiullah Momand
>Assignee: Gabor Gevay
>Priority: Minor
>
> Aggregation functions at the moment are implemented in a sort-based way.
> How can we implement hash based Aggregation for Flink?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2237] [runtime] Add hash-based combiner...

2016-02-16 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-184696741
  
Hello Fabian,

I did 3. 
(https://github.com/apache/flink/commit/ef644821d5417b992cfda366225bd5622faa9b2b),
 because the machinery for that was already in place (see the condition in 
`compactOrThrow`). I chose the threshold to be 5%. (This can probably be the 
same with the solution set case, because if lengths change a lot then we get 
very slow as memory load gets near the total memory, so it is probably better 
to indicate the memory problem to the user with an exception than to silently 
be very slow.)

I also did some changes to the tests.

For 2., the situation doesn't seem straightforward to me. For example, if 
there are not many length changes, then exactly the opposite should be done: we 
should emit from the end of the record area (rather than the beginning), 
because if there is skew in the data, then the more common keys will appear 
sooner, so they tend to appear near the beginning of the record area.

The other ideas are also interesting, and I would love to experiment with 
them, but unfortunately I don't really have that much time for this at the 
moment. So I would suggest to merge the non-partitioned version, and then the 
partitioned version can be implemented later when I or someone else has a lot 
of free time on their hands.

(Btw., it would be very interesting to try machine learning techniques for 
dynamically making these decisions that involve complicated trade-offs, based 
on the actual data:
- Have some switches which control these things like
  - what part of the record area to emit (begin or end; how much)
  - at how much fragmentation should we do compacting instead of emitting
  - what load factor should trigger a resize
  - size of bucket area
  - how to choose which partition to emit
  - maybe even do spilling also in the combiner
  - whether to insert prefetch instructions for the random memory accesses 
that will probably involve a CPU cache miss (the trade-off here is that then 
you have to work with multiple consecutive input records at the same time, so 
you have to do extra copies if object reuse is enabled, which might cost a lot) 
(I have actually experimented with this a little, and there were 20-35% 
speedups, if copies are cheap)
  - ... (it's easy to come up with many more)
- Gather some statistics about what is happening, and turn them into 
features
  - avg. record size
  - #keys / #elements ratio
  - skew
  - time it takes to serialize a record
  - time it takes to run the ReduceFunction
  - ratio of updates that involve size changes
  - size is changing up or down on average
  - backpressure
- that we are generating
- that we get from our outputs (if this is large (eg. because of a 
saturated network), then we should set the switches to do more aggressive 
combining)
  - how many CPU cache misses occur while looking up keys (eg. for 
recognizing the situation where records with matching keys are often close to 
each other for some reason)
  - hash collisions (so that we can start with a more simple hash function 
(few percent speedup), and change it, if it is bad)
  - ... (it's easy to come up with many more)
- Train some machine learning model which will figure out how to set the 
switches based on the features

I think a pretty good speedup could result from tuning all these things to 
the actual data at hand.
Maybe in a few years, when data flow systems get more mature, then this can 
become a reality.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148640#comment-15148640
 ] 

ASF GitHub Bot commented on FLINK-2380:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1524#discussion_r53014739
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
@@ -176,6 +184,8 @@ abstract class ApplicationMasterBase {
 jobManagerPort, webServerPort, slots, taskManagerCount,
 dynamicPropertiesEncodedString)
 
+  //todo should I also set the FS default here
--- End diff --

@rmetzger Yes I know. That comment was forgotten since earlier.


> Allow to configure default FS for file inputs
> -
>
> Key: FLINK-2380
> URL: https://issues.apache.org/jira/browse/FLINK-2380
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Klou
>Priority: Minor
>  Labels: starter
> Fix For: 1.0.0
>
>
> File inputs use "file://" as default prefix. A user asked to make this 
> configurable, e.g. "hdfs://" as default.
> (I'm not sure whether this is already possible or not. I will check and 
> either close or implement this for the user.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

2016-02-16 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1524#discussion_r53014739
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
@@ -176,6 +184,8 @@ abstract class ApplicationMasterBase {
 jobManagerPort, webServerPort, slots, taskManagerCount,
 dynamicPropertiesEncodedString)
 
+  //todo should I also set the FS default here
--- End diff --

@rmetzger Yes I know. That comment was forgotten since earlier.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148635#comment-15148635
 ] 

ASF GitHub Bot commented on FLINK-2380:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1524#discussion_r53014234
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
@@ -176,6 +184,8 @@ abstract class ApplicationMasterBase {
 jobManagerPort, webServerPort, slots, taskManagerCount,
 dynamicPropertiesEncodedString)
 
+  //todo should I also set the FS default here
--- End diff --

No. I'll remove this TODO when merging


> Allow to configure default FS for file inputs
> -
>
> Key: FLINK-2380
> URL: https://issues.apache.org/jira/browse/FLINK-2380
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Klou
>Priority: Minor
>  Labels: starter
> Fix For: 1.0.0
>
>
> File inputs use "file://" as default prefix. A user asked to make this 
> configurable, e.g. "hdfs://" as default.
> (I'm not sure whether this is already possible or not. I will check and 
> either close or implement this for the user.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


  1   2   3   >