[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906128#comment-15906128
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3484
  
@kl0u @chenqin I cleaned up the commits and distributed the fixes from the 
review comments to the right commits. I also added more tests/ITCases for 
detecting name clashes in side-output IDs and for side outputs with multiple 
consumers.
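
For context, a minimal sketch of the side-output API this PR introduces, as 
discussed in the review further below (element types and tag names are 
illustrative; details may differ from the merged version):

{code}
// An OutputTag must be an anonymous inner class so that Flink can derive the
// TypeInformation for its generic type parameter.
final OutputTag<String> rejectedTag = new OutputTag<String>("rejected") {};

SingleOutputStreamOperator<Integer> parsed = input
    .process(new ProcessFunction<String, Integer>() {
        @Override
        public void processElement(String value, Context ctx, Collector<Integer> out) {
            try {
                out.collect(Integer.parseInt(value)); // main output
            } catch (NumberFormatException e) {
                ctx.output(rejectedTag, value);       // side output
            }
        }
    });

// Side outputs may have multiple consumers (one of the newly added ITCases):
DataStream<String> rejectedA = parsed.getSideOutput(rejectedTag);
DataStream<String> rejectedB = parsed.getSideOutput(rejectedTag);
{code}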


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing






[jira] [Created] (FLINK-6024) Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key"

2017-03-10 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6024:
-

 Summary: Need more fine-grained info for "InvalidProgramException: 
This type (...) cannot be used as key"
 Key: FLINK-6024
 URL: https://issues.apache.org/jira/browse/FLINK-6024
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison


I got this very confusing exception:

InvalidProgramException: This type (MyType) cannot be used as key

I dug through the code and could not find what was causing this. The Javadoc 
for type.isKeyType(), in Keys.java:329, right before the exception is thrown, 
says: "Checks whether this type can be used as a key. As a bare minimum, types 
have to be hashable and comparable to be keys." However, this hint didn't 
explain the actual problem.

I discovered that in my case, the error occurred because I had added a new 
constructor to the type without keeping a default constructor. This is 
probably quite a common mistake for POJOs, so the error message should say 
explicitly that this is the problem.

Other causes of this failure, such as the class not being public, the 
constructor not being public, the key field not being public, the key field 
not being a serializable type, the key not being Comparable, or the key not 
being hashable, should likewise be reported in the error message, depending 
on the actual cause of the problem.
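
To illustrate the constructor case with a hypothetical POJO matching the 
report above:

{code}
// Adding a parameterized constructor without keeping a public no-argument
// constructor breaks Flink's POJO analysis, so keyBy() on a field of this
// type fails with "This type (MyType) cannot be used as key".
public class MyType {
    public String name;

    public MyType(String name) { // the newly added constructor
        this.name = name;
    }

    public MyType() {} // required default constructor; omitting it triggers the error
}
{code}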





[jira] [Updated] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()

2017-03-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5541:
--
Description: 
{code}
  if (localJar == null) {
try {
  for (final URL url : ((ContextEnvironment) 
ExecutionEnvironment.getExecutionEnvironment())
  .getJars()) {
// TODO verify that there is only one jar
localJar = new File(url.toURI()).getAbsolutePath();
  }
} catch (final URISyntaxException e) {
  // ignore
} catch (final ClassCastException e) {
  // ignore
}
  }

  logger.info("Submitting topology " + name + " in distributed mode with 
conf " + serConf);
  client.submitTopologyWithOpts(name, localJar, topology);
{code}

Since the try block may encounter URISyntaxException / ClassCastException, we 
should check that localJar is not null before calling submitTopologyWithOpts().
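
A minimal sketch of the suggested guard (the exception type and message are 
illustrative):

{code}
if (localJar == null) {
    // Fail fast instead of passing a possibly-null jar path on.
    throw new IllegalStateException(
        "Could not determine the jar file of topology '" + name + "'.");
}
logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
client.submitTopologyWithOpts(name, localJar, topology);
{code}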

  was:
{code}
  if (localJar == null) {
try {
  for (final URL url : ((ContextEnvironment) 
ExecutionEnvironment.getExecutionEnvironment())
  .getJars()) {
// TODO verify that there is only one jar
localJar = new File(url.toURI()).getAbsolutePath();
  }
} catch (final URISyntaxException e) {
  // ignore
} catch (final ClassCastException e) {
  // ignore
}
  }

  logger.info("Submitting topology " + name + " in distributed mode with 
conf " + serConf);
  client.submitTopologyWithOpts(name, localJar, topology);
{code}
Since the try block may encounter URISyntaxException / ClassCastException, we 
should check that localJar is not null before calling submitTopologyWithOpts().


> Missing null check for localJar in FlinkSubmitter#submitTopology()
> --
>
> Key: FLINK-5541
> URL: https://issues.apache.org/jira/browse/FLINK-5541
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   if (localJar == null) {
> try {
>   for (final URL url : ((ContextEnvironment) 
> ExecutionEnvironment.getExecutionEnvironment())
>   .getJars()) {
> // TODO verify that there is only one jar
> localJar = new File(url.toURI()).getAbsolutePath();
>   }
> } catch (final URISyntaxException e) {
>   // ignore
> } catch (final ClassCastException e) {
>   // ignore
> }
>   }
>   logger.info("Submitting topology " + name + " in distributed mode with 
> conf " + serConf);
>   client.submitTopologyWithOpts(name, localJar, topology);
> {code}
> Since the try block may encounter URISyntaxException / ClassCastException, we 
> should check that localJar is not null before calling 
> submitTopologyWithOpts().





[jira] [Closed] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-6023.
-
   Resolution: Fixed
Fix Version/s: (was: 1.2.1)

> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet





[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906019#comment-15906019
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3510


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet







[jira] [Comment Edited] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes

2017-03-10 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905779#comment-15905779
 ] 

Luke Hutchison edited comment on FLINK-4785 at 3/10/17 11:48 PM:
-

I'm pretty sure I have seen backslash escaping in CSV before, but the 
old-school way of quoting quote characters (double double quotes) is the one 
that made it into the RFC, presumably for backwards compatibility with 
spreadsheets.

From my dup bug report, https://issues.apache.org/jira/browse/FLINK-6107 :

--

The RFC for the CSV format specifies that double quotes are valid in quoted 
strings in CSV, by doubling the quote character:

https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing quoted quotes, such as:

bob,"The name is ""Bob"""

you get this exception:

org.apache.flink.api.common.io.ParseException: Line could not be parsed: 
'bob,"The name is ""Bob"""'
ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String

--

See also https://issues.apache.org/jira/browse/FLINK-6016 (quoted strings in 
CSV should be able to contain newlines).


was (Author: lukehutch):
I'm pretty sure I have seen backslash escaping in CSV before, but the 
old-school way of quoting quote characters (double double quotes) is the one 
that made it into the RFC, presumably for backwards compatibility with 
spreadsheets.

Fabian -- you copied the text from the wrong bug report, 
https://issues.apache.org/jira/browse/FLINK-6016 , rather than 
https://issues.apache.org/jira/browse/FLINK-6107 , which is:

--

The RFC for the CSV format specifies that double quotes are valid in quoted 
strings in CSV, by doubling the quote character:

https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing quoted quotes, such as:

bob,"The name is ""Bob"""

you get this exception:

org.apache.flink.api.common.io.ParseException: Line could not be parsed: 
'bob,"The name is ""Bob"""'
ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String

> Flink string parser doesn't handle string fields containing two consecutive 
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
>  Labels: csv
>
> To reproduce the error run 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java





[jira] [Closed] (FLINK-5954) Always assign names to the window in the Stream SQL API

2017-03-10 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5954.

   Resolution: Implemented
Fix Version/s: 1.3.0

Implemented with 7ef068ccc9552799cb9f2bd648782c636d2df2db

> Always assign names to the window in the Stream SQL API
> ---
>
> Key: FLINK-5954
> URL: https://issues.apache.org/jira/browse/FLINK-5954
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> CALCITE-1603 and CALCITE-1615 bring in support for {{TUMBLE}}, {{HOP}}, and 
> {{SESSION}} grouped windows, as well as the corresponding auxiliary functions 
> that allow users to query the start and the end of the windows (e.g., 
> {{TUMBLE_START()}} and {{TUMBLE_END()}}; see 
> http://calcite.apache.org/docs/stream.html for more details).
> The goal of this JIRA is to add support for these auxiliary functions in 
> Flink. Flink already has runtime support for them, as these functions are 
> essentially mapped to the {{WindowStart}} and {{WindowEnd}} classes.
> To implement this feature, the transformation needs to 
> recognize these functions and map them to the {{WindowStart}} and 
> {{WindowEnd}} classes.
> The problem is that both classes can only refer to windows by alias. 
> Therefore this JIRA proposes to assign a unique name to each window to 
> enable the transformation.
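
For illustration, the kind of query these auxiliary functions enable (table 
and column names are hypothetical, using the Table API's {{sql()}} entry 
point of the time):

{code}
// Once every window carries a (generated) name, TUMBLE_START/TUMBLE_END can
// be translated to the WindowStart/WindowEnd expressions on that named window.
Table result = tableEnv.sql(
    "SELECT userId, " +
    "       TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS wStart, " +
    "       TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS wEnd, " +
    "       COUNT(*) AS cnt " +
    "FROM clicks " +
    "GROUP BY userId, TUMBLE(rowtime, INTERVAL '1' HOUR)");
{code}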







[jira] [Commented] (FLINK-5954) Always assign names to the window in the Stream SQL API

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905871#comment-15905871
 ] 

ASF GitHub Bot commented on FLINK-5954:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3461


> Always assign names to the window in the Stream SQL API
> ---
>
> Key: FLINK-5954
> URL: https://issues.apache.org/jira/browse/FLINK-5954
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 bring in support for {{TUMBLE}}, {{HOP}}, and 
> {{SESSION}} grouped windows, as well as the corresponding auxiliary functions 
> that allow users to query the start and the end of the windows (e.g., 
> {{TUMBLE_START()}} and {{TUMBLE_END()}}; see 
> http://calcite.apache.org/docs/stream.html for more details).
> The goal of this JIRA is to add support for these auxiliary functions in 
> Flink. Flink already has runtime support for them, as these functions are 
> essentially mapped to the {{WindowStart}} and {{WindowEnd}} classes.
> To implement this feature, the transformation needs to 
> recognize these functions and map them to the {{WindowStart}} and 
> {{WindowEnd}} classes.
> The problem is that both classes can only refer to windows by alias. 
> Therefore this JIRA proposes to assign a unique name to each window to 
> enable the transformation.





[jira] [Commented] (FLINK-6007) ConcurrentModificationException in WatermarkCallbackService

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905848#comment-15905848
 ] 

ASF GitHub Bot commented on FLINK-6007:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/3514

[FLINK-6007] Allow key removal from within the watermark callback.

When deleting a key from the InternalWatermarkCallbackService, the
deleted key is put into a separate set, and the actual deletion
happens after the iteration over all keys has finished. To avoid
checkpointing the deletion set, the actual cleanup also happens
upon checkpointing.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink watermark-callback-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3514.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3514


commit d3a1b6e72eb56db40638c0b0889f5277c4671b61
Author: kl0u 
Date:   2017-03-08T19:18:18Z

[FLINK-6007] Allow key removal from within the watermark callback.

When deleting a key from the InternalWatermarkCallbackService, the
deleted key is put into a separate set, and the actual deletion
happens after the iteration over all keys has finished. To avoid
checkpointing the deletion set, the actual cleanup also happens
upon checkpointing.




> ConcurrentModificationException in WatermarkCallbackService
> ---
>
> Key: FLINK-6007
> URL: https://issues.apache.org/jira/browse/FLINK-6007
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, if an attempt is made to call 
> {{InternalWatermarkCallbackService.unregisterKeyFromWatermarkCallback()}} 
> from within the {{OnWatermarkCallback}}, a 
> {{ConcurrentModificationException}} is thrown. The reason is that the 
> {{invokeOnWatermarkCallback}} iterates over the list of keys and calls the 
> callback for each one of them.
> To fix this, the deleted keys are put into a separate list, and the deletion 
> happens after the iteration over all keys has finished.
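
A simplified sketch of the deferred-deletion scheme described above (field 
and method names are illustrative, not the exact Flink internals):

{code}
// Removal requests made from within the callback are buffered and applied
// only after the iteration over all registered keys has finished, which
// avoids the ConcurrentModificationException.
private final Set<K> registeredKeys = new HashSet<>();
private final Set<K> keysToDelete = new HashSet<>();

public void unregisterKey(K key) {
    keysToDelete.add(key); // defer: never mutate registeredKeys mid-iteration
}

public void invokeOnWatermarkCallback(Watermark watermark) throws Exception {
    for (K key : registeredKeys) {
        callback.onWatermark(key, watermark); // may call unregisterKey(key)
    }
    registeredKeys.removeAll(keysToDelete);   // the actual deletion happens here
    keysToDelete.clear();
}
{code}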







[GitHub] flink pull request #3513: [hotfix] [doc] Fix error in ProcessFunction exampl...

2017-03-10 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/3513

[hotfix] [doc] Fix error in ProcessFunction example.

Fixes an error in the `ProcessFunction` documentation that was reported on 
the mailing list.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink fix-process-func-doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3513.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3513


commit 4eba9d74e1b197acc42579b545e62f2c0ddee35c
Author: kl0u 
Date:   2017-03-10T22:34:56Z

[hotfix] [doc] Fix error in ProcessFunction example.










[jira] [Commented] (FLINK-5911) Command-line parameters

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905792#comment-15905792
 ] 

ASF GitHub Bot commented on FLINK-5911:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3433
  
@vasia reusable parameters will make it much easier to add drivers and 
inputs.


> Command-line parameters
> ---
>
> Key: FLINK-5911
> URL: https://issues.apache.org/jira/browse/FLINK-5911
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> Create interface for parsing command-line parameters using {{ParameterTool}} 
> and generic implementations for boolean, long, double, string, choice.
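
A sketch of the kind of reusable parameter handling this describes, using 
{{ParameterTool}} (the parameter names are illustrative):

{code}
import org.apache.flink.api.java.utils.ParameterTool;

ParameterTool params = ParameterTool.fromArgs(args);

// One typed accessor with a default value per supported parameter kind:
boolean simplify   = params.getBoolean("simplify", false);
long vertexCount   = params.getLong("vertex_count", 1000);
double probability = params.getDouble("probability", 0.1);
String output      = params.get("output", "print"); // "choice" = string + validation
{code}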





[jira] [Closed] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-5874.
-
Resolution: Fixed

> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: we are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only its memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly instead of keyed, and 
> in Flink 1.2 the key-groups code detects a violation of the hashing.
> The proper fix would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function, but introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
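
The root cause is easy to demonstrate in plain Java:

{code}
int[] a = {1, 2, 3};
int[] b = {1, 2, 3};

// Identity-based hash codes differ even though the contents are equal, so
// sender and receiver can disagree on the target key group:
System.out.println(a.hashCode() == b.hashCode()); // false in practice (identity hash)

// A content-based hash, as a TypeComparator would provide, is stable:
System.out.println(java.util.Arrays.hashCode(a)
        == java.util.Arrays.hashCode(b));         // true (content hash)
{code}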





[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905789#comment-15905789
 ] 

Kostas Kloudas commented on FLINK-5874:
---

Merged at f15a7d2d9c9aae72bb3ac3eb2478b3ec4759401b

> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: we are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only its memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly instead of keyed, and 
> in Flink 1.2 the key-groups code detects a violation of the hashing.
> The proper fix would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function, but introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.





[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905788#comment-15905788
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3501


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: we are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the array's contents into account, only its memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly instead of keyed, and 
> in Flink 1.2 the key-groups code detects a violation of the hashing.
> The proper fix would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function, but introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.





[jira] [Issue Comment Deleted] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes

2017-03-10 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-4785:
-
Comment: was deleted

(was: Copying the issue description of FLINK-6017

{quote}
The RFC for the CSV format specifies that newlines are valid in quoted strings 
in CSV:

https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing a newline, such as:

"3
4",5

you get this exception:

Line could not be parsed: '"3'
ParserError UNTERMINATED_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String 
{quote})

> Flink string parser doesn't handle string fields containing two consecutive 
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
>  Labels: csv
>
> To reproduce the error run 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java





[jira] [Commented] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes

2017-03-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905787#comment-15905787
 ] 

Fabian Hueske commented on FLINK-4785:
--

Oh, yes. Sorry for the confusion. I'll delete my comment.

> Flink string parser doesn't handle string fields containing two consecutive 
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
>  Labels: csv
>
> To reproduce the error run 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java





[jira] [Commented] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes

2017-03-10 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905779#comment-15905779
 ] 

Luke Hutchison commented on FLINK-4785:
---

I'm pretty sure I have seen backslash escaping in CSV before, but the 
old-school way of quoting quote characters (double double quotes) is the one 
that made it into the RFC, presumably for backwards compatibility with 
spreadsheets.

Fabian -- you copied the text from the wrong bug report, 
https://issues.apache.org/jira/browse/FLINK-6016 , rather than 
https://issues.apache.org/jira/browse/FLINK-6107 , which is:

--

The RFC for the CSV format specifies that double quotes are valid in quoted 
strings in CSV, by doubling the quote character:

https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing quoted quotes, such as:

bob,"The name is ""Bob"""

you get this exception:

org.apache.flink.api.common.io.ParseException: Line could not be parsed: 
'bob,"The name is ""Bob"""'
ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String

> Flink string parser doesn't handle string fields containing two consecutive 
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
>  Labels: csv
>
> To reproduce the error run 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java





[jira] [Commented] (FLINK-6017) CSV reader does not support quoted double quotes

2017-03-10 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905775#comment-15905775
 ] 

Luke Hutchison commented on FLINK-6017:
---

Thanks, sorry for the dup!

> CSV reader does not support quoted double quotes
> 
>
> Key: FLINK-6017
> URL: https://issues.apache.org/jira/browse/FLINK-6017
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
> Environment: Linux
>Reporter: Luke Hutchison
>
> The RFC for the CSV format specifies that double quotes are valid in quoted 
> strings in CSV, by doubling the quote character:
> https://tools.ietf.org/html/rfc4180
> However, when parsing a CSV file with Flink containing quoted quotes, such as:
> bob,"The name is ""Bob"""
> you get this exception:
> org.apache.flink.api.common.io.ParseException: Line could not be parsed: 
> 'bob,"The name is ""Bob"""'
> ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING 
> Expect field types: class java.lang.String, class java.lang.String 
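
Per RFC 4180, a doubled quote inside a quoted field stands for a single 
literal quote character; a minimal sketch of the expected unescaping:

{code}
// Field content between the outer quotes of: "The name is ""Bob"""
String raw = "The name is \"\"Bob\"\"";

// RFC 4180 unescaping: each doubled quote becomes one literal quote.
String unescaped = raw.replace("\"\"", "\""); // -> The name is "Bob"
{code}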





[jira] [Commented] (FLINK-2814) DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905762#comment-15905762
 ] 

ASF GitHub Bot commented on FLINK-2814:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2029
  
@StephanEwen should I create an alternate PR?


> DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode
> ---
>
> Key: FLINK-2814
> URL: https://issues.apache.org/jira/browse/FLINK-2814
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.0
>Reporter: Greg Hogan
>Assignee: Rekha Joshi
>
> A delta iteration that closes with a solution set which is a {{JoinOperator}} 
> throws the following exception:
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:444)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:345)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:289)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:969)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1019)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.optimizer.plan.DualInputPlanNode cannot be cast to 
> org.apache.flink.optimizer.plan.SingleInputPlanNode
>   at 
> org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:432)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
>   at 
> org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:271)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:543)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:350)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:796)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:424)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1365)
>   at Driver.main(Driver.java:366)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:429)
>   ... 6 more
> {noformat}
> Temporary fix is to attach an identity mapper.
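
A hypothetical sketch of that workaround (types and the join function are 
illustrative):

{code}
// Terminating the solution-set delta with an identity MapFunction makes the
// delta a SingleInputPlanNode, sidestepping the ClassCastException above.
DataSet<Tuple2<Long, Double>> delta = iteration.getWorkset()
    .join(iteration.getSolutionSet()).where(0).equalTo(0)
    .with(new SomeJoinFunction()) // hypothetical join producing the delta
    .map(new MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>() {
        @Override
        public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
            return value; // identity mapper, the temporary fix
        }
    });

DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, delta);
{code}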







[jira] [Closed] (FLINK-6017) CSV reader does not support quoted double quotes

2017-03-10 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6017.

Resolution: Duplicate

Thanks for reporting this issue [~lukehutch].
This is a duplicate of FLINK-4785.
I copied your issue description to FLINK-4785.

Thanks, Fabian

> CSV reader does not support quoted double quotes
> 
>
> Key: FLINK-6017
> URL: https://issues.apache.org/jira/browse/FLINK-6017
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
> Environment: Linux
>Reporter: Luke Hutchison
>
> The RFC for the CSV format specifies that double quotes are valid in quoted 
> strings in CSV, by doubling the quote character:
> https://tools.ietf.org/html/rfc4180
> However, when parsing a CSV file with Flink containing quoted quotes, such as:
> bob,"The name is ""Bob"""
> you get this exception:
> org.apache.flink.api.common.io.ParseException: Line could not be parsed: 
> 'bob,"The name is ""Bob"""'
> ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING 
> Expect field types: class java.lang.String, class java.lang.String 





[jira] [Commented] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes

2017-03-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905696#comment-15905696
 ] 

Fabian Hueske commented on FLINK-4785:
--

Copying the issue description of FLINK-6017

{quote}
The RFC for the CSV format specifies that newlines are valid in quoted strings 
in CSV:

https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing a newline, such as:

"3
4",5

you get this exception:

Line could not be parsed: '"3'
ParserError UNTERMINATED_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String 
{quote}

> Flink string parser doesn't handle string fields containing two consecutive 
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
>  Labels: csv
>
> To reproduce the error run 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java





[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905681#comment-15905681
 ] 

ASF GitHub Bot commented on FLINK-5995:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3503
  
That sounds reasonable!

On Mar 10, 2017 4:07 PM, "Jincheng Sun"  wrote:

> Hi @StephanEwen, thanks a lot for your
> comment. I thought carefully about where it is most reasonable to add the
> test; because this PR changes the OperatorStateBackend, I added the test
> case in src/test/java/org/apache/flink/runtime/state/
> OperatorStateBackendTest.java. Does this make sense to you? Feel free to
> tell me if it is incorrect.
> Best,
> SunJincheng



> Get a Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When using OperatorState and creating the ListStateDescriptor with a 
> TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the 
> `getOperatorState` method. I'd appreciate it if anyone can give me some advice.
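
A simplified sketch of the proposed change (the surrounding lookup logic and 
accessor names are illustrative):

{code}
public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
    // Initialize the serializer from the execution config before first use, so
    // a descriptor created with a TypeInformation no longer throws
    // "Serializer not yet initialized" in getElementSerializer().
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    return getPartitionableState(stateDescriptor); // hypothetical: existing lookup logic
}
{code}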






[GitHub] flink issue #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3484
  
Thanks @kl0u for the (already) quite thorough review! I'll push a commit 
with fixes.




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905414#comment-15905414
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105441679
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side 
outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that 
Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation<T> typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint<T> typeHint =
+   new TypeHint<T>(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine 
TypeInformation for generic " +
+   "OutputTag type. Did you forget to make 
your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output 
{@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation<T> typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot 
be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, 
"TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation<T> getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

I see. The problem is that if this does not work, then we can have 
important side effects.


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>



[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905412#comment-15905412
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105441456
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String id;
+
+	private transient TypeInformation<T> typeInfo;
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 */
+	public OutputTag(String id) {
+		this.id = requireNonNull(id, "OutputTag id cannot be null.");
+
+		try {
+			TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};
+			this.typeInfo = typeHint.getTypeInfo();
+		} catch (InvalidTypesException e) {
+			throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+				"OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+		}
+	}
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 * @param typeInfo The {@code TypeInformation} for the side output.
+	 */
+	public OutputTag(String id, TypeInformation<T> typeInfo) {
+		this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+		this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		typeInfo = null;
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public TypeInformation<T> getTypeInfo() {
+		return typeInfo;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof OutputTag
--- End diff --

I would have liked to include the `TypeInformation` into the check but we 
can't do that because it's transient. I'll try and figure something out for 
checking that side outputs are unique, not as easy as it seems.
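
For reference, a minimal sketch of an id-only check (hypothetical, not 
necessarily what this PR ends up with): since `typeInfo` is transient and 
nulled out in `readObject()`, only the `id` survives serialization, so 
`equals()`/`hashCode()` can only rely on it:

```
// Hypothetical sketch: equality based solely on the id, because the
// transient typeInfo field does not survive serialization.
@Override
public boolean equals(Object obj) {
	return obj instanceof OutputTag
		&& ((OutputTag) obj).id.equals(this.id);
}

@Override
public int hashCode() {
	return id.hashCode();
}
```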


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New 

[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105441456
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String id;
+
+	private transient TypeInformation<T> typeInfo;
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 */
+	public OutputTag(String id) {
+		this.id = requireNonNull(id, "OutputTag id cannot be null.");
+
+		try {
+			TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};
+			this.typeInfo = typeHint.getTypeInfo();
+		} catch (InvalidTypesException e) {
+			throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+				"OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+		}
+	}
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 * @param typeInfo The {@code TypeInformation} for the side output.
+	 */
+	public OutputTag(String id, TypeInformation<T> typeInfo) {
+		this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+		this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		typeInfo = null;
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public TypeInformation<T> getTypeInfo() {
+		return typeInfo;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof OutputTag
--- End diff --

I would have liked to include the `TypeInformation` into the check but we 
can't do that because it's transient. I'll try and figure something out for 
checking that side outputs are unique, not as easy as it seems.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905390#comment-15905390
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3484
  
Thanks @kl0u for the (already) quite thorough review! I'll push a commit 
with fixes.


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905381#comment-15905381
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105436408
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -539,5 +625,26 @@ public void collect(StreamRecord<T> record) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
--- End diff --

fixed.


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105436408
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -539,5 +625,26 @@ public void collect(StreamRecord<T> record) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
--- End diff --

fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105436193
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -387,14 +403,25 @@ public int getChainLength() {
 
protected final StreamStatusProvider streamStatusProvider;
 
-	public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
+	protected final OutputTag<T> outputTag;
+
+	public ChainingOutput(
+			OneInputStreamOperator<T, ?> operator,
+			StreamStatusProvider streamStatusProvider,
+			OutputTag<T> outputTag) {
 		this.operator = operator;
 		this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
 		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
 	}
 
@Override
--- End diff --

This I'm fixing, as I mentioned above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905380#comment-15905380
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105436193
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -387,14 +403,25 @@ public int getChainLength() {
 
protected final StreamStatusProvider streamStatusProvider;
 
-	public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
+	protected final OutputTag<T> outputTag;
+
+	public ChainingOutput(
+			OneInputStreamOperator<T, ?> operator,
+			StreamStatusProvider streamStatusProvider,
+			OutputTag<T> outputTag) {
 		this.operator = operator;
 		this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
 		this.streamStatusProvider = streamStatusProvider;
+		this.outputTag = outputTag;
 	}
 
@Override
--- End diff --

This I'm fixing, as I mentioned above.


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905378#comment-15905378
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105435806
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

I can remove the duplication on `RecordWriterOutput`, `ChainingOutput` 
and `CopyingChainingOutput`, basically those `Outputs` that don't forward to 
other outputs but instead push into the operator or into the network. For the 
other `Outputs` removing the duplication is not possible, because inside the 
respective `output()` method they call `output()` of another `Output`. They 
call it either with an `OutputTag` or without, so the method body is not actually 
a duplicate.

I did find another bug, though, where `CopyingBroadcastingOutputCollector` 
in `OperatorChain` was not calling the correct `collect()` method on the 
downstream `Outputs`.  
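
For illustration, a minimal sketch of the intended forwarding behaviour 
(hypothetical shape, assuming an `Output` interface with both `collect()` 
variants as introduced by this PR):

```
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;

// Hypothetical sketch of a broadcasting collector that forwards both
// collect() variants to all downstream outputs. The bug was a call to
// the un-tagged collect() where the tagged one was needed.
class BroadcastingCollector<T> {
	private final Output<StreamRecord<T>>[] outputs;

	BroadcastingCollector(Output<StreamRecord<T>>[] outputs) {
		this.outputs = outputs;
	}

	public void collect(StreamRecord<T> record) {
		for (Output<StreamRecord<T>> output : outputs) {
			output.collect(record);
		}
	}

	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
		for (Output<StreamRecord<T>> output : outputs) {
			// forward the tag; falling back to the un-tagged collect()
			// would lose the side-output routing
			output.collect(outputTag, record);
		}
	}
}
```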


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105435806
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

I can remove the duplication on `RecordWriterOutput`, `ChainingOutput` 
and `CopyingChainingOutput`, basically those `Outputs` that don't forward to 
other outputs but instead push into the operator or into the network. For the 
other `Outputs` removing the duplication is not possible, because inside the 
respective `output()` method they call `output()` of another `Output`. They 
call it either with an `OutputTag` or without, so the method body is not actually 
a duplicate.

I did find another bug, though, where `CopyingBroadcastingOutputCollector` 
in `OperatorChain` was not calling the correct `collect()` method on the 
downstream `Outputs`. 😃 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905376#comment-15905376
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3512

[FLINK-6008] collection of BlobServer improvements

This PR improves the following things around the `BlobServer`/`BlobCache`:

* replace config options in `config.md` with non-deprecated ones, e.g. 
`high-availability.cluster-id` and `high-availability.storageDir`
* promote `BlobStore#deleteAll(JobID)` to the `BlobService`
* extend the `BlobService` to work with `NAME_ADDRESSABLE` blobs (prepares 
for FLINK-4399)
* remove `NAME_ADDRESSABLE` blobs after job/task termination
* add more unit tests for `NAME_ADDRESSABLE` blobs
* do not fail the `BlobServer` when a delete operation fails
* general code style and docs improvements, like using 
`Preconditions.checkArgument`



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6008

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3512


commit 8cfbe97df3f7c8fa268f5c19291174a99e3cf943
Author: Nico Kruber 
Date:   2016-12-20T15:49:57Z

[FLINK-6008][docs] minor improvements in the BlobService docs

commit a72b31474fd38f27e5cc582b3c2797fa51695e38
Author: Nico Kruber 
Date:   2017-01-06T17:42:58Z

[FLINK-6008][docs] update some config options to the new, non-deprecated 
ones

commit a6af4e0b393a8684984a6adada7e6eff4f99ac18
Author: Nico Kruber 
Date:   2016-12-20T17:27:13Z

[FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 69247739e127f8c941e352c07a0be6e03ecea1d1
Author: Nico Kruber 
Date:   2016-12-20T17:52:19Z

[FLINK-6008] extend the BlobService to the NAME_ADDRESSABLE blobs

These blobs are referenced by the job ID and a selected name instead of the
hash sum of the blob's contents. Some code was already prepared but lacked
the proper additions in further APIs. This commit adds some.

commit 9913ae86b854e1c5b3dca404824ab9a70cc32db6
Author: Nico Kruber 
Date:   2016-12-21T15:23:29Z

[FLINK-6008] promote BlobStore#deleteAll(JobID) to the BlobService

commit d96e6d43ac637149e9d1077c6dee3801d30f679a
Author: Nico Kruber 
Date:   2017-03-09T17:14:02Z

[FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6d53e3ff87110601eb1a71d60f850e6089930141
Author: Nico Kruber 
Date:   2016-12-21T16:59:27Z

[FLINK-6008] properly remove NAME_ADDRESSABLE blobs after job/task 
termination

commit 5ef5a74db3f6753437b585823b037e25e23a61ba
Author: Nico Kruber 
Date:   2017-03-09T18:14:52Z

[FLINK-6008] more unit tests for NAME_ADDRESSABLE and BlobService access

NAME_ADDRESSABLE blobs were not that thoroughly tested before, and neither were
the access methods that the BlobService implementations provide. This adds tests
covering both.

commit 34857456a43ec5a2ccb5166bd379f263cd54697d
Author: Nico Kruber 
Date:   2017-03-09T17:15:08Z

[FLINK-6008] do not fail the BlobServer when delete fails

This also enables us to reuse some more code between BlobServerConnection and
BlobServer.

commit e55ab0f37005ef37065b8156f59e4b8db1a7b95f
Author: Nico Kruber 
Date:   2017-03-09T17:32:14Z

[FLINK-6008] refactor BlobCache#deleteGlobal() for cleaner code




> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-10 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3512

[FLINK-6008] collection of BlobServer improvements

This PR improves the following things around the `BlobServer`/`BlobCache`:

* replace config options in `config.md` with non-deprecated ones, e.g. 
`high-availability.cluster-id` and `high-availability.storageDir`
* promote `BlobStore#deleteAll(JobID)` to the `BlobService`
* extend the `BlobService` to work with `NAME_ADDRESSABLE` blobs (prepares 
for FLINK-4399)
* remove `NAME_ADDRESSABLE` blobs after job/task termination
* add more unit tests for `NAME_ADDRESSABLE` blobs
* do not fail the `BlobServer` when a delete operation fails
* general code style and docs improvements, like using 
`Preconditions.checkArgument`



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6008

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3512


commit 8cfbe97df3f7c8fa268f5c19291174a99e3cf943
Author: Nico Kruber 
Date:   2016-12-20T15:49:57Z

[FLINK-6008][docs] minor improvements in the BlobService docs

commit a72b31474fd38f27e5cc582b3c2797fa51695e38
Author: Nico Kruber 
Date:   2017-01-06T17:42:58Z

[FLINK-6008][docs] update some config options to the new, non-deprecated 
ones

commit a6af4e0b393a8684984a6adada7e6eff4f99ac18
Author: Nico Kruber 
Date:   2016-12-20T17:27:13Z

[FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 69247739e127f8c941e352c07a0be6e03ecea1d1
Author: Nico Kruber 
Date:   2016-12-20T17:52:19Z

[FLINK-6008] extend the BlobService to the NAME_ADDRESSABLE blobs

These blobs are referenced by the job ID and a selected name instead of the
hash sum of the blob's contents. Some code was already prepared but lacked
the proper additions in further APIs. This commit adds some.

commit 9913ae86b854e1c5b3dca404824ab9a70cc32db6
Author: Nico Kruber 
Date:   2016-12-21T15:23:29Z

[FLINK-6008] promote BlobStore#deleteAll(JobID) to the BlobService

commit d96e6d43ac637149e9d1077c6dee3801d30f679a
Author: Nico Kruber 
Date:   2017-03-09T17:14:02Z

[FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6d53e3ff87110601eb1a71d60f850e6089930141
Author: Nico Kruber 
Date:   2016-12-21T16:59:27Z

[FLINK-6008] properly remove NAME_ADDRESSABLE blobs after job/task 
termination

commit 5ef5a74db3f6753437b585823b037e25e23a61ba
Author: Nico Kruber 
Date:   2017-03-09T18:14:52Z

[FLINK-6008] more unit tests for NAME_ADDRESSABLE and BlobService access

NAME_ADDRESSABLE blobs were not that thoroughly tested before, and neither were
the access methods that the BlobService implementations provide. This adds tests
covering both.

commit 34857456a43ec5a2ccb5166bd379f263cd54697d
Author: Nico Kruber 
Date:   2017-03-09T17:15:08Z

[FLINK-6008] do not fail the BlobServer when delete fails

This also enables us to reuse some more code between BlobServerConnection and
BlobServer.

commit e55ab0f37005ef37065b8156f59e4b8db1a7b95f
Author: Nico Kruber 
Date:   2017-03-09T17:32:14Z

[FLINK-6008] refactor BlobCache#deleteGlobal() for cleaner code




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-6008) collection of BlobServer improvements

2017-03-10 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-6008:
---
Description: 
The following things should be improved around the BlobServer/BlobCache:
* update config options with non-deprecated ones, e.g. 
{{high-availability.cluster-id}} and {{high-availability.storageDir}}
* promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
* extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs (prepares 
FLINK-4399)
* remove {{NAME_ADDRESSABLE}} blobs after job/task termination
* do not fail the {{BlobServer}} when a delete operation fails
* code style, like using {{Preconditions.checkArgument}}

  was:
The following things should be removed around the BlobServer/BlobCache:
* update config uptions with non-deprecated ones, e.g. 
{{high-availability.cluster-id}} and {{high-availability.storageDir}}
* promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
* extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs (prepares 
FLINK-4399]
* remove {{NAME_ADDRESSABLE}} blobs after job/task termination
* do not fail the {{BlobServer}} when a delete operation fails
* code style, like using {{Preconditions.checkArgument}}


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905315#comment-15905315
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3510
  
Thanks @maocorte for the quick fix, looks good to merge now.


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3510: [FLINK-6023] Fix Scala snippet into Process Function Doc

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3510
  
Thanks @maocorte for the quick fix, looks good to merge now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5910) Framework for Gelly examples

2017-03-10 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-5910.
-
Resolution: Implemented

Implemented in 70e78a620df503f06e298dd5537f24a56a8cc866

> Framework for Gelly examples
> 
>
> Key: FLINK-5910
> URL: https://issues.apache.org/jira/browse/FLINK-5910
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> Driver jobs are composed of an input, an algorithm, and an output. Create the 
> interfaces for inputs, algorithms, and outputs.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-10 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-5890.
-
Resolution: Fixed

Fixed in 694794eb6cbb63dace5a3389a99878f952f0faa5

> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}
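
A sketch of the guard described above (illustrative only, not the committed 
fix; it assumes the sum function may return its second argument):

{code}
// If the result aliases arg1.f1, swap the payload references so that the
// driver's reuse of arg1 for the next record cannot clobber the result.
@Override
public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
	K key = arg0.f0;
	M result = this.sumFunction.sum(arg0.f1, arg1.f1);
	if (result == arg1.f1) {
		arg1.f1 = arg0.f1;
		arg0.f1 = result;
	}
	return new Tuple2<>(key, result);
}
{code}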



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5910) Framework for Gelly examples

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905299#comment-15905299
 ] 

ASF GitHub Bot commented on FLINK-5910:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3431


> Framework for Gelly examples
> 
>
> Key: FLINK-5910
> URL: https://issues.apache.org/jira/browse/FLINK-5910
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> Driver jobs are composed of an input, an algorithm, and an output. Create the 
> interfaces for inputs, algorithms, and outputs.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5890) GatherSumApply broken when object reuse enabled

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905298#comment-15905298
 ] 

ASF GitHub Bot commented on FLINK-5890:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3402


> GatherSumApply broken when object reuse enabled
> ---
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>   @Override
>   public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws 
> Exception {
>   K key = arg0.f0;
>   M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>   return new Tuple2<>(key, result);
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3431: [FLINK-5910] [gelly] Framework for Gelly examples

2017-03-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3431


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3402: [FLINK-5890] [gelly] GatherSumApply broken when ob...

2017-03-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3402


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905288#comment-15905288
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user maocorte commented on the issue:

https://github.com/apache/flink/pull/3510
  
thank you @KurtYoung for your review


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3510: [FLINK-6023] Fix Scala snippet into Process Function Doc

2017-03-10 Thread maocorte
Github user maocorte commented on the issue:

https://github.com/apache/flink/pull/3510
  
thank you @KurtYoung for your review


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3446: [FLINK-5940] [checkpoint] Harden ZooKeeperCompletedCheckp...

2017-03-10 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3446
  
The mocking is indeed a little bit complex in this case. The problem is 
that you also want to check that a state handle which fails to retrieve its 
state is properly discarded. This discard call happens as part of a callback 
to a ZooKeeper remove call. Since the curator client uses the builder API to 
construct the ZooKeeper calls, it was necessary to mock all the different build 
stages. I couldn't find a more succinct way to test this behaviour without 
starting a ZooKeeper server.

If you think it hurts the test's maintainability too much, then I can start 
a ZooKeeper server for the test.
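
As an aside, Mockito's deep stubs can shorten this kind of fluent-builder 
mocking considerably. A minimal, self-contained sketch on a made-up builder 
interface (the real curator chain has more stages, and capturing the callback 
for the discard assertion would still need an `ArgumentCaptor`):

```
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DeepStubSketch {

	// Made-up fluent builder, standing in for the chained build stages
	// of the curator client.
	interface Client {
		DeleteBuilder delete();
	}

	interface DeleteBuilder {
		DeleteBuilder inBackground(Runnable callback);
		boolean forPath(String path);
	}

	public static void main(String[] args) {
		// RETURNS_DEEP_STUBS creates mocks for every intermediate stage
		// on the fly, so only the terminal call needs explicit stubbing.
		Client client = mock(Client.class, RETURNS_DEEP_STUBS);
		when(client.delete().inBackground(any(Runnable.class)).forPath(anyString()))
			.thenReturn(true);

		boolean removed = client.delete().inBackground(() -> { }).forPath("/checkpoints/1");
		System.out.println(removed); // prints: true
	}
}
```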


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5940) ZooKeeperCompletedCheckpointStore cannot handle broken state handles

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905282#comment-15905282
 ] 

ASF GitHub Bot commented on FLINK-5940:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3446
  
The mocking is indeed a little bit complex in this case. The problem is 
that you also want to check that a state handle which fails to retrieve its 
state is properly discarded. This discard call happens as part of a callback 
to a ZooKeeper remove call. Since the curator client uses the builder API to 
construct the ZooKeeper calls, it was necessary to mock all the different build 
stages. I couldn't find a more succinct way to test this behaviour without 
starting a ZooKeeper server.

If you think it hurts the test's maintainability too much, then I can start 
a ZooKeeper server for the test.


> ZooKeeperCompletedCheckpointStore cannot handle broken state handles
> 
>
> Key: FLINK-5940
> URL: https://issues.apache.org/jira/browse/FLINK-5940
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ZooKeeperCompletedCheckpointStore}} reads a set of 
> {{RetrievableStateHandles}} from ZooKeeper upon recovery. It then tries to 
> retrieve the {{CompletedCheckpoint}} from the latest state handle. If the 
> retrieve operation fails, then the whole recovery of completed checkpoints 
> fails even though the store might have read older state handles from 
> ZooKeeper. 
> I propose to harden the behaviour by removing broken state handles and 
> returning the first successfully retrieved {{CompletedCheckpoint}}.
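
A sketch of the proposed hardening (illustrative; names are not from an 
actual patch, and it assumes the handles are kept oldest to newest in a deque):

{code}
// Try the handles from newest to oldest, discard broken ones, and return
// the first checkpoint that can actually be retrieved.
private CompletedCheckpoint latestRetrievableCheckpoint(
		ArrayDeque<RetrievableStateHandle<CompletedCheckpoint>> handles) throws Exception {
	while (!handles.isEmpty()) {
		RetrievableStateHandle<CompletedCheckpoint> handle = handles.pollLast();
		try {
			return handle.retrieveState();
		} catch (Exception e) {
			// broken state handle: drop it and fall back to an older one
			handle.discardState();
		}
	}
	return null; // no completed checkpoint could be recovered
}
{code}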



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105422108
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
 // initialize or retrieve/update the state
+val (key, _) = value
--- End diff --

Sorry, I didn't make myself clear. What the IDE complains about is that the 
variable name `key` conflicts with the following lines: 
```
case CountWithTimestamp(key, count, _) =>
  CountWithTimestamp(key, count + 1, ctx.timestamp)
```
It's not clear whether you want to use the `key` you just defined or the 
`key` in the match pattern.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905279#comment-15905279
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105422108
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
 // initialize or retrieve/update the state
+val (key, _) = value
--- End diff --

Sorry, I didn't make myself clear. What the IDE complains about is that the 
variable name `key` conflicts with the following lines: 
```
case CountWithTimestamp(key, count, _) =>
  CountWithTimestamp(key, count + 1, ctx.timestamp)
```
It's not clear whether you want to use the `key` you just defined or the 
`key` in the match pattern.


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105419777
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
--- End diff --

The class name should change to `CountWithTimeoutFunction` to be consistent 
with the Java version


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905271#comment-15905271
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user maocorte commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105420747
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
 // initialize or retrieve/update the state
+val (key, _) = value
--- End diff --

Thank you @KurtYoung for the report. Is it ok for you if I change it 
to `val (key, count) = value`?
Personally I don't like using `_1` because I think it's not so clear, 
but probably that's just my problem ;) 


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread maocorte
Github user maocorte commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105420747
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
 // initialize or retrieve/update the state
+val (key, _) = value
--- End diff --

Thank you @KurtYoung for the report. Is it ok for you if I change it 
to `val (key, count) = value`?
Personally I don't like using `_1` because I think it's not so clear, 
but probably that's just my problem ;) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905267#comment-15905267
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105420103
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
--- End diff --

I think it should be `value: (String, String)`


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105420103
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
--- End diff --

I think it should be `value: (String, String)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905265#comment-15905265
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105420011
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
--- End diff --

And the first type parameter of `ProcessFunction` should be `(String, String)`


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105420011
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
--- End diff --

And the first type for ProcessFunction should be (String, String)
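
For reference, the Java example on the same doc page, which the Scala snippet should mirror, has roughly this shape (a minimal sketch assuming the RichProcessFunction API referenced in this diff, not the verbatim doc text; timer registration is elided):

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.util.Collector;

// Input elements are (String, String) pairs keyed by the first field;
// output elements are (String, Long) pairs of key and count.
public class CountWithTimeoutFunction
        extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The data type stored in the state. */
    public static class CountWithTimestamp {
        public String key;
        public long count;
        public long lastModified;
    }

    /** The state that is maintained by this process function. */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(
            Tuple2<String, String> value,
            Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {

        // initialize or update the per-key count and last-modification time
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }
        current.count++;
        current.lastModified = ctx.timestamp();
        state.update(current);
        // ... register a timer here to emit (key, count) on timeout ...
    }
}
{code}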




[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905263#comment-15905263
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105419777
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
--- End diff --

The class name should be changed to CountWithTimeoutFunction, to be consistent 
with the Java version.


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet





[jira] [Commented] (FLINK-5940) ZooKeeperCompletedCheckpointStore cannot handle broken state handles

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905258#comment-15905258
 ] 

ASF GitHub Bot commented on FLINK-5940:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3446#discussion_r105419062
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() 
throws Exception {
return null;
}
else {
-   return checkpointStateHandles.getLast().f0.retrieveState();
+   while(!checkpointStateHandles.isEmpty()) {
+   Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
+
+   try {
+   return retrieveCompletedCheckpoint(checkpointStateHandle);
+   } catch (FlinkException e) {
--- End diff --

Technically, I think it was ok, because the `retrieveCompletedCheckpoint` 
method catches all `Exceptions` and wraps them in a `FlinkException`. But it's 
better to not rely on this implementation detail.


> ZooKeeperCompletedCheckpointStore cannot handle broken state handles
> 
>
> Key: FLINK-5940
> URL: https://issues.apache.org/jira/browse/FLINK-5940
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ZooKeeperCompletedCheckpointStore}} reads a set of 
> {{RetrievableStateHandles}} from ZooKeeper upon recovery. It then tries to 
> retrieve the {{CompletedCheckpoint}} from the latest state handle. If the 
> retrieve operation fails, then the whole recovery of completed checkpoints 
> fails even though the store might have read older state handles from 
> ZooKeeper. 
> I propose to harden the behaviour by removing broken state handles and 
> returning the first successfully retrieved {{CompletedCheckpoint}}.





[GitHub] flink pull request #3446: [FLINK-5940] [checkpoint] Harden ZooKeeperComplete...

2017-03-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3446#discussion_r105419062
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() 
throws Exception {
return null;
}
else {
-   return checkpointStateHandles.getLast().f0.retrieveState();
+   while(!checkpointStateHandles.isEmpty()) {
+   Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
+
+   try {
+   return retrieveCompletedCheckpoint(checkpointStateHandle);
+   } catch (FlinkException e) {
--- End diff --

Technically, I think it was ok, because the `retrieveCompletedCheckpoint` 
method catches all `Exceptions` and wraps them in a `FlinkException`. But it's 
better to not rely on this implementation detail.
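
The hardened lookup discussed here boils down to the following loop (a simplified sketch; the method and parameter names are illustrative, not the exact PR code):

{code}
import java.util.ArrayDeque;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.state.RetrievableStateHandle;

class LatestCheckpointLookup {

    /** Walks from the latest handle backwards, dropping handles whose checkpoint cannot be retrieved. */
    static CompletedCheckpoint latestRetrievable(
            ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> handles) {

        while (!handles.isEmpty()) {
            Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> latest = handles.peekLast();
            try {
                return latest.f0.retrieveState();
            } catch (Exception e) {
                // broken state handle: discard it and fall back to the next older checkpoint
                handles.pollLast();
            }
        }
        return null; // no retrievable checkpoint left
    }
}
{code}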




[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905257#comment-15905257
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105418830
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
 // initialize or retrieve/update the state
+val (key, _) = value
--- End diff --

Can we change the name here? The IDE reports `suspicious shadowing 
by a variable pattern`. Alternatively, you could just use `value._1` instead.


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet





[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105418830
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
 // initialize or retrieve/update the state
+val (key, _) = value
--- End diff --

Can we change the name here? The IDE reports `suspicious shadowing 
by a variable pattern`. Alternatively, you could just use `value._1` instead.




[jira] [Commented] (FLINK-5940) ZooKeeperCompletedCheckpointStore cannot handle broken state handles

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905251#comment-15905251
 ] 

ASF GitHub Bot commented on FLINK-5940:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3446#discussion_r105417543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() 
throws Exception {
return null;
}
else {
-   return checkpointStateHandles.getLast().f0.retrieveState();
+   while(!checkpointStateHandles.isEmpty()) {
+   Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
+
+   try {
+   return retrieveCompletedCheckpoint(checkpointStateHandle);
+   } catch (FlinkException e) {
--- End diff --

Yes, you're right. Will change it.


> ZooKeeperCompletedCheckpointStore cannot handle broken state handles
> 
>
> Key: FLINK-5940
> URL: https://issues.apache.org/jira/browse/FLINK-5940
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ZooKeeperCompletedCheckpointStore}} reads a set of 
> {{RetrievableStateHandles}} from ZooKeeper upon recovery. It then tries to 
> retrieve the {{CompletedCheckpoint}} from the latest state handle. If the 
> retrieve operation fails, then the whole recovery of completed checkpoints 
> fails even though the store might have read older state handles from 
> ZooKeeper. 
> I propose to harden the behaviour by removing broken state handles and 
> returning the first successfully retrieved {{CompletedCheckpoint}}.





[GitHub] flink pull request #3446: [FLINK-5940] [checkpoint] Harden ZooKeeperComplete...

2017-03-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3446#discussion_r105417543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() 
throws Exception {
return null;
}
else {
-   return checkpointStateHandles.getLast().f0.retrieveState();
+   while(!checkpointStateHandles.isEmpty()) {
+   Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
+
+   try {
+   return retrieveCompletedCheckpoint(checkpointStateHandle);
+   } catch (FlinkException e) {
--- End diff --

Yes, you're right. Will change it.




[jira] [Commented] (FLINK-5942) Harden ZooKeeperStateHandleStore to deal with corrupted data

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905242#comment-15905242
 ] 

ASF GitHub Bot commented on FLINK-5942:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3447#discussion_r105416999
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -240,7 +241,7 @@ public int exists(String pathInZooKeeper) throws 
Exception {
try {
return InstantiationUtil.deserializeObject(data, 
Thread.currentThread().getContextClassLoader());
} catch (IOException | ClassNotFoundException e) {
-   throw new Exception("Failed to deserialize state handle 
from ZooKeeper data from " +
+   throw new FlinkIOException("Failed to deserialize state 
handle from ZooKeeper data from " +
--- End diff --

The idea was to switch slowly to the new `FlinkExceptions`. However, what I 
just realized is that `FlinkIOException` does not inherit from `IOException`, 
which is not good if you want to catch all `IOException`s. I will revert it 
back to `IOException`.


> Harden ZooKeeperStateHandleStore to deal with corrupted data
> 
>
> Key: FLINK-5942
> URL: https://issues.apache.org/jira/browse/FLINK-5942
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ZooKeeperStateHandleStore}} cannot handle corrupted Znode data. When 
> calling {{ZooKeeperStateHandleStore.getAll}} or {{getAllSortedByName}} and 
> reading a node with corrupted data, the whole operation will fail. In such a 
> situation, Flink won't be able to recover because it will read over and over 
> again the same corrupted Znodes (in the recovery case). Therefore, I propose 
> to ignore Znodes whose data cannot be read.
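
A sketch of what ignoring unreadable Znodes in {{getAll}} could look like (illustrative only; `client` and `get(path)` stand in for the store's Curator client and its existing deserializing getter):

{code}
// Inside ZooKeeperStateHandleStore<T>: collect all readable handles and skip
// children whose data cannot be deserialized, instead of failing the whole call.
public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSkippingBroken() throws Exception {
    List<Tuple2<RetrievableStateHandle<T>, String>> handles = new ArrayList<>();

    for (String child : client.getChildren().forPath("/")) {
        String path = "/" + child;
        try {
            handles.add(new Tuple2<>(get(path), path)); // get() deserializes the Znode data
        } catch (Exception e) {
            // corrupted Znode: log and ignore it so that recovery can proceed
            LOG.warn("Ignoring state handle under {} because it could not be read.", path, e);
        }
    }
    return handles;
}
{code}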





[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105417009
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction

[jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905244#comment-15905244
 ] 

ASF GitHub Bot commented on FLINK-6023:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105417009
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction

> Fix Scala snippet into Process Function (Low-level Operations) Doc
> --
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Mauro Cortellazzi
>Assignee: Mauro Cortellazzi
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the 
> Scala snippet





[GitHub] flink pull request #3447: [FLINK-5942] [checkpoint] Harden ZooKeeperStateHan...

2017-03-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3447#discussion_r105416999
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -240,7 +241,7 @@ public int exists(String pathInZooKeeper) throws 
Exception {
try {
return InstantiationUtil.deserializeObject(data, 
Thread.currentThread().getContextClassLoader());
} catch (IOException | ClassNotFoundException e) {
-   throw new Exception("Failed to deserialize state handle 
from ZooKeeper data from " +
+   throw new FlinkIOException("Failed to deserialize state 
handle from ZooKeeper data from " +
--- End diff --

The idea was to switch slowly to the new `FlinkExceptions`. However, what I 
just realized is that `FlinkIOException` does not inherit from `IOException`, 
which is not good if you want to catch all `IOException`s. I will revert it 
back to `IOException`.
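
The exception-hierarchy point, concretely (a self-contained illustration, not Flink code):

{code}
import java.io.IOException;

public class CatchDemo {

    static void deserialize() throws IOException {
        throw new IOException("corrupted data"); // stand-in for the real failure
    }

    public static void main(String[] args) {
        try {
            deserialize();
        } catch (IOException e) {
            // reached only because the thrown type extends IOException; an exception
            // extending FlinkException instead would escape this catch block
            System.out.println("handled: " + e.getMessage());
        }
    }
}
{code}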




[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905216#comment-15905216
 ] 

ASF GitHub Bot commented on FLINK-5995:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3503
  
Hi @StephanEwen, thanks a lot for your comment. I thought carefully about 
where it is most reasonable to add the test; since this PR changes the 
`OperatorStateBackend`, I added the test case in 
`src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java`. 
Does this make sense to you? Feel free to tell me if it is incorrect.
Best,
SunJincheng


> Get a Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When use OperatorState and creating the ListStateDescriptor with a 
> TypeInformation,I got a exception. The Exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So, I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the 
> `getOperatorState` method. I would appreciate it if anyone could give me some 
> advice.
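
The proposed change amounts to one extra call before the serializer is queried (a sketch of the relevant lines; `executionConfig` is assumed to be available on the backend):

{code}
// Inside DefaultOperatorStateBackend#getOperatorState(ListStateDescriptor<S>):
// initialize the serializer before getElementSerializer() is called; otherwise
// it throws "Serializer not yet initialized."
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
TypeSerializer<S> partitionStateSerializer = stateDescriptor.getElementSerializer();
{code}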





[GitHub] flink issue #3503: [FLINK-5995][checkpoints] fix Get a Exception when creati...

2017-03-10 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3503
  
Hi @StephanEwen, thanks a lot for your comment. I thought carefully about 
where it is most reasonable to add the test; since this PR changes the 
`OperatorStateBackend`, I added the test case in 
`src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java`. 
Does this make sense to you? Feel free to tell me if it is incorrect.
Best,
SunJincheng




[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905211#comment-15905211
 ] 

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3488
  
+1 to merge.


> JobLeaderIdService should time out registered jobs
> --
>
> Key: FLINK-5971
> URL: https://issues.apache.org/jira/browse/FLINK-5971
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{JobLeaderIdService}} has no mechanism to time out inactive jobs. At the 
> moment it relies on the {{RunningJobsRegistry}} which only gives a heuristic 
> answer.
> We should remove the {{RunningJobsRegistry}} and register instead a timeout 
> for each job which does not have a job leader associated.





[GitHub] flink issue #3488: [FLINK-5971] [flip-6] Add timeout for registered jobs on ...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3488
  
+1 to merge.




[GitHub] flink pull request #3511: Flink 5734 code generation for normalizedkey sorte...

2017-03-10 Thread heytitle
GitHub user heytitle opened a pull request:

https://github.com/apache/flink/pull/3511

Flink 5734 code generation for normalizedkey sorter



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/heytitle/flink 
FLINK-5734-code-generation-for-normalizedkey-sorter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3511.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3511


commit dd20fb63d0ef64de11ba5435cc9bcd3fab106dde
Author: Greg Hogan 
Date:   2016-10-05T20:13:02Z

[FLINK-3722] [runtime] Don't / and % when sorting

Replace division and modulus with addition and subtraction.

commit 4d87bbb8d34f9551d10cb0213fda842761f11be6
Author: Greg Hogan 
Date:   2016-10-20T13:40:27Z

Additional comments

commit fc597eb6949c502ae7883bd962feaa6b129b8172
Author: heytitle 
Date:   2017-02-19T20:28:29Z

[FLINK-5734] prepare project structure

commit ecfcd730049317d0521b25c9474c286d97322db8
Author: heytitle 
Date:   2017-02-20T14:22:41Z

[FLINK-5734] implement basic functionalities for code generation

commit 46bcebd0fc188c1a2d2e714072a75f76db2d8232
Author: heytitle 
Date:   2017-02-24T22:17:50Z

[FLINK-5734] use NormaliKeysorter's appoach to compute numKeyByte

commit 8f3e0534c3dd1677c34b975b1126257bbb685f31
Author: heytitle 
Date:   2017-03-04T21:55:21Z

[FLINK-5734] use synchronized block for SorterFactory and TemplateModel

commit 64ad5df842ca14e929b893c76a790827eb1d0b90
Author: heytitle 
Date:   2017-03-05T00:01:54Z

[FLINK-5734] replace string concat with stringbuilder

commit d81ed5076fad7112d53d40ecf148871254fa57f3
Author: heytitle 
Date:   2017-03-05T00:34:08Z

[FLINK-5734] add testcase for variable-length string

commit 237fee4de158021d51bbf2c0550933f41cd90aab
Author: heytitle 
Date:   2017-03-05T00:48:47Z

[FLINK-5734] user proper logger and also add comments

commit 3b38a6f9dee406436864af7c6052db84a9ebfd34
Author: heytitle 
Date:   2017-03-05T16:29:16Z

[FLINK-5734] checking endianness should not be involved in generating 
procedures for compare function

commit 559daf5d48f1d786de72127c92cb1c1f6a4c2ff9
Author: heytitle 
Date:   2017-03-05T20:12:50Z

[FLINK-5734] move byte-operator-mapping to be a constant of class scope

commit 56454f63ebf23ea962c793ac0dc35493bc5629cc
Author: heytitle 
Date:   2017-03-05T21:04:17Z

[FLINK-5734] add enable/disable flag in ExecutionConfig

commit 912641c4e3894b9c4cbfe82c2ac86fc96d463725
Author: heytitle 
Date:   2017-03-05T22:09:20Z

[FLINK-5734] rename variable names in old compare/swap functions to match 
with the new ones

commit bc8bb29d3aec66908aa3e1e91fd5ea2661afd1f8
Author: heytitle 
Date:   2017-03-05T22:09:49Z

[FLINK-5734] improve tests

commit 45d0aa590b21d0c23d76601f5e8efe6f17c0211b
Author: heytitle 
Date:   2017-03-06T15:14:54Z

[FLINK-5734] add timestamp to generated sorter

commit 3563e213bdb0d54bcee3c64e753f4656862554f3
Author: SerkanAli 
Date:   2017-03-07T13:27:44Z

[FLINK-5734] RESOURCE_PATH exhibits to a Temporary directory

commit 8d3e08485931c0d0d5bcf6647eb346eff65c6fa7
Author: heytitle 
Date:   2017-03-07T22:35:25Z

[FLINK-5734] integrate code generation to flink code

commit 362fc4656f60354e4ec23732ec339067adb47b1e
Author: heytitle 
Date:   2017-03-08T00:26:34Z

[FLINK-5734] prevent generating same sorter at the same time

commit 3415cbaf0dc8d9ecaf0f1c9652b15a49eececca2
Author: heytitle 
Date:   2017-03-08T00:35:38Z

[FLINK-5734] fix sorting in desc order failed

commit bcf50d285e2040443e1b9857484b63008304c9c5
Author: heytitle 
Date:   2017-03-08T12:03:19Z

[FLINK-5734] also cache constructor to save cooking time

commit bc2a37c4b6e41428f320efc9fd70e3b47f6aad5e
Author: heytitle 
Date:   2017-03-08T14:34:25Z

[FLINK-5734] refactor integration tests

commit f06f46873587b21357a86897ca5096a0dcb06a20
Author: heytitle 
Date:   2017-03-08T23:09:08Z

[FLINK-5734] add benchmarking code

commit 7c8f82f3c11bfba3a20b34fe5315a9d0cb827805
Author: SerkanAli 
Date:   2017-03-09T12:24:52Z

[FLINK-5734] get temporary directory for generated code from task config.





[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105398836
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -300,6 +303,36 @@ private StreamGraph generateInternal(List<StreamTransformation<?>> transformatio
}
 
/**
+* Transforms a {@code SideOutputTransformation}.
+*
+* 
+* For this we create a virtual node in the {@code StreamGraph} that 
holds the side-output
+* {@link org.apache.flink.util.OutputTag}.
+*
+* @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
+*/
+   private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
+   StreamTransformation<?> input = sideOutput.getInput();
+   Collection<Integer> resultIds = transform(input);
+
+
+   // the recursive transform might have already transformed this
+   if (alreadyTransformed.containsKey(sideOutput)) {
+   return alreadyTransformed.get(sideOutput);
+   }
+
+   List<Integer> virtualResultIds = new ArrayList<>();
+
+   for (int inputId : resultIds) {
+   int virtualId = StreamTransformation.getNewNodeId();
+   streamGraph.addVirtualSideOutputNode(inputId, 
virtualId, sideOutput.getOutputTag());
+   virtualResultIds.add(virtualId);
+   }
+   return virtualResultIds;
+   }
+
+
--- End diff --

Leave only one empty line.




[GitHub] flink issue #3501: [FLINK-5874] Restrict key types in the DataStream API.

2017-03-10 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3501
  
+1 to merge.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402746
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -1528,14 +1572,16 @@ public void 
testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce
stateDesc,
new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
-   LATENESS);
+   LATENESS,
+   lateOutputTag);
--- End diff --

wrong alignment




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105409533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

We can't throw, because that would crash the program. This is a good catch, 
though! I will remove the duplication.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105401808
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -441,26 +491,55 @@ public void close() {
private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {

private final TypeSerializer<T> serializer;
-   
+
public CopyingChainingOutput(
OneInputStreamOperator<T, ?> operator,
TypeSerializer<T> serializer,
+   OutputTag<T> outputTag,
StreamStatusProvider streamStatusProvider) {
-   super(operator, streamStatusProvider);
+   super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
 
--- End diff --

Again the two `collect()` have duplicate code (after the casting).




[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905201#comment-15905201
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3501
  
+1 to merge.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the arrays contents into account, but the memory address.
> This leads to different hash code on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detect a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
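
The underlying problem is easy to reproduce: `hashCode()` on a Java array is identity-based, so equal contents do not imply equal hashes on the sender and receiver side (self-contained demo):

{code}
import java.util.Arrays;

public class ArrayKeyHashDemo {
    public static void main(String[] args) {
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};

        // identity-based hash: different for distinct instances, so two copies
        // of the "same" key hash differently
        System.out.println(a.hashCode() == b.hashCode()); // false in practice

        // content-based hash: what a key would actually need
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true
    }
}
{code}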





[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905199#comment-15905199
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105409533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

We can't throw, because that would crash the program. This is a good catch, 
though! I will remove the duplication.


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing





[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397306
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side 
outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that 
Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
+ * }
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation<T> typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
--- End diff --

We do not need both lines with the checks. We can just have:

`this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");`
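
For context, the feature under review is used roughly like this (a sketch against the API added by this PR; the tag name, keying, and filtering logic are illustrative):

{code}
// Declared as an anonymous subclass so that Flink can capture the element type.
final OutputTag<String> rejectedTag = new OutputTag<String>("rejected") {};

SingleOutputStreamOperator<Integer> mainStream = input
    .keyBy(new KeySelector<Integer, Integer>() {
        @Override
        public Integer getKey(Integer value) {
            return value % 2;
        }
    })
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            if (value >= 0) {
                out.collect(value);                            // main output
            } else {
                ctx.output(rejectedTag, "negative: " + value); // side output
            }
        }
    });

// Consumers obtain the side-output stream from the main operator:
DataStream<String> rejected = mainStream.getSideOutput(rejectedTag);
{code}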




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105403355
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
 ---
@@ -40,6 +41,12 @@ public void collect(StreamRecord<T> record) {
}
 
@Override
+   public <X> void collect(
--- End diff --

The signature fits in one line




[jira] [Commented] (FLINK-5936) Can't pass keyed vectors to KNN join algorithm

2017-03-10 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905191#comment-15905191
 ] 

Till Rohrmann commented on FLINK-5936:
--

The {{PredictDataSetOperation}} is used in the {{KNN}} as well as {{ALS}} 
implementation.

I think the id should not be added to the {{Vector}} class, because that is a 
pure math class. Instead, we should provide a wrapper class. We then have to 
make sure that this wrapper is understood by all predict operations, so that 
they unwrap the vector, apply the algorithm, and output the result wrapped 
again with the id.

> Can't pass keyed vectors to KNN join algorithm  
> 
>
> Key: FLINK-5936
> URL: https://issues.apache.org/jira/browse/FLINK-5936
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Affects Versions: 1.1.3
>Reporter: Alex DeCastro
>Priority: Minor
>
> Hi there, 
> I noticed that for Scala 2.10/Flink 1.1.3 there's no way to recover keys from 
> the predict method of KNN join even if the Vector (FlinkVector) class gets 
> extended to allow for keys.  
> If I create a class, say SparseVectorsWithKeys, the predict method will still 
> return SparseVectors only. Are there any workarounds here?
> Would it be possible to either extend the Vector class or the ML models to 
> consume and output keyed vectors? This is very important for NLP and for much 
> of ML pipeline debugging, including logging.
> Thanks a lot
> Alex
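
A wrapper along the lines Till describes might look like this (purely hypothetical; not an existing FlinkML class):

{code}
// Hypothetical: pairs an id with a vector so that predict operations can
// unwrap the vector, run the algorithm, and re-attach the id to the result.
public final class KeyedVector<K> {

    private final K id;
    private final Vector vector; // org.apache.flink.ml.math.Vector

    public KeyedVector(K id, Vector vector) {
        this.id = id;
        this.vector = vector;
    }

    public K getId() { return id; }
    public Vector getVector() { return vector; }
}
{code}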





[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905177#comment-15905177
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105407208
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
--- End diff --

The thing is that, if I remember correctly, we discussed how to check for 
exceptions earlier this year, and the outcome was something along the lines of 
using the JUnit facilities. Feel free to correct me on this one.



> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> -
>
> Key: FLINK-5874
> URL: https://issues.apache.org/jira/browse/FLINK-5874
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Robert Metzger
>Assignee: Kostas Kloudas
>Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the arrays contents into account, but the memory address.
> This leads to different hash code on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detect a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.





[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105399457
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This transformation represents a selection of a side output of an 
upstream operation with a
+ * given {@link OutputTag}.
+ *
+ * This does not create a physical operation, it only affects how 
upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code SideOutputTransformation}
+ */
+public class SideOutputTransformation<T> extends StreamTransformation<T> {
+   private final StreamTransformation<?> input;
--- End diff --

Leave a blank line here.





[GitHub] flink issue #3501: [FLINK-5874] Restrict key types in the DataStream API.

2017-03-10 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3501
  
Done! Let me know if you have any additional comments @zentol 




[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105407025
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,243 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector<Tuple2<Integer[], String>, int[]> keySelector =
+   new KeySelector<Tuple2<Integer[], String>, int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector =
+   new KeySelector<Tuple2<Integer[], String>, Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector<Tuple2<Integer[], String>, Object[]> keySelector =
+   new KeySelector<Tuple2<Integer[], String>, Object[]>() {
+
+   @Override
+   public Object[] getKey(Tuple2<Integer[], String> value) throws Exception {
+   Object[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo<Object[], Object> keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(
+   Object[].class, new GenericTypeInfo<>(Object.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream<Tuple2<Integer[], String>> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage(new StringStartsWith("Type " + 
expectedKeyType + " cannot be used as key."));
+
+   input.keyBy(keySelector);
+   }
+
+   Composite Key Tests : POJOs 

+
+   @Test
+   public void testPOJOWithNestedArrayNoHashCodeKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream<POJOWithHashCode> input = env.fromElements(
+   new POJOWithHashCode(new int[] {1, 2}));
+
+   TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple1<int[]>>(
+   PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage(new StringStartsWith("Type " + 
expectedTypeInfo + " cannot be used as key."));
+
+   input.keyBy("id");
+   }
+
+   @Test
+   public void testPOJOWithNestedArrayAndHashCodeWorkAround() {

[jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905180#comment-15905180
 ] 

ASF GitHub Bot commented on FLINK-5874:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105407025
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,243 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
+
+   @Test
+   public void testPrimitiveArrayKeyRejection() {
+
+   KeySelector<Tuple2<Integer[], String>, int[]> keySelector =
+   new KeySelector<Tuple2<Integer[], String>, int[]>() {
+
+   @Override
+   public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
+   int[] ks = new int[value.f0.length];
+   for (int i = 0; i < ks.length; i++) {
+   ks[i] = value.f0[i];
+   }
+   return ks;
+   }
+   };
+
+   testKeyRejection(keySelector, 
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testBasicArrayKeyRejection() {
+
+   KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector =
+   new KeySelector<Tuple2<Integer[], String>, Integer[]>() {
+
+   @Override
+   public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
+   return value.f0;
+   }
+   };
+
+   testKeyRejection(keySelector, 
BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+   }
+
+   @Test
+   public void testObjectArrayKeyRejection() {
+
+   KeySelector<Tuple2<Integer[], String>, Object[]> keySelector =
+   new KeySelector<Tuple2<Integer[], String>, Object[]>() {
+
+   @Override
+   public Object[] getKey(Tuple2<Integer[], String> value) throws Exception {
+   Object[] ks = new 
TestClass[value.f0.length];
+   for (int i = 0; i < ks.length; 
i++) {
+   ks[i] = new 
TestClass(value.f0[i]);
+   }
+   return ks;
+   }
+   };
+
+   ObjectArrayTypeInfo<Object[], Object> keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(
+   Object[].class, new GenericTypeInfo<>(Object.class));
+
+   testKeyRejection(keySelector, keyTypeInfo);
+   }
+
+   private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream<Tuple2<Integer[], String>> input = env.fromElements(
+   new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+   );
+
+   Assert.assertEquals(expectedKeyType, 
TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   expectedException.expectMessage(new StringStartsWith("Type " + 
expectedKeyType + " cannot be used as key."));
+
+   input.keyBy(keySelector);
+   }
+
+   Composite Key Tests : POJOs 

+
+   @Test
+   public void testPOJOWithNestedArrayNoHashCodeKeyRejection() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream<POJOWithHashCode> input = env.fromElements(
+   new POJOWithHashCode(new int[] {1, 2}));
+
+   TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple1<int[]>>(
+   PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+   // adjust the rule
+   expectedException.expect(InvalidProgramException.class);
+   
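
The workaround exercised by the tests above is to give the key POJO a content-based `hashCode` over its array field (a sketch of the idea; the class and field names follow the test):

{code}
import java.util.Arrays;

public class POJOWithHashCode {
    public int[] id;

    public POJOWithHashCode() {}          // POJOs need a public default constructor
    public POJOWithHashCode(int[] id) { this.id = id; }

    @Override
    public int hashCode() {
        return Arrays.hashCode(id);       // content-based, unlike id.hashCode()
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof POJOWithHashCode
                && Arrays.equals(id, ((POJOWithHashCode) o).id);
    }
}
{code}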

[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905145#comment-15905145
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402830
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -1618,14 +1664,16 @@ public void 
testDropDueToLatenessSessionZeroLateness() throws Exception {
stateDesc,
new InternalSingleValueWindowFunction<>(new 
ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
-   LATENESS);
+   LATENESS,
--- End diff --

wrong alignment


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing





[GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ...

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3501#discussion_r105407208
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -906,6 +919,256 @@ public void testChannelSelectors() {
}
 
/
+   // KeyBy testing
+   /
+
+   @Rule
+   public ExpectedException expectedException = ExpectedException.none();
--- End diff --

The thing is that, if I remember correctly, we discussed how to check for 
exceptions earlier this year, and the outcome was something along the lines of 
using the JUnit facilities. Feel free to correct me on this one.





[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397096
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java ---
@@ -46,7 +46,15 @@
public TypeHint() {
this.typeInfo = TypeExtractor.createTypeInfo(this, 
TypeHint.class, getClass(), 0);
}
-   
+
+   /**
+* Creates a hint for the generic type in the class signature.
+*/
+   public TypeHint(Class baseClass, Object instance, int 
genericParameterPos) {
+   this.typeInfo = TypeExtractor.createTypeInfo(instance, 
baseClass, instance.getClass(), genericParameterPos);
+   }
+
+
--- End diff --

Remove one of the 2 empty lines.



