[jira] [Assigned] (FLINK-9300) Improve error message when in-memory state is too large

2018-05-04 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9300:
---

Assignee: vinoyang

> Improve error message when in-memory state is too large
> ---
>
> Key: FLINK-9300
> URL: https://issues.apache.org/jira/browse/FLINK-9300
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: Ken Krugler
>Assignee: vinoyang
>Priority: Minor
>
> Currently in the {{MemCheckpointStreamFactory.checkSize()}} method, it can 
> throw an {{IOException}} via:
> {code:java}
> throw new IOException(
>     "Size of the state is larger than the maximum permitted memory-backed state. Size="
>         + size + " , maxSize=" + maxSize
>         + " . Consider using a different state backend, like the File System State backend.");
> {code}
> But this will happen even if you’re using the File System State backend.
> This came up here: 
> [https://stackoverflow.com/questions/50149005/ioexception-size-of-the-state-is-larger-than-the-maximum-permitted-memory-backe]
> We could change the message to be:
> {quote}Please consider increasing the maximum permitted memory size, 
> increasing the task manager parallelism, or using a non-memory-based state 
> backend such as RocksDB.
> {quote}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors

2018-05-04 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9299:
---

Assignee: vinoyang

> ProcessWindowFunction documentation Java examples have errors
> -
>
> Key: FLINK-9299
> URL: https://issues.apache.org/jira/browse/FLINK-9299
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: Ken Krugler
>Assignee: vinoyang
>Priority: Minor
>
> In looking at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation],
>  I noticed a few errors...
>  * "This allows to incrementally compute windows" should be "This allows it 
> to incrementally compute windows"
>  * DataStream input = ...; should be 
> DataStream> input = ...;
>  * The getResult() method needs to cast one of the accumulator values to a 
> double, if that's what it is going to return.
>  * MyProcessWindowFunction needs to extend, not implement 
> ProcessWindowFunction
>  * MyProcessWindowFunction needs to implement a process() method, not an 
> apply() method.
>  * The call to .timeWindow takes a Time parameter, not a window assigner.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization

2018-05-04 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464544#comment-16464544
 ] 

Shuyi Chen commented on FLINK-7001:
---

Hi [~pgrulich], the paper is a nice read. The technique applies to Tumble, 
Sliding, and Session windows, which is a good win, and the evaluation results 
look good. It also seems you already have an implementation of Scotty on Apache 
Flink based on the paper. 

Maybe you and [~jark] can each share more about the detailed design and the 
pros and cons of your approaches, so we can discuss them here?

> Improve performance of Sliding Time Window with pane optimization
> -
>
> Key: FLINK-7001
> URL: https://issues.apache.org/jira/browse/FLINK-7001
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> Currently, the implementation of time-based sliding windows treats each 
> window individually and replicates records to each window. For a window of 10 
> minute size that slides by 1 second the data is replicated 600 fold (10 
> minutes / 1 second). We can optimize sliding windows by dividing them into 
> panes (aligned with the slide), so that we avoid record duplication and 
> leverage checkpointing.
> I will attach a more detailed design doc to the issue.
> The following issues are similar to this issue: FLINK-5387, FLINK-6990
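
For readers new to the pane idea, here is a small, self-contained illustration of pane-based aggregation for a sum (purely illustrative; not Flink's actual implementation):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Pane-based sliding-window sum: each record is written to exactly one pane
// instead of being replicated into all 600 overlapping windows.
public class PaneSketch {

	// Panes are aligned with the slide: pane size = gcd(window size, slide).
	static long gcd(long a, long b) { return b == 0 ? a : gcd(b, a % b); }

	public static void main(String[] args) {
		long size = 600_000L;   // 10-minute window
		long slide = 1_000L;    // 1-second slide
		long pane = gcd(size, slide);
		Map<Long, Long> paneSums = new HashMap<>();

		// A record (timestamp = 1500 ms, value = 42) lands in exactly one pane.
		paneSums.merge(1_500L / pane, 42L, Long::sum);

		// The window covering panes 1..600 combines 600 partial sums,
		// instead of re-aggregating every raw record 600 times.
		long windowEndPane = 600L;
		long panesPerWindow = size / pane;
		long sum = 0L;
		for (long i = windowEndPane - panesPerWindow + 1; i <= windowEndPane; i++) {
			sum += paneSums.getOrDefault(i, 0L);
		}
		System.out.println(sum); // 42
	}
}
{code}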



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464530#comment-16464530
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427
  
@StephanEwen thanks again for the feedback, which I took to heart and 
simplified the hook.  It now has a `reset` method that is called only in the 
special case.

I will refactor the thread context code in a separate PR.



> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    
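
To make the proposal concrete, here is a minimal sketch of what such a hook addition could look like (per the comment above, the PR ultimately settled on a {{reset}} method invoked only in the no-completed-checkpoint case; the exact signature below is an assumption, not the merged API):

{code:java}
public interface MasterTriggerRestoreHook<T> {

	// existing triggerCheckpoint(...) / restoreCheckpoint(...) methods elided

	/**
	 * Called when execution is restarted without any completed checkpoint,
	 * giving the hook a chance to force external system state (e.g. a Pravega
	 * reader group) back to initial conditions.
	 */
	default void reset() throws Exception {
		// no-op by default; hooks with external state override this
	}
}
{code}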



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

2018-05-04 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427
  
@StephanEwen thanks again for the feedback, which I took to heart and 
simplified the hook.  It now has a `reset` method that is called only in the 
special case.

I will refactor the thread context code in a separate PR.



---


[jira] [Created] (FLINK-9300) Improve error message when in-memory state is too large

2018-05-04 Thread Ken Krugler (JIRA)
Ken Krugler created FLINK-9300:
--

 Summary: Improve error message when in-memory state is too large
 Key: FLINK-9300
 URL: https://issues.apache.org/jira/browse/FLINK-9300
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.4.2
Reporter: Ken Krugler


Currently in the {{MemCheckpointStreamFactory.checkSize()}} method, it can 
throw an {{IOException}} via:

{code:java}
throw new IOException(
    "Size of the state is larger than the maximum permitted memory-backed state. Size="
        + size + " , maxSize=" + maxSize
        + " . Consider using a different state backend, like the File System State backend.");
{code}

But this will happen even if you’re using the File System State backend.

This came up here: 
[https://stackoverflow.com/questions/50149005/ioexception-size-of-the-state-is-larger-than-the-maximum-permitted-memory-backe]

We could change the message to be:
{quote}Please consider increasing the maximum permitted memory size, increasing 
the task manager parallelism, or using a non-memory-based state backend such as 
RocksDB.
{quote}
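
For illustration, a minimal sketch of how {{checkSize()}} could read with the revised wording (the exact message text is still open for discussion, and the method shape below is an approximation of the current code, not a definitive patch):

{code:java}
private void checkSize(int size, int maxSize) throws IOException {
	if (size > maxSize) {
		throw new IOException(
			"Size of the state is larger than the maximum permitted memory-backed state. Size="
				+ size + ", maxSize=" + maxSize
				+ ". Consider increasing the maximum permitted memory size, increasing the task"
				+ " manager parallelism, or using a non-memory-based state backend such as RocksDB.");
	}
}
{code}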
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors

2018-05-04 Thread Ken Krugler (JIRA)
Ken Krugler created FLINK-9299:
--

 Summary: ProcessWindowFunction documentation Java examples have 
errors
 Key: FLINK-9299
 URL: https://issues.apache.org/jira/browse/FLINK-9299
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.4.2
Reporter: Ken Krugler


In looking at 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation],
 I noticed a few errors...
 * "This allows to incrementally compute windows" should be "This allows it to 
incrementally compute windows"
 * DataStream input = ...; should be 
DataStream> input = ...;
 * The getResult() method needs to cast one of the accumulator values to a 
double, if that's what it is going to return.
 * MyProcessWindowFunction needs to extend, not implement ProcessWindowFunction
 * MyProcessWindowFunction needs to implement a process() method, not an 
apply() method.
 * The call to .timeWindow takes a Time parameter, not a window assigner.
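
Putting those fixes together, the corrected documentation example might look roughly like this (a sketch that follows the docs' structure; {{AverageAggregate}} and the key selector are written out here for concreteness and are otherwise assumptions):

{code:java}
DataStream<Tuple2<String, Long>> input = ...;

input
	.keyBy(t -> t.f0)
	.timeWindow(Time.minutes(5))  // .timeWindow takes a Time parameter
	.aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// getResult() must cast to double before dividing two longs.
private static class AverageAggregate
		implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

	@Override
	public Tuple2<Long, Long> createAccumulator() {
		return new Tuple2<>(0L, 0L);
	}

	@Override
	public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
		return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
	}

	@Override
	public Double getResult(Tuple2<Long, Long> accumulator) {
		return ((double) accumulator.f0) / accumulator.f1;
	}

	@Override
	public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
		return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
	}
}

// extends (not implements) ProcessWindowFunction, and overrides process() (not apply()).
private static class MyProcessWindowFunction
		extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

	@Override
	public void process(String key, Context context, Iterable<Double> averages,
			Collector<Tuple2<String, Double>> out) {
		Double average = averages.iterator().next();
		out.collect(new Tuple2<>(key, average));
	}
}
{code}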

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464434#comment-16464434
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
Thanks for reviewing, @fhueske, @aljoscha, and @kl0u! I have addressed the 
latest review comments. Can you PTAL (again)?


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It will be useful to also flush data by a specified 
> time period. This way, the data will be written out when write throughput is 
> low but there are no significant time gaps between writes. This 
> reduces ETA for the data in the file system and should help move the 
> checkpoints faster as well.
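
For illustration, a sketch of how the proposed setting would be used, assuming the {{setBatchRolloverInterval(long)}} setter from the PR under review (#5860); the sink path and thresholds are made up:

{code:java}
// Roll part files at 400 MB, or after 20 minutes, whichever comes first
// ("stream" stands for any DataStream<String> already in scope).
BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/output");
sink.setBatchSize(400L * 1024 * 1024);           // existing size-based roll
sink.setBatchRolloverInterval(20 * 60 * 1000L);  // proposed time-based roll
sink.setInactiveBucketThreshold(60 * 1000L);     // existing inactivity-based flush
stream.addSink(sink);
{code}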



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-04 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/5860
  
Thanks for reviewing, @fhueske, @aljoscha, and @kl0u! I have addressed the 
latest review comments. Can you PTAL (again)?


---


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464404#comment-16464404
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r186223237
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -473,6 +482,15 @@ private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
 					subtaskIndex,
 					writePosition,
 					batchSize);
+			} else {
+				long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
--- End diff --

Updated the method signature of `shouldRoll` to include the 
`currentProcessingTime`.


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It will be useful to also flush data by a specified 
> time period. This way, the data will be written out when write throughput is 
> low but there are no significant time gaps between writes. This 
> reduces ETA for the data in the file system and should help move the 
> checkpoints faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-05-04 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r186223237
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -473,6 +482,15 @@ private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
 					subtaskIndex,
 					writePosition,
 					batchSize);
+			} else {
+				long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
--- End diff --

Updated the method signature of `shouldRoll` to include the 
`currentProcessingTime`.


---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-05-04 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r186223099
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Map pe
 		return this;
 	}
 
+	/**
+	 * Sets the roll over interval in milliseconds.
+	 *
+	 * <p>When a bucket part file is older than the roll over interval, a new bucket part file is
+	 * started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}.
+	 *
+	 * @param batchRolloverInterval The roll over interval in milliseconds
+	 */
+	public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
+		this.batchRolloverInterval = batchRolloverInterval;
+		return this;

Added a check for `batchRolloverInterval` to be a positive non-zero value.
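
Presumably along these lines (a sketch; the actual check in the PR may differ):

{code:java}
public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
	if (batchRolloverInterval <= 0) {
		throw new IllegalArgumentException(
			"The batch rollover interval must be a positive non-zero value.");
	}
	this.batchRolloverInterval = batchRolloverInterval;
	return this;
}
{code}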


---


[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464403#comment-16464403
 ] 

ASF GitHub Bot commented on FLINK-9138:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r186223099
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Map pe
 		return this;
 	}
 
+	/**
+	 * Sets the roll over interval in milliseconds.
+	 *
+	 * <p>When a bucket part file is older than the roll over interval, a new bucket part file is
+	 * started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}.
+	 *
+	 * @param batchRolloverInterval The roll over interval in milliseconds
+	 */
+	public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
+		this.batchRolloverInterval = batchRolloverInterval;
+		return this;
--- End diff --

Added a check for `batchRolloverInterval` to be a positive non-zero value.


> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink now supports flushing data to the file system by size limit and 
> by period of inactivity. It will be useful to also flush data by a specified 
> time period. This way, the data will be written out when write throughput is 
> low but there are no significant time gaps between writes. This 
> reduces ETA for the data in the file system and should help move the 
> checkpoints faster as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9298) setup_quickstart hanging

2018-05-04 Thread Alex Chen (JIRA)
Alex Chen created FLINK-9298:


 Summary: setup_quickstart hanging
 Key: FLINK-9298
 URL: https://issues.apache.org/jira/browse/FLINK-9298
 Project: Flink
  Issue Type: Bug
Reporter: Alex Chen


I followed the tutorial 
([https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html]).
Aside from several typos (e.g., incorrect log file names), Flink hangs while 
executing `bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000`:

```
➜  build-target git:(master) bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Starting execution of program
(hanging)
```

Below are the logs:

```
➜  log git:(master) ls -l
total 136
-rw-r--r--  1 wander  staff   6189  5  5 04:34 flink-wander-client-localhost.log
-rw-r--r--  1 wander  staff  17823  5  5 04:34 flink-wander-standalonesession-0-localhost.log
-rw-r--r--  1 wander  staff      0  5  5 04:33 flink-wander-standalonesession-0-localhost.out
-rw-r--r--  1 wander  staff  38220  5  5 04:34 flink-wander-taskexecutor-0-localhost.log
-rw-r--r--  1 wander  staff      0  5  5 04:33 flink-wander-taskexecutor-0-localhost.out

➜  log git:(master) cat flink-wander-client-localhost.log
2018-05-05 04:34:19,210 INFO  org.apache.flink.client.cli.CliFrontend  -
2018-05-05 04:34:19,212 INFO  org.apache.flink.client.cli.CliFrontend  -  Starting Command Line Client (Version: 1.6-SNAPSHOT, Rev:5ac4d29, Date:04.05.2018 @ 22:03:32 CST)
2018-05-05 04:34:19,212 INFO  org.apache.flink.client.cli.CliFrontend  -  OS current user: wander
2018-05-05 04:34:19,732 INFO  org.apache.flink.client.cli.CliFrontend  -  Current Hadoop/Kerberos user: wander
2018-05-05 04:34:19,733 INFO  org.apache.flink.client.cli.CliFrontend  -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.144-b01
2018-05-05 04:34:19,733 INFO  org.apache.flink.client.cli.CliFrontend  -  Maximum heap size: 1820 MiBytes
2018-05-05 04:34:19,733 INFO  org.apache.flink.client.cli.CliFrontend  -  JAVA_HOME: /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home
2018-05-05 04:34:19,735 INFO  org.apache.flink.client.cli.CliFrontend  -  Hadoop version: 2.4.1
2018-05-05 04:34:19,735 INFO  org.apache.flink.client.cli.CliFrontend  -  JVM Options:
2018-05-05 04:34:19,735 INFO  org.apache.flink.client.cli.CliFrontend  -     -Dlog.file=/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/log/flink-wander-client-localhost.log
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -     -Dlog4j.configuration=file:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/log4j-cli.properties
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -     -Dlogback.configurationFile=file:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/logback.xml
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -  Program Arguments:
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -     run
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -     examples/streaming/SocketWindowWordCount.jar
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -     --port
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -     9000
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -  Classpath: :/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/flink-dist_2.11-1.6-SNAPSHOT.jar:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/flink-python_2.11-1.6-SNAPSHOT.jar:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/flink-shaded-hadoop2-uber-1.6-SNAPSHOT.jar:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/log4j-1.2.17.jar:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/slf4j-log4j12-1.7.7.jar:::
2018-05-05 04:34:19,736 INFO  org.apache.flink.client.cli.CliFrontend  -

[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization

2018-05-04 Thread Philipp Grulich (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464248#comment-16464248
 ] 

Philipp Grulich commented on FLINK-7001:


Hi [~walterddr],

we recently published this paper about our new window operator:
http://www.user.tu-berlin.de/powibol/assets/publications/traub-scotty-icde-2018.pdf

It would definitely provide a huge performance improvement compared to the 
current Flink implementation.

I don't think a FLIP has been written yet.

Best,
Philipp

 

 

> Improve performance of Sliding Time Window with pane optimization
> -
>
> Key: FLINK-7001
> URL: https://issues.apache.org/jira/browse/FLINK-7001
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> Currently, the implementation of time-based sliding windows treats each 
> window individually and replicates records to each window. For a window of 10 
> minute size that slides by 1 second the data is replicated 600 fold (10 
> minutes / 1 second). We can optimize sliding windows by dividing them into 
> panes (aligned with the slide), so that we avoid record duplication and 
> leverage checkpointing.
> I will attach a more detailed design doc to the issue.
> The following issues are similar to this issue: FLINK-5387, FLINK-6990



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization

2018-05-04 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464239#comment-16464239
 ] 

Rong Rong commented on FLINK-7001:
--

Hi [~jark], 

Is there a FLIP being proposed based on the pain points discussed here? 
This inefficiency in windowing has been observed more and more frequently in 
our day-to-day operations lately.

We would like to contribute to the design and the implementation of this 
improvement if possible :-)

Thanks,
Rong

> Improve performance of Sliding Time Window with pane optimization
> -
>
> Key: FLINK-7001
> URL: https://issues.apache.org/jira/browse/FLINK-7001
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> Currently, the implementation of time-based sliding windows treats each 
> window individually and replicates records to each window. For a window of 10 
> minute size that slides by 1 second the data is replicated 600 fold (10 
> minutes / 1 second). We can optimize sliding windows by dividing them into 
> panes (aligned with the slide), so that we avoid record duplication and 
> leverage checkpointing.
> I will attach a more detailed design doc to the issue.
> The following issues are similar to this issue: FLINK-5387, FLINK-6990



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #:

2018-05-04 Thread zentol
Github user zentol commented on the pull request:


https://github.com/apache/flink/commit/c8fa8d025684c2225824c54a7285bbfdec7cfddc#commitcomment-28859963
  
In flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java on line 25:
you added spaces here


---


[jira] [Commented] (FLINK-9292) Remove TypeInfoParser

2018-05-04 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464135#comment-16464135
 ] 

Stephan Ewen commented on FLINK-9292:
-

Feel free to take over the second half of this issue, if you want...

> Remove TypeInfoParser
> -
>
> Key: FLINK-9292
> URL: https://issues.apache.org/jira/browse/FLINK-9292
> Project: Flink
>  Issue Type: Task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> The {{TypeInfoParser}} has been deprecated in favor of the {{TypeHint}}.
> Because the TypeInfoParser is also not working correctly with respect to 
> classloading, we should remove it. Users still find the class, try to use it, 
> and run into problems.
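
For reference, a minimal sketch of the recommended replacement:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

// Instead of the deprecated, classloading-sensitive string parser:
//   TypeInformation<?> info = TypeInfoParser.parse("Tuple2<String, Long>");
// use a TypeHint, which captures the generic type safely:
TypeInformation<Tuple2<String, Long>> info =
	TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
{code}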



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9292) Remove TypeInfoParser

2018-05-04 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464134#comment-16464134
 ] 

Stephan Ewen commented on FLINK-9292:
-

I have merged the first part of removing the {{TypeInfoParser}}, which 
removes it from various tests and the DataSet API.

> Remove TypeInfoParser
> -
>
> Key: FLINK-9292
> URL: https://issues.apache.org/jira/browse/FLINK-9292
> Project: Flink
>  Issue Type: Task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> The {{TypeInfoParser}} has been deprecated in favor of the {{TypeHint}}.
> Because the TypeInfoParser is also not working correctly with respect to 
> classloading, we should remove it. Users still find the class, try to use it, 
> and run into problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-04 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5940
  
LOL. I think I found a way: 
1. Rebase #3764 onto the current master;
2. Rebase this branch onto the rebased #3764;
3. Make changes on top

:-) 


---


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464071#comment-16464071
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5940
  
LOL. I think I found a way: 
1. Rebase #3764 onto the current master;
2. Rebase this branch onto the rebased #3764;
3. Make changes on top

:-) 


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.
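
For illustration, the kind of query this sub-task would enable (a sketch; the table and column names are made up):

{code:java}
// A distinct aggregate evaluated per group window.
Table result = tableEnv.sqlQuery(
	"SELECT user_name, " +
	"       COUNT(DISTINCT product_id), " +                    // distinct aggregation...
	"       TUMBLE_END(rowtime, INTERVAL '1' HOUR) " +
	"FROM Orders " +
	"GROUP BY user_name, TUMBLE(rowtime, INTERVAL '1' HOUR)"); // ...on a group window
{code}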



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464063#comment-16464063
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
Hmmm, good point. The discussion would be lost.
How about I put your changes on top of Haohui's changes before merging?


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-04 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
Hmmm, good point. The discussion would be lost.
How about I put your changes on top of Haohui's changes before merging?


---


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464039#comment-16464039
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5940
  
Thanks @suez1224 @fhueske for the comments. I will change them accordingly. 

Yes, I copied a lot of test cases from @haohui's PR for my own testing. I 
can definitely put it on top, given the runtime support is already merged in 
#. Procedure-wise question: should I rebase his commit, add my change 
on top, and then attach that to this PR? I am not sure if there's a clever way to both 
preserve the discussion in this thread and rebase on top of his change.


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-04 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5940
  
Thanks @suez1224 @fhueske for the comments. I will change them accordingly. 

Yes, I copied a lot of test cases from @haohui's PR for my own testing. I 
can definitely put it on top, given the runtime support is already merged in 
#. Procedure-wise question: should I rebase his commit, add my change 
on top, and then attach that to this PR? I am not sure if there's a clever way to both 
preserve the discussion in this thread and rebase on top of his change.


---


[jira] [Created] (FLINK-9297) Make size of JobManager's ioExecutor configurable

2018-05-04 Thread Jelmer Kuperus (JIRA)
Jelmer Kuperus created FLINK-9297:
-

 Summary: Make size of JobManager's ioExecutor configurable
 Key: FLINK-9297
 URL: https://issues.apache.org/jira/browse/FLINK-9297
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 1.4.2, 1.4.1, 1.4.0
Reporter: Jelmer Kuperus


With Flink 1.4.0, cleaning of checkpoints is handled on the job manager: the 
SharedStateRegistry uses the job manager's [io 
executor|https://github.com/apache/flink/blob/e5ed2fbc4a31808bea8be89b371bbe4269288003/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1999]
 as the executor on which to schedule disposals of checkpoint state. This executor 
has a fixed size equal to the number of CPU cores. When a lot of small files 
are created, this may not be enough.

It would be good to make this setting configurable. Initializing an executor 
service with the number of CPUs on the system makes sense for CPU-intensive 
tasks, but less so for IO-bound workloads.
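
For illustration, a minimal sketch of what a configurable pool size could look like (the option key below is hypothetical, not an existing Flink option):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical config key; the default preserves the current behavior
// (pool size = number of CPU cores).
int poolSize = configuration.getInteger(
	"jobmanager.io-executor.pool-size",
	Runtime.getRuntime().availableProcessors());

ExecutorService ioExecutor = Executors.newFixedThreadPool(poolSize);
{code}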

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9297) Make size of JobManager's ioExecutor configurable

2018-05-04 Thread Jelmer Kuperus (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jelmer Kuperus updated FLINK-9297:
--
Description: 
With Flink 1.4.0, cleaning of checkpoints is handled on the job manager: the 
SharedStateRegistry uses the job manager's [io 
executor|https://github.com/apache/flink/blob/e5ed2fbc4a31808bea8be89b371bbe4269288003/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1999]
 as the executor on which to schedule disposals of checkpoint state. This executor 
has a fixed size equal to the number of CPU cores. When a lot of small files 
are created by many jobs and a slow filesystem is used, this may not be enough.

It would be good to make this setting configurable. Initializing an executor 
service with the number of CPUs on the system makes sense for CPU-intensive 
tasks, but less so for IO-bound workloads.

 

 

 

  was:
With flink 1.4.0 cleaning of checkpoints is handled on the job manager, the 
SharedStateRegistry uses the job manager's [io 
executor|https://github.com/apache/flink/blob/e5ed2fbc4a31808bea8be89b371bbe4269288003/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1999]
 as  the executor to schedule disposals of checkpoint state on.  This executor 
has a fixed size equal to the number of cpu cores. When a lot of small files 
are created this may not be enough.

It would be good to make this setting configurable. Initializing an executor 
service with the number of cpu's on the system makes sense for cpu intensive 
tasks less so for io based workloads

 

 

 


> Make size of JobManager's ioExecutor configurable
> -
>
> Key: FLINK-9297
> URL: https://issues.apache.org/jira/browse/FLINK-9297
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.4.0, 1.4.1, 1.4.2
>Reporter: Jelmer Kuperus
>Priority: Major
>
> With Flink 1.4.0, cleaning of checkpoints is handled on the job manager: the 
> SharedStateRegistry uses the job manager's [io 
> executor|https://github.com/apache/flink/blob/e5ed2fbc4a31808bea8be89b371bbe4269288003/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1999]
>  as the executor on which to schedule disposals of checkpoint state. This 
> executor has a fixed size equal to the number of CPU cores. When a lot of 
> small files are created by many jobs and a slow filesystem is used, this may 
> not be enough.
> It would be good to make this setting configurable. Initializing an executor 
> service with the number of CPUs on the system makes sense for CPU-intensive 
> tasks, but less so for IO-bound workloads.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8497) KafkaConsumer throws NPE if topic doesn't exist

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463984#comment-16463984
 ] 

ASF GitHub Bot commented on FLINK-8497:
---

Github user alexpf commented on a diff in the pull request:

https://github.com/apache/flink/pull/5929#discussion_r186110995
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
 ---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
 
 		try {
 			for (String topic : topics) {
-				for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+				List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+				if (topicPartitions == null) {
+					throw new IllegalStateException("The topic " + topic + " does not exist");
--- End diff --

I've taken a look at it. Ideally, we have to make the go/fail decision one 
level higher, at `FlinkKafkaConsumerBase`. It uses the partition discovery both 
for the initial seed and later during the run, so that's where the decision 
should be made. The problem I see here is that 
`AbstractPartitionDiscoverer#discoverPartitions` doesn't just get the list of 
available partitions, but also filters the partitions applicable to the 
current task. So, once we get the partition list, we can't say whether the list 
is empty because nothing was found, or because the partitions have been 
post-filtered.
The only way to communicate this difference, as I see it now, is to 
introduce some new specific exception, and catch it at the 
`FlinkKafkaConsumerBase`.
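
To make that concrete, a rough sketch (the exception name and placement are assumptions, not the PR's final shape):

{code:java}
// A dedicated exception lets the consumer base tell "topic does not exist"
// apart from "all partitions were filtered out for this subtask".
public class UnknownTopicException extends RuntimeException {
	public UnknownTopicException(String topic) {
		super("The topic " + topic + " does not exist");
	}
}

// In the partition discoverer, instead of a bare IllegalStateException:
//     if (topicPartitions == null) {
//         throw new UnknownTopicException(topic);
//     }
//
// In FlinkKafkaConsumerBase, the caller decides how to fail:
//     try {
//         allPartitions = partitionDiscoverer.discoverPartitions();
//     } catch (UnknownTopicException e) {
//         throw new IllegalStateException("Topic not found", e);
//     }
{code}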


> KafkaConsumer throws NPE if topic doesn't exist
> ---
>
> Key: FLINK-8497
> URL: https://issues.apache.org/jira/browse/FLINK-8497
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: chris snow
>Assignee: Aleksei Lesnov
>Priority: Minor
>
> If I accidentally set the kafka consumer with a topic that doesn't exist:
> {code:java}
> FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer011<>(
>     "does_not_exist",
>     new JSONKeyValueDeserializationSchema(false),
>     properties
> );
> DataStream<ObjectNode> input = env.addSource(kafkaConsumer);{code}
> Flink throws NPE
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748){code}
> Maybe Flink could throw an IllegalStateException("Topic not found")?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE...

2018-05-04 Thread alexpf
Github user alexpf commented on a diff in the pull request:

https://github.com/apache/flink/pull/5929#discussion_r186110995
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
 ---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
 
 		try {
 			for (String topic : topics) {
-				for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+				List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+				if (topicPartitions == null) {
+					throw new IllegalStateException("The topic " + topic + " does not exist");
--- End diff --

I've taken a look at it. Ideally, we have to make the go/fail decision one 
level higher, at `FlinkKafkaConsumerBase`. It uses the partition discovery both 
for the initial seed and later during the run, so that's where the decision 
should be made. The problem I see here is that 
`AbstractPartitionDiscoverer#discoverPartitions` doesn't just get the list of 
available partitions, but also filters the partitions applicable to the 
current task. So, once we get the partition list, we can't say whether the list 
is empty because nothing was found, or because the partitions have been 
post-filtered.
The only way to communicate this difference, as I see it now, is to 
introduce some new specific exception, and catch it at the 
`FlinkKafkaConsumerBase`.


---


[jira] [Updated] (FLINK-8978) End-to-end test: Job upgrade

2018-05-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-8978:
--
Fix Version/s: (was: 1.5.1)
   1.5.0

> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which covers exactly this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8978) End-to-end test: Job upgrade

2018-05-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-8978.
-
Resolution: Fixed

> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which covers exactly this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-8978) End-to-end test: Job upgrade

2018-05-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reopened FLINK-8978:
---

Change fix version

> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which covers exactly this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8978) End-to-end test: Job upgrade

2018-05-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-8978.
-
Resolution: Fixed

Merged in:

master: 5ac4d29609

release-1.5: 54befe5a31

> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which covers exactly this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463918#comment-16463918
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5947


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which covers exactly this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...

2018-05-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5947


---


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463907#comment-16463907
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5947
  
LGTM  Will merge this.


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which covers exactly this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5947: [FLINK-8978] Stateful generic stream job upgrade e2e test

2018-05-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5947
  
LGTM 👍 Will merge this.


---


[jira] [Closed] (FLINK-9254) Move NotSoMiniClusterIterations to be an end-to-end test

2018-05-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-9254.
-
Resolution: Fixed

Merged in:

master: b50cebb656

release-1.5: ed3447e343

> Move NotSoMiniClusterIterations to be an end-to-end test
> 
>
> Key: FLINK-9254
> URL: https://issues.apache.org/jira/browse/FLINK-9254
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Minor
> Fix For: 1.5.0
>
>
> NotSoMiniClusterIterations should be a test that we don’t run for every 
> commit, but nightly as an end-to-end test.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9254) Move NotSoMiniClusterIterations to be an end-to-end test

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463884#comment-16463884
 ] 

ASF GitHub Bot commented on FLINK-9254:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5921


> Move NotSoMiniClusterIterations to be an end-to-end test
> 
>
> Key: FLINK-9254
> URL: https://issues.apache.org/jira/browse/FLINK-9254
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Minor
> Fix For: 1.5.0
>
>
> NotSoMiniClusterIterations should be a test that we don’t run for every 
> commit, but nightly as an end-to-end test.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5921: [FLINK-9254] Move NotSoMiniClusterIterations to be...

2018-05-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5921


---


[jira] [Commented] (FLINK-9254) Move NotSoMiniClusterIterations to be an end-to-end test

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463876#comment-16463876
 ] 

ASF GitHub Bot commented on FLINK-9254:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5921
  
In that case, LGTM 👍 Will merge this.


> Move NotSoMiniClusterIterations to be an end-to-end test
> 
>
> Key: FLINK-9254
> URL: https://issues.apache.org/jira/browse/FLINK-9254
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Minor
> Fix For: 1.5.0
>
>
> NotSoMiniClusterIterations should be a test that we don’t run for every 
> commit, but nightly as an end-to-end test.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5921: [FLINK-9254] Move NotSoMiniClusterIterations to be an end...

2018-05-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5921
  
In that case, LGTM 👍 Will merge this.


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463873#comment-16463873
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Overall, I think this looks good to me now 👍 


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> A user reported observing the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create a 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose {{stateSerializer}} can 
> be {{null}}. That by itself is not the problem; however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, 
> where we null-check the state serializer. This then fails with an 
> uninformative NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Overall, I think this looks good to me now 👍 


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463869#comment-16463869
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r186082804
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -102,17 +99,24 @@
 	 *
 	 * @return the deserialized serializer.
 	 */
-	public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) {
+	public static <T> TypeSerializer<T> tryReadSerializer(
+			DataInputView in,
+			ClassLoader userCodeClassLoader,
+			boolean useDummyPlaceholder) throws IOException {
+
 		final TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<T> proxy =
-			new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader, useDummyPlaceholder);
+			new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader);
 
 		try {
 			proxy.read(in);
 			return proxy.getTypeSerializer();
-		} catch (IOException e) {
-			LOG.warn("Deserialization of serializer errored; replacing with null.", e);
-
-			return null;
+		} catch (UnloadableTypeSerializerException e) {
--- End diff --

I would let this bubble up one more level, remove the flag here and only 
catch `UnloadableTypeSerializerException ` in the case where this method is 
called with `true`.


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the 
> {{stateSerializer}} can be {{null}}. This is not the problem, however, in 
> 

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-04 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r186082804
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -102,17 +99,24 @@
 *
 * @return the deserialized serializer.
 */
-   public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, 
ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) {
+   public static <T> TypeSerializer<T> tryReadSerializer(
+   DataInputView in,
+   ClassLoader userCodeClassLoader,
+   boolean useDummyPlaceholder) throws IOException {
+
final 
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<T> proxy =
-   new 
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader,
 useDummyPlaceholder);
+   new 
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader);
 
try {
proxy.read(in);
return proxy.getTypeSerializer();
-   } catch (IOException e) {
-   LOG.warn("Deserialization of serializer errored; 
replacing with null.", e);
-
-   return null;
+   } catch (UnloadableTypeSerializerException e) {
--- End diff --

I would let this bubble up one more level, remove the flag here and only 
catch `UnloadableTypeSerializerException ` in the case where this method is 
called with `true`.


---
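
The restructuring discussed in the review can be pictured as follows. This is a
rough sketch, not the merged code: the flag-free variant lets the new
UnloadableTypeSerializerException bubble up, and only the overload called with
useDummyPlaceholder == true catches it; createDummyPlaceholder is a
hypothetical helper standing in for whatever the PR actually substitutes.

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;

public static <T> TypeSerializer<T> tryReadSerializer(
        DataInputView in,
        ClassLoader userCodeClassLoader,
        boolean useDummyPlaceholder) throws IOException {
    try {
        // flag-free variant: an unloadable serializer propagates as an exception
        return tryReadSerializer(in, userCodeClassLoader);
    } catch (UnloadableTypeSerializerException e) {
        if (useDummyPlaceholder) {
            // only the resilient caller substitutes a placeholder serializer
            return createDummyPlaceholder(e);
        }
        throw e;
    }
}
{code}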


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463867#comment-16463867
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Yes, I think we can only remove the flag once we split up 
`readSerializersAndConfigsWithResilience`, but I guess it is ok to leave it if 
we change this soon anyway.


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the 
> {{stateSerializer}} can be {{null}}. This is not the problem, however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} 
> where we null-check the state serializer. This will then fail with an 
> uninformative NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Yes, I think we can only remove the flag once we split up 
`readSerializersAndConfigsWithResilience`, but I guess it is ok to leave it if 
we change this soon anyway.


---


[jira] [Commented] (FLINK-9292) Remove TypeInfoParser

2018-05-04 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463784#comment-16463784
 ] 

Stephan Ewen commented on FLINK-9292:
-

This issue needs a lot of care to not break functionality. 

I have a first part of that done already, also taking care of the deprecated 
parts of the DataSet API. I would recommend merging that first...

A lot of remaining work is actually adjusting tests...

> Remove TypeInfoParser
> -
>
> Key: FLINK-9292
> URL: https://issues.apache.org/jira/browse/FLINK-9292
> Project: Flink
>  Issue Type: Task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> The {{TypeInfoParser}} has been deprecated in favor of the {{TypeHint}}.
> Because the TypeInfoParser is also not working correctly with respect to 
> classloading, we should remove it. Users still find the class, try to use it, 
> and run into problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
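
For reference, the migration the deprecation points at is straightforward: the
string-based parser call is replaced by an anonymous TypeHint, which survives
erasure and uses the right classloader.

{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

// instead of TypeInfoParser.parse("Tuple2<String, Integer>"):
TypeInformation<Tuple2<String, Integer>> info =
    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
{code}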


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463761#comment-16463761
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186033706
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
+  (2L, 2, "Hello"),
+  (8L, 8, "Hello"),
+  (9L, 9, "Hello World"),
+  (4L, 4, "Hello"),
+  (16L, 16, "Hello"))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+StreamITCase.clear
+val stream = env
+  .fromCollection(sessionWindowTestdata)
+  .assignTimestampsAndWatermarks(new 
TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
+
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val table = stream.toTable(tEnv, 'long, 'int, 'string, 
'rowtime.rowtime)
+tEnv.registerTable("MyTable", table)
+
+val sqlQuery = "SELECT string, " +
+  "  COUNT(DISTINCT long) " +
--- End diff --

It would be good to add the end timestamp of the windows 
(`SESSION_END(rowtime, INTERVAL '0.005' SECOND)`) to make it easier to eyeball 
the expected test results.


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463760#comment-16463760
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186022377
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -641,7 +641,8 @@ class AggregationCodeGenerator(
|  java.util.Map.Entry entry = (java.util.Map.Entry) 
mergeIt$i.next();
|  Object k = entry.getKey();
--- End diff --

Change this line to `${classOf[Row].getCanonicalName} k = 
(${classOf[Row].getCanonicalName}) entry.getKey();`


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463763#comment-16463763
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186029985
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, 
SlidingGroupWindow, TumblingGroupWindow}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Ignore, Test}
+
+class DistinctAggregateTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)](
+"MyTable",
+'a, 'b, 'c,
+'proctime.proctime, 'rowtime.rowtime)
+
+  @Test
+  def testDistinct(): Unit = {
+val sql = "SELECT DISTINCT a, b, c FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select", "a, b, c")
+),
+term("groupBy", "a, b, c"),
+term("select", "a, b, c")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  // TODO: this query should be optimized to only have a single 
DataStreamGroupAggregate
+  // TODO: reopen this until FLINK-7144 fixed
+  @Ignore
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select", "a")
+),
+term("groupBy", "a"),
+term("select", "a")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testDistinctAggregateOnTumbleWindow(): Unit = {
+val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+  "  SUM(a) " +
+  "FROM MyTable " +
+  "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) "
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "rowtime", "a")
+  ),
+  term("window", TumblingGroupWindow('w$, 'rowtime, 90.millis)),
+  term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(a) AS EXPR$1")
+)
+
+streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+  "  SUM(DISTINCT a), " +
+  "  MAX(DISTINCT a) " +
+  "FROM MyTable " +
+  "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) "
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "rowtime", "a")
+  ),
+  term("window", SlidingGroupWindow('w$, 'rowtime, 360.millis, 
90.millis)),
+  term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS 
EXPR$1",
+"MAX(DISTINCT a) AS EXPR$2")
+)
+
+streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = {
+val sqlQuery = "SELECT a, " +
+  "  

[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463759#comment-16463759
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186033401
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
--- End diff --

The test is not checking for DISTINCT semantics since all aggregated values 
are distinct. We could do `COUNT(DISTINCT num)` (`int` has to be renamed to 
`num` because it's a SQL keyword).


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463758#comment-16463758
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r185955967
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -641,7 +641,8 @@ class AggregationCodeGenerator(
|  java.util.Map.Entry entry = (java.util.Map.Entry) 
mergeIt$i.next();
|  Object k = entry.getKey();
|  Long v = (Long) entry.getValue();
-   |  if (aDistinctAcc$i.add(k, v)) {
+   |  if (aDistinctAcc$i.add(
--- End diff --

The key in the entry is a `Row` already


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463762#comment-16463762
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186036107
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
--- End diff --

To check the correct merge behavior, we need two windows which aggregate 
the same value that is then deduplicated in the merge.

Some data like:
```
  (1L, 2, "Hello"), // 1. Hello window
  (2L, 2, "Hello"), // 1. Hello window, deduped
  (8L, 2, "Hello"), // 2. Hello window, deduped during merge
  (10L, 3, "Hello"), // 2. Hello window, forwarded during merge
  (9L, 9, "Hello World"), // 1. Hello World window
  (4L, 1, "Hello"), // 1. Hello window, triggering merge of 1. and 2. 
Hello windows
  (16L, 16, "Hello")) // 3. Hello window (not merged)
```



> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186033706
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
+  (2L, 2, "Hello"),
+  (8L, 8, "Hello"),
+  (9L, 9, "Hello World"),
+  (4L, 4, "Hello"),
+  (16L, 16, "Hello"))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+StreamITCase.clear
+val stream = env
+  .fromCollection(sessionWindowTestdata)
+  .assignTimestampsAndWatermarks(new 
TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
+
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val table = stream.toTable(tEnv, 'long, 'int, 'string, 
'rowtime.rowtime)
+tEnv.registerTable("MyTable", table)
+
+val sqlQuery = "SELECT string, " +
+  "  COUNT(DISTINCT long) " +
--- End diff --

It would be good to add the end timestamp of the windows 
(`SESSION_END(rowtime, INTERVAL '0.005' SECOND)`) to make it easier to eyeball 
the expected test results.


---
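
Put together, the amended query would look roughly as below. Only the
SESSION_END expression comes from the review comment; the GROUP BY clause is
assumed (it is truncated in the diff above), and the Scala test string is shown
here as a plain Java string for brevity.

{code:java}
String sqlQuery =
    "SELECT string, " +
    "  COUNT(DISTINCT long), " +
    "  SESSION_END(rowtime, INTERVAL '0.005' SECOND) " +  // end timestamp per window
    "FROM MyTable " +
    "GROUP BY string, SESSION(rowtime, INTERVAL '0.005' SECOND)";
{code}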


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186022377
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -641,7 +641,8 @@ class AggregationCodeGenerator(
|  java.util.Map.Entry entry = (java.util.Map.Entry) 
mergeIt$i.next();
|  Object k = entry.getKey();
--- End diff --

Change this line to `${classOf[Row].getCanonicalName} k = 
(${classOf[Row].getCanonicalName}) entry.getKey();`


---
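
For context, the generated merge loop after the suggested change would read
roughly as follows; this is a sketch of the codegen output with the $i suffixes
elided, not the template itself.

{code:java}
while (mergeIt.hasNext()) {
    java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt.next();
    // the distinct key is already a Row, so cast it instead of keeping an Object
    org.apache.flink.types.Row k = (org.apache.flink.types.Row) entry.getKey();
    Long v = (Long) entry.getValue();
    if (aDistinctAcc.add(k, v)) {
        // the key was not yet present in the merged distinct map
    }
}
{code}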


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186029985
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, 
SlidingGroupWindow, TumblingGroupWindow}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Ignore, Test}
+
+class DistinctAggregateTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)](
+"MyTable",
+'a, 'b, 'c,
+'proctime.proctime, 'rowtime.rowtime)
+
+  @Test
+  def testDistinct(): Unit = {
+val sql = "SELECT DISTINCT a, b, c FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select", "a, b, c")
+),
+term("groupBy", "a, b, c"),
+term("select", "a, b, c")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  // TODO: this query should be optimized to only have a single 
DataStreamGroupAggregate
+  // TODO: reopen this until FLINK-7144 fixed
+  @Ignore
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select", "a")
+),
+term("groupBy", "a"),
+term("select", "a")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testDistinctAggregateOnTumbleWindow(): Unit = {
+val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+  "  SUM(a) " +
+  "FROM MyTable " +
+  "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) "
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "rowtime", "a")
+  ),
+  term("window", TumblingGroupWindow('w$, 'rowtime, 90.millis)),
+  term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(a) AS EXPR$1")
+)
+
+streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+  "  SUM(DISTINCT a), " +
+  "  MAX(DISTINCT a) " +
+  "FROM MyTable " +
+  "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) "
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "rowtime", "a")
+  ),
+  term("window", SlidingGroupWindow('w$, 'rowtime, 360.millis, 
90.millis)),
+  term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS 
EXPR$1",
+"MAX(DISTINCT a) AS EXPR$2")
+)
+
+streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = {
+val sqlQuery = "SELECT a, " +
+  "  COUNT(a), " +
+  "  SUM(DISTINCT c) " +
+  "FROM MyTable " +
+  "GROUP BY a, SESSION(rowtime, INTERVAL '15' MINUTE) "
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  

[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r185955967
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -641,7 +641,8 @@ class AggregationCodeGenerator(
|  java.util.Map.Entry entry = (java.util.Map.Entry) 
mergeIt$i.next();
|  Object k = entry.getKey();
|  Long v = (Long) entry.getValue();
-   |  if (aDistinctAcc$i.add(k, v)) {
+   |  if (aDistinctAcc$i.add(
--- End diff --

The key in the entry is a `Row` already


---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186036107
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
--- End diff --

To check the correct merge behavior, we need two windows which aggregate 
the same value that is then deduplicated in the merge.

Some data like:
```
  (1L, 2, "Hello"), // 1. Hello window
  (2L, 2, "Hello"), // 1. Hello window, deduped
  (8L, 2, "Hello"), // 2. Hello window, deduped during merge
  (10L, 3, "Hello"), // 2. Hello window, forwarded during merge
  (9L, 9, "Hello World"), // 1. Hello World window
  (4L, 1, "Hello"), // 1. Hello window, triggering merge of 1. and 2. 
Hello windows
  (16L, 16, "Hello")) // 3. Hello window (not merged)
```



---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186033401
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
--- End diff --

The test is not checking for DISTINCT semantics since all aggregated values 
are distinct. We could do `COUNT(DISTINCT num)` (`int` has to be renamed to 
`num` because it's a SQL keyword).


---


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463715#comment-16463715
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
cc @zentol @GJL 


> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the JobManager performs a failover, 
> all TaskManagers that are inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause:
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on fail-over re-association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5869: [FLINK-8946] TaskManager stop sending metrics after JobMa...

2018-05-04 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
cc @zentol @GJL 


---


[jira] [Created] (FLINK-9296) Support distinct aggregation on non-windowed grouped streaming tables

2018-05-04 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-9296:


 Summary: Support distinct aggregation on non-windowed grouped 
streaming tables
 Key: FLINK-9296
 URL: https://issues.apache.org/jira/browse/FLINK-9296
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Fabian Hueske






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9232) Add harness test for AggregationCodeGenerator

2018-05-04 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-9232:
-
Component/s: Table API & SQL

> Add harness test for AggregationCodeGenerator 
> --
>
> Key: FLINK-9232
> URL: https://issues.apache.org/jira/browse/FLINK-9232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Instead of relying on ITCases to cover the codegen result, we should have 
> direct tests against it, for example using the harness test framework.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8691) Update table API to support distinct operator on data stream

2018-05-04 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-8691:
-
Component/s: Table API & SQL

> Update table API to support distinct operator on data stream
> 
>
> Key: FLINK-8691
> URL: https://issues.apache.org/jira/browse/FLINK-8691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8739) Optimize runtime support for distinct filter

2018-05-04 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-8739:
-
Component/s: Table API & SQL

> Optimize runtime support for distinct filter
> 
>
> Key: FLINK-8739
> URL: https://issues.apache.org/jira/browse/FLINK-8739
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Priority: Major
>
> Possible optimizations:
> 1. Decouple the distinct map and the actual accumulator so that they can be 
> created separately in codegen.
> 2. Reuse same distinct accumulator for filtering, e.g. `SELECT 
> COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
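
The second optimization can be illustrated with a small sketch. Both
COUNT(DISTINCT a) and SUM(DISTINCT a) would consult one reference-counted map
of seen values instead of keeping a copy each; all names below are
illustrative, not Flink internals.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class SharedDistinctFilter {

    // value -> how many additions currently reference it
    private final Map<Object, Long> seen = new HashMap<>();

    // returns true only the first time a value is seen; every DISTINCT
    // aggregate over the same field accumulates only when this returns true
    public boolean add(Object value) {
        return seen.merge(value, 1L, Long::sum) == 1L;
    }
}
{code}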


[jira] [Closed] (FLINK-6335) Parse DISTINCT over grouped window in stream SQL

2018-05-04 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6335.

Resolution: Duplicate

Duplicate of FLINK-8690

> Parse DISTINCT over grouped window in stream SQL
> 
>
> Key: FLINK-6335
> URL: https://issues.apache.org/jira/browse/FLINK-6335
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Major
>
> The SQL on the batch side supports the {{DISTINCT}} keyword over aggregation. 
> This jira proposes to support the {{DISTINCT}} keyword on streaming 
> aggregation using the same technique on the batch side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-8690:
-
Summary: Support distinct aggregation on group windowed streaming tables.  
(was: Update logical rule set to generate FlinkLogicalAggregate explicitly 
allow distinct agg on DataStream)

> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-04 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-8690:
-
Component/s: Table API & SQL

> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-6373) Add runtime support for distinct aggregation over grouped windows

2018-05-04 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6373.

Resolution: Duplicate

Was implemented as FLINK-8689.

> Add runtime support for distinct aggregation over grouped windows
> -
>
> Key: FLINK-6373
> URL: https://issues.apache.org/jira/browse/FLINK-6373
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Major
>
> This is a follow up task for FLINK-6335. FLINK-6335 enables parsing the 
> distinct aggregations over grouped windows. This jira tracks the effort of 
> adding runtime support for the query.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463642#comment-16463642
 ] 

ASF GitHub Bot commented on FLINK-7775:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5799
  
cc @zentol @GJL this PR has been open for a long time and the Travis build 
error seems unrelated. Can you review it? Thanks.


> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> ---
>
> Key: FLINK-7775
> URL: https://issues.apache.org/jira/browse/FLINK-7775
> Project: Flink
>  Issue Type: Task
>  Components: Local Runtime
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public int getNumberOfCachedJobs() {
> return jobRefCounters.size();
>   }
> {code}
> The method of PermanentBlobCache is not used.
> We should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5799: [FLINK-7775] Remove unreferenced method PermanentBlobCach...

2018-05-04 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5799
  
cc @zentol @GJL this PR has been open for a long time and the Travis build 
error seems unrelated. Can you review it? Thanks.


---


[GitHub] flink issue #5495: [FLINK-8659] Add migration itcases for broadcast state.

2018-05-04 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5495
  
I close this and I will open an updated one.


---


[GitHub] flink pull request #5495: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-04 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/5495


---


[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463628#comment-16463628
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/5495


> Add migration tests for Broadcast state.
> 
>
> Key: FLINK-8659
> URL: https://issues.apache.org/jira/browse/FLINK-8659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-04 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5955

[FLINK-8659] Add migration itcases for broadcast state.

As the name implies, this PR adds migration tests for the newly introduced 
broadcast state.

For the `scala` case, more refactoring is required so that the shared code 
between the tests is better distributed, but this is a broader refactoring. It 
requires the same work that was done for the previous case of the `java` 
migration tests.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink migration-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5955.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5955


commit 9ae20e37b557e9ca482bd61cb57e8a6001a7eb6e
Author: kkloudas 
Date:   2018-05-03T08:05:13Z

[FLINK-8659] Add migration itcases for broadcast state.




---


[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463634#comment-16463634
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5955

[FLINK-8659] Add migration itcases for broadcast state.

As the name implies, this PR adds migration tests for the newly introduced 
broadcast state.

For the `scala` case, more refactoring is required so that the shared code 
between the tests is better distributed, but this is a broader refactoring. It 
requires the same work that was done for the previous case of the `java` 
migration tests.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink migration-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5955.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5955


commit 9ae20e37b557e9ca482bd61cb57e8a6001a7eb6e
Author: kkloudas 
Date:   2018-05-03T08:05:13Z

[FLINK-8659] Add migration itcases for broadcast state.




> Add migration tests for Broadcast state.
> 
>
> Key: FLINK-8659
> URL: https://issues.apache.org/jira/browse/FLINK-8659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463630#comment-16463630
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5495
  
I close this and I will open an updated one.


> Add migration tests for Broadcast state.
> 
>
> Key: FLINK-8659
> URL: https://issues.apache.org/jira/browse/FLINK-8659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults

2018-05-04 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-9245:
-

reopen to remove fixVersion

> Can't create a BucketingSink with a provided Configuration if no hadoop 
> defaults
> 
>
> Key: FLINK-9245
> URL: https://issues.apache.org/jira/browse/FLINK-9245
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.2
>Reporter: Julien Cuquemelle
>Priority: Minor
>
> We build Integration tests using this kind of code: 
> {code:java}
> val bucketingSink = new 
> BucketingSink[Row]("hdfs:///user/$USER/application_name/")
> bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs)
> bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm"))
> outputStream.addSink(bucketingSink)
> {code}
> Here, the hadoopRule is providing a valid hdfs config that should allow this 
> kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set 
> up, like a developer workstation or a Jenkins slave.
> When running this code on such a machine, the .createHadoopFileSystem(...) 
> fails with 
> {noformat}
> The given file system URI (hdfs:///user/$USER/application_name/) did not 
> describe the authority 
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat}
> because it tries to instantiate the fileSystem from a default configuration 
> in .getUnguardedFileSystem(); as the default conf doesn't exist, the default 
> filesystem resolves to "file:///" and the consistency check of the URI fails 
> because no authority can be found. So the whole filesystem creation fails 
> before actually trying to use the provided config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults

2018-05-04 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-9245.
---
Resolution: Not A Problem

> Can't create a BucketingSink with a provided Configuration if no hadoop 
> defaults
> 
>
> Key: FLINK-9245
> URL: https://issues.apache.org/jira/browse/FLINK-9245
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.2
>Reporter: Julien Cuquemelle
>Priority: Minor
>
> We build Integration tests using this kind of code: 
> {code:java}
> val bucketingSink = new 
> BucketingSink[Row]("hdfs:///user/$USER/application_name/")
> bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs)
> bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm"))
> outputStream.addSink(bucketingSink)
> {code}
> Here, the hadoopRule is providing a valid hdfs config that should allow this 
> kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set 
> up, like a developer workstation or a Jenkins slave.
> When running this code on such a machine, the .createHadoopFileSystem(...) 
> fails with 
> {noformat}
> The given file system URI (hdfs:///user/$USER/application_name/) did not 
> describe the authority 
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat}
> because it tries to instantiate the fileSystem from a default configuration 
> in .getUnguardedFileSystem(); as the default conf doesn't exist, the default 
> filesystem resolves to "file:///" and the consistency check of the URI fails 
> because no authority can be found. So the whole filesystem creation fails 
> before actually trying to use the provided config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults

2018-05-04 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-9245:

Fix Version/s: (was: 1.6.0)

> Can't create a BucketingSink with a provided Configuration if no hadoop 
> defaults
> 
>
> Key: FLINK-9245
> URL: https://issues.apache.org/jira/browse/FLINK-9245
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.2
>Reporter: Julien Cuquemelle
>Priority: Minor
>
> We build Integration tests using this kind of code: 
> {code:java}
> val bucketingSink = new 
> BucketingSink[Row]("hdfs:///user/$USER/application_name/")
> bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs)
> bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm"))
> outputStream.addSink(bucketingSink)
> {code}
> Here, the hadoopRule is providing a valid hdfs config that should allow this 
> kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set 
> up, like a developer workstation or a Jenkins slave.
> When running this code on such a machine, the .createHadoopFileSystem(...) 
> fails with 
> {noformat}
> The given file system URI (hdfs:///user/$USER/application_name/) did not 
> describe the authority 
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat}
> because it tries to instantiate the fileSystem from a default configuration 
> in .getUnguardedFileSystem(); as the default conf doesn't exist, the default 
> filesystem resolves to "file:///" and the consistency check of the URI fails 
> because no authority can be found. So the whole filesystem creation fails 
> before actually trying to use the provided config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults

2018-05-04 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-9245.
---
Resolution: Not A Problem

Closing now but please reopen if specifying an authority (host) doesn't fix it.

> Can't create a BucketingSink with a provided Configuration if no hadoop 
> defaults
> 
>
> Key: FLINK-9245
> URL: https://issues.apache.org/jira/browse/FLINK-9245
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.2
>Reporter: Julien Cuquemelle
>Priority: Minor
> Fix For: 1.6.0
>
>
> We build Integration tests using this kind of code: 
> {code:java}
> val bucketingSink = new 
> BucketingSink[Row]("hdfs:///user/$USER/application_name/")
> bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs)
> bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm"))
> outputStream.addSink(bucketingSink)
> {code}
> Here, the hadoopRule is providing a valid hdfs config that should allow this 
> kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set 
> up, like a developer workstation or a Jenkins slave.
> When running this code on such a machine, the .createHadoopFileSystem(...) 
> fails with 
> {noformat}
> The given file system URI (hdfs:///user/$USER/application_name/) did not 
> describe the authority 
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat}
> because it tries to instantiate the fileSystem from a default configuration 
> in .getUnguardedFileSystem(); as the default conf doesn't exist, the default 
> filesystem resolves to "file:///" and the consistency check of the URI fails 
> because no authority can be found. So the whole filesystem creation fails 
> before actually trying to use the provided config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
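
The workaround implied by the resolution is to give the URI an explicit
authority so the consistency check in HadoopFsFactory can pass without Hadoop
defaults on the machine. A hedged Java sketch, with the namenode host, port,
and the hadoopConf and outputStream variables as placeholders for the test's
own values:

{code:java}
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.types.Row;

BucketingSink<Row> sink =
    new BucketingSink<>("hdfs://namenode-host:8020/user/someuser/application_name/");
sink.setFSConfig(hadoopConf);  // the HDFS config from the test rule
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
outputStream.addSink(sink);    // outputStream: the DataStream<Row> under test
{code}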


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463605#comment-16463605
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on the issue:

https://github.com/apache/flink/pull/5939
  
I'm closing this pull request and will make a new one with the newer 
approach, as discussed in the JIRA issue.


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (taken from the ConsumerRecord). In some business 
> scenarios, this is useful!
>  
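A minimal sketch of the requested extension (the six-argument {{deserialize}} is hypothetical; it only illustrates passing the {{ConsumerRecord}} timestamp alongside the arguments the existing five-argument method already receives):

{code:java}
import java.io.IOException;

// Hypothetical variant of KeyedDeserializationSchema: identical to the
// existing contract, plus the record timestamp from the ConsumerRecord.
public interface TimestampedKeyedDeserializationSchema<T> {
    T deserialize(
        byte[] messageKey,
        byte[] message,
        String topic,
        int partition,
        long offset,
        long timestamp) throws IOException; // timestamp of the Kafka message
}
{code}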



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463606#comment-16463606
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing closed the pull request at:

https://github.com/apache/flink/pull/5939


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (taken from the ConsumerRecord). In some business 
> scenarios, this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5939: [FLINK-8500] [Kafka Connector] Get the timestamp o...

2018-05-04 Thread FredTing
Github user FredTing closed the pull request at:

https://github.com/apache/flink/pull/5939


---


[GitHub] flink issue #5939: [FLINK-8500] [Kafka Connector] Get the timestamp of the K...

2018-05-04 Thread FredTing
Github user FredTing commented on the issue:

https://github.com/apache/flink/pull/5939
  
I'm closing this pull request and will make a new one with the newer 
approach as discussed in the JIRA issue.


---


[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463583#comment-16463583
 ] 

ASF GitHub Bot commented on FLINK-6206:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5399
  
@casidiablo yes that is correct.


> Log task state transitions as warn/error for FAILURE scenarios
> --
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at WARN 
> or ERROR level. Currently it is logged at INFO:
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState 
> newState, Throwable cause) {
>   if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>   if (cause == null) {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState);
>   } else {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState, cause);
>   }
>   return true;
>   } else {
>   return false;
>   }
>   }
> {code}
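A minimal sketch of the requested behaviour (names taken from the quoted snippet; the exact level policy is up for discussion):

{code:java}
// Inside transitionState(), after the CAS succeeds:
if (cause == null) {
    LOG.info("{} ({}) switched from {} to {}.",
        taskNameWithSubtask, executionId, currentState, newState);
} else if (newState == ExecutionState.FAILED) {
    // Transitions caused by an exception are logged at ERROR so that
    // failures stand out in the TaskManager logs.
    LOG.error("{} ({}) switched from {} to {}.",
        taskNameWithSubtask, executionId, currentState, newState, cause);
} else {
    LOG.info("{} ({}) switched from {} to {}.",
        taskNameWithSubtask, executionId, currentState, newState, cause);
}
{code}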



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...

2018-05-04 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5399
  
@casidiablo yes that is correct.


---


[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463579#comment-16463579
 ] 

ASF GitHub Bot commented on FLINK-9276:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5954

[FLINK-9276] Improve error message when TaskManager fails

## What is the purpose of the change

*This pull request improves the error message that is logged when a TaskManager fails*


## Brief change log

  - *add an Exception parameter to the method `SlotPoolGateway#releaseTaskManager`*
  - *refactor the usages of the method `SlotPoolGateway#releaseTaskManager`*

## Verifying this change


This change is already covered by existing tests, such as 
*SchedulerTestBase/SchedulerIsolatedTasks*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9276

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5954.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5954


commit 904f762e6b495ea6b4ffd445b9168f001f74be26
Author: yanghua 
Date:   2018-05-04T08:52:38Z

[FLINK-9276] Improve error message when TaskManager fails




> Improve error message when TaskManager fails
> 
>
> Key: FLINK-9276
> URL: https://issues.apache.org/jira/browse/FLINK-9276
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Critical
>
> When a TaskManager fails, we frequently get a message
> {code}
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> container_1524853016208_0001_01_000102
> {code}
> This message is misleading in that it sounds like an intended operation, when 
> it really is a failure of a container that the {{ResourceManager}} reports to 
> the {{JobManager}}.
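A minimal sketch of the gateway change described in the PR (the single-argument form is inferred from the change log; only the added {{cause}} parameter is new):

{code:java}
// Before (assumed): a release that reads like an intended operation.
// CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceId);

// After: the failure reported by the ResourceManager travels with the call,
// so the resulting log message can explain why the TaskManager was released.
CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceId, Exception cause);
{code}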



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...

2018-05-04 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5954

[FLINK-9276] Improve error message when TaskManager fails

## What is the purpose of the change

*This pull request improves the error message that is logged when a TaskManager fails*


## Brief change log

  - *add an Exception parameter to the method `SlotPoolGateway#releaseTaskManager`*
  - *refactor the usages of the method `SlotPoolGateway#releaseTaskManager`*

## Verifying this change


This change is already covered by existing tests, such as 
*SchedulerTestBase/SchedulerIsolatedTasks*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9276

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5954.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5954


commit 904f762e6b495ea6b4ffd445b9168f001f74be26
Author: yanghua 
Date:   2018-05-04T08:52:38Z

[FLINK-9276] Improve error message when TaskManager fails




---


[jira] [Closed] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-05-04 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-8620.
---
   Resolution: Fixed
Fix Version/s: 1.6.0

> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.0
>
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.
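For context, a minimal sketch of the user-facing API this feature plugs into, shown with the batch {{ExecutionEnvironment}} (file path and name are placeholders):

{code:java}
import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register the artifact; with this change it can be shipped through the
// BlobStore instead of requiring a path reachable from every TaskManager.
env.registerCachedFile("hdfs://namenode:8020/path/to/model.bin", "model");

env.fromElements(1, 2, 3)
    .map(new RichMapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            // Resolved to a local copy on the TaskManager via the DistributedCache.
            File model = getRuntimeContext().getDistributedCache().getFile("model");
            return value; // placeholder: real code would read the file
        }
    })
    .print();
{code}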



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7151) FLINK SQL support create temporary function and table

2018-05-04 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen reassigned FLINK-7151:
-

Assignee: Shuyi Chen  (was: yuemeng)

> FLINK SQL support create temporary function and table
> -
>
> Key: FLINK-7151
> URL: https://issues.apache.org/jira/browse/FLINK-7151
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Assignee: Shuyi Chen
>Priority: Major
>
> With support for CREATE TEMPORARY FUNCTION and TABLE, we can register a UDF, 
> UDAF, or UDTF using SQL:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}
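For comparison, a minimal sketch of the programmatic registration this DDL would replace, assuming a {{StreamTableEnvironment}}, an already-registered {{kafka_source}} table, and that the hypothetical {{ITopKUDAF}} (class name taken from the quoted SQL) extends {{AggregateFunction}}:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// Today the UDAF must be registered in code; the proposal moves this to DDL.
tEnv.registerFunction("TOPK", new ITopKUDAF());

Table result = tEnv.sqlQuery(
    "SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP BY id");
{code}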



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9169:
---
Priority: Blocker  (was: Major)

> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} instances where the 
> {{stateSerializer}} can be {{null}}. This is not the problem; however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, 
> where we null-check the state serializer. This then fails with an 
> uninformative NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.
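A minimal sketch of the kind of guard that would replace the bare NPE (message text is illustrative, and {{name}} stands for the state name available in that constructor; {{Preconditions.checkNotNull(value, message)}} is the existing Flink utility used at the failing line):

{code:java}
import org.apache.flink.util.Preconditions;

this.stateSerializer = Preconditions.checkNotNull(
    stateSerializer,
    "The state serializer for state '" + name + "' could not be restored from the " +
    "savepoint, likely because the serializer class is missing or has changed " +
    "incompatibly since the savepoint was taken.");
{code}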



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9292) Remove TypeInfoParser

2018-05-04 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463542#comment-16463542
 ] 

vinoyang commented on FLINK-9292:
-

[~aljoscha] and [~StephanEwen] can we remove the methods' implementations and 
throw a specific exception instead?

> Remove TypeInfoParser
> -
>
> Key: FLINK-9292
> URL: https://issues.apache.org/jira/browse/FLINK-9292
> Project: Flink
>  Issue Type: Task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}.
> Because the TypeInfoParser is also not working correctly with respect to 
> classloading, we should remove it. Users still find the class, try to use it, 
> and run into problems.
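For users migrating off the deprecated class, the {{TypeHint}} replacement looks like this (the tuple type is just an example):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

// Deprecated and classloader-sensitive:
//   TypeInformation<?> info = TypeInfoParser.parse("Tuple2<String, Integer>");

// Replacement: capture the full generic type with an anonymous TypeHint.
TypeInformation<Tuple2<String, Integer>> info =
    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
{code}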



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9292) Remove TypeInfoParser

2018-05-04 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463534#comment-16463534
 ] 

vinoyang commented on FLINK-9292:
-

[~aljoscha] based on [~StephanEwen]'s description, the TypeInfoParser is not 
working correctly, so what is the preferred way of handling this?

> Remove TypeInfoParser
> -
>
> Key: FLINK-9292
> URL: https://issues.apache.org/jira/browse/FLINK-9292
> Project: Flink
>  Issue Type: Task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}.
> Because the TypeInfoParser is also not working correctly with respect to 
> classloading, we should remove it. Users still find the class, try to use it, 
> and run into problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-05-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-9269.
-
Resolution: Fixed

Merged in:

master: 14e7d35f26

release-1.5: 3ba21adc0e

> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0
>
>
> {code:java}
> @Test
> public void testConcurrencyProblem() throws Exception {
>     CheckpointStreamFactory streamFactory = createStreamFactory();
>     Environment env = new DummyEnvironment();
>     AbstractKeyedStateBackend<Integer> backend =
>         createKeyedBackend(IntSerializer.INSTANCE, env);
>     try {
>         long checkpointID = 0;
>         List<Future<?>> futureList = new ArrayList<>();
>         for (int i = 0; i < 10; ++i) {
>             ValueStateDescriptor<Integer> kvId =
>                 new ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
>             ValueState<Integer> state =
>                 backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
>             ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
>             backend.setCurrentKey(i);
>             state.update(i);
>             futureList.add(runSnapshotAsync(backend.snapshot(
>                 checkpointID++, System.currentTimeMillis(), streamFactory,
>                 CheckpointOptions.forCheckpointWithDefaultLocation())));
>         }
>         for (Future<?> future : futureList) {
>             future.get();
>         }
>     } catch (Exception e) {
>         fail();
>     } finally {
>         backend.dispose();
>     }
> }
>
> protected Future<?> runSnapshotAsync(
>         RunnableFuture<?> snapshotRunnableFuture) throws Exception {
>     if (!snapshotRunnableFuture.isDone()) {
>         return Executors.newFixedThreadPool(5).submit(() -> {
>             try {
>                 snapshotRunnableFuture.run();
>                 snapshotRunnableFuture.get();
>             } catch (Exception e) {
>                 e.printStackTrace();
>                 fail();
>             }
>         });
>     }
>     return null;
> }
> {code}
> Place the above code in `StateBackendTestBase` and run 
> `AsyncMemoryStateBackendTest`; it fails with the following exception:
> {code}
> java.util.concurrent.ExecutionException: java.lang.NullPointerException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84)
>   ... 5 more
> java.util.concurrent.ExecutionException: java.lang.NullPointerException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> 

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463522#comment-16463522
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5580


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9292) Remove TypeInfoParser

2018-05-04 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463521#comment-16463521
 ] 

Aljoscha Krettek commented on FLINK-9292:
-

[~yanghua] Keep in mind that we can't remove this now, as it's marked 
{{@Public}}.

> Remove TypeInfoParser
> -
>
> Key: FLINK-9292
> URL: https://issues.apache.org/jira/browse/FLINK-9292
> Project: Flink
>  Issue Type: Task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}.
> Because the TypeInfoParser is also not working correctly with respect to 
> classloading, we should remove it. Users still find the class, try to use it, 
> and run into problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-05-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5580


---

