[jira] [Commented] (FLINK-7289) Memory allocation of RocksDB can be problematic in container environments

2018-04-19 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445353#comment-16445353
 ] 

Sihua Zhou commented on FLINK-7289:
---

Hi [~srichter], I think it's quite hard to tell what the best configuration for 
RocksDB is, but there is one practice we found helpful when using the 
RocksDBBackend: we always set {{taskmanager.memory.size}}. This parameter 
defines the OFF-HEAP memory size used by the TM. It may seem a bit weird to set 
it for a streaming job (because AFAIK currently only batch jobs use the OFF-HEAP 
memory to create direct memory buffers), but the purpose is to reserve this 
OFF-HEAP memory for RocksDB. Even though RocksDB neither respects this setting 
nor is controlled by it, setting it splits the total memory into two parts: the 
HEAP memory used by the JVM and kept well under control by GC, and the OFF-HEAP 
memory that we reserve for RocksDB. This helps us stay on the safe side, even 
though the parameter's name (taskmanager.memory.size) looks quite weird for this 
purpose in a streaming job at first glance. What do you think?
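
For illustration, this is roughly the kind of split we end up with in 
flink-conf.yaml (the numbers below are made up and only illustrate the idea; the 
right split of course depends on the container size and the job):

{noformat}
# illustrative values only
taskmanager.heap.mb: 8192         # total memory handed to the TaskManager process
taskmanager.memory.off-heap: true
taskmanager.memory.size: 3072     # carved out of the heap as managed OFF-HEAP memory;
                                  # a streaming job does not use it, so it effectively
                                  # stays free for RocksDB's native allocations
{noformat}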

> Memory allocation of RocksDB can be problematic in container environments
> -
>
> Key: FLINK-7289
> URL: https://issues.apache.org/jira/browse/FLINK-7289
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Stefan Richter
>Priority: Major
>
> Flink's RocksDB based state backend allocates native memory. The amount of 
> allocated memory by RocksDB is not under the control of Flink or the JVM and 
> can (theoretically) grow without limits.
> In container environments, this can be problematic because the process can 
> exceed the memory budget of the container, and the process will get killed. 
> Currently, there is no other option than trusting RocksDB to be well behaved 
> and to follow its memory configurations. However, limiting RocksDB's memory 
> usage is not as easy as setting a single limit parameter. The memory limit is 
> determined by an interplay of several configuration parameters, which is 
> almost impossible to get right for users. Even worse, multiple RocksDB 
> instances can run inside the same process and make reasoning about the 
> configuration also dependent on the Flink job.
> Some information about the memory management in RocksDB can be found here:
> https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
> We should try to figure out ways to help users in one or more of the 
> following ways:
> - Some way to autotune or calculate the RocksDB configuration.
> - Conservative default values.
> - Additional documentation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445330#comment-16445330
 ] 

ASF GitHub Bot commented on FLINK-8836:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5880
  
+1, Will this PR also get into 1.4.x?


> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as following:
> {code:java}
> public KryoSerializer duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are passed directly to the new 
> {{KryoSerializer}} instance.
>  As a result, those default serializers are shared across two different 
> {{KryoSerializer}} instances, so the copy is not a correct duplicate.
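
For illustration, a minimal sketch of the direction a fix could take (the field 
type and the {{deepCopy(...)}} helper are assumptions here, not the actual Flink 
code): copy the map and duplicate each registered default serializer instead of 
sharing the instances.

{code:java}
// Sketch only -- field type and deepCopy(...) are assumed for illustration.
protected KryoSerializer(KryoSerializer<T> toCopy) {
    this.defaultSerializers = new LinkedHashMap<>();
    for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
            toCopy.defaultSerializers.entrySet()) {
        // duplicate the (potentially stateful) serializer rather than sharing it
        this.defaultSerializers.put(entry.getKey(), deepCopy(entry.getValue()));
    }
    ...
}
{code}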



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...

2018-04-19 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5880
  
+1, Will this PR also get into 1.4.x?


---


[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445315#comment-16445315
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5881

[FLINK-9190][yarn] fix YarnResourceManager sometimes does not request new 
Containers

## What is the purpose of the change

This PR fixes the problem that `YarnResourceManager` does not request new 
containers when a container is killed before registering with the 
`ResourceManager`.

## Brief change log

  - *fix YarnResourceManager sometimes does not request new Containers*

## Verifying this change

  - *added the unit test 
`YarnResourceManagerTest#testKillContainerBeforeTMRegisterSuccessfully()` to 
verify this*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (*yes*)
  - The S3 file system connector: (no)

## Documentation

no

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink 
fixYarnResourceManagerRequestContainers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5881.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5881


commit bbf03ca7fc709e11627560466bff01b9e750bbd2
Author: sihuazhou 
Date:   2018-04-20T05:02:28Z

fix YarnResourceManager sometimes does not request new Containers




> YarnResourceManager sometimes does not request new Containers
> -
>
> Key: FLINK-9190
> URL: https://issues.apache.org/jira/browse/FLINK-9190
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
> Environment: Hadoop 2.8.3
> ZooKeeper 3.4.5
> Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8
>Reporter: Gary Yao
>Assignee: Sihua Zhou
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: yarn-logs
>
>
> *Description*
> The {{YarnResourceManager}} does not request new containers if 
> {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is 
> restarted due to {{NoResourceAvailableException}}, and the job runs normally 
> afterwards. I suspect that {{TaskManager}} failures are not registered if the 
> failure occurs before the {{TaskManager}} registers with the master. Logs are 
> attached; I added additional log statements to 
> {{YarnResourceManager.onContainersCompleted}} and 
> {{YarnResourceManager.onContainersAllocated}}.
> *Expected Behavior*
> The {{YarnResourceManager}} should recognize that the container is completed 
> and keep requesting new containers. The job should run as soon as resources 
> are available. 
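
For illustration, the idea of the fix in a hedged sketch (the helper names such 
as {{workerNodeMap}} and {{requestYarnContainer()}} are assumptions here, not 
the actual implementation): a container that completes before its TaskManager 
ever registered must still be cleaned up and replaced.

{code:java}
// Sketch only -- helper names are assumed for illustration.
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
	for (ContainerStatus status : statuses) {
		ResourceID resourceId = new ResourceID(status.getContainerId().toString());
		// remove the container from the bookkeeping, whether or not its TM registered
		workerNodeMap.remove(resourceId);
		// request a replacement so that pending slot requests can still be fulfilled
		requestYarnContainer();
	}
}
{code}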



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5881: [FLINK-9190][yarn] fix YarnResourceManager sometim...

2018-04-19 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5881

[FLINK-9190][yarn] fix YarnResourceManager sometimes does not request new 
Containers

## What is the purpose of the change

This PR fixes the problem that `YarnResourceManager` does not request new 
containers when a container is killed before registering with the 
`ResourceManager`.

## Brief change log

  - *fix YarnResourceManager sometimes does not request new Containers*

## Verifying this change

  - *added the unit test 
`YarnResourceManagerTest#testKillContainerBeforeTMRegisterSuccessfully()` to 
verify this*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (*yes*)
  - The S3 file system connector: (no)

## Documentation

no

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink 
fixYarnResourceManagerRequestContainers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5881.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5881


commit bbf03ca7fc709e11627560466bff01b9e750bbd2
Author: sihuazhou 
Date:   2018-04-20T05:02:28Z

fix YarnResourceManager sometimes does not request new Containers




---


[jira] [Assigned] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-19 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9190:
-

Assignee: Sihua Zhou

> YarnResourceManager sometimes does not request new Containers
> -
>
> Key: FLINK-9190
> URL: https://issues.apache.org/jira/browse/FLINK-9190
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
> Environment: Hadoop 2.8.3
> ZooKeeper 3.4.5
> Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8
>Reporter: Gary Yao
>Assignee: Sihua Zhou
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: yarn-logs
>
>
> *Description*
> The {{YarnResourceManager}} does not request new containers if 
> {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is 
> restarted due to {{NoResourceAvailableException}}, and the job runs normally 
> afterwards. I suspect that {{TaskManager}} failures are not registered if the 
> failure occurs before the {{TaskManager}} registers with the master. Logs are 
> attached; I added additional log statements to 
> {{YarnResourceManager.onContainersCompleted}} and 
> {{YarnResourceManager.onContainersAllocated}}.
> *Expected Behavior*
> The {{YarnResourceManager}} should recognize that the container is completed 
> and keep requesting new containers. The job should run as soon as resources 
> are available. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445084#comment-16445084
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user dubin555 closed the pull request at:

https://github.com/apache/flink/pull/5202


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: DuBin
>Priority: Major
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in the Table API: shift_left(input, n) 
> acts as input << n, and shift_right(input, n) acts as input >> n.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9223) bufferConsumers should be closed in SpillableSubpartitionTest#testConsumeSpilledPartitionSpilledBeforeAdd

2018-04-19 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9223:
---

Assignee: vinoyang

> bufferConsumers should be closed in 
> SpillableSubpartitionTest#testConsumeSpilledPartitionSpilledBeforeAdd
> -
>
> Key: FLINK-9223
> URL: https://issues.apache.org/jira/browse/FLINK-9223
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
> BufferConsumer[] bufferConsumers = Arrays.stream(bufferBuilders).map(
>   BufferBuilder::createBufferConsumer
> ).toArray(BufferConsumer[]::new);
> {code}
> After the operation on bufferConsumers is done, the BufferConsumer instances 
> in the array should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2018-04-19 Thread dubin555
Github user dubin555 closed the pull request at:

https://github.com/apache/flink/pull/5202


---


[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-04-19 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5845
  
Welcome to the community!
@sijie I think a general discussion email thread will be enough for this.


---


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444501#comment-16444501
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5845
  
Welcome to the community!
@sijie I think a general discussion email thread will be enough for this.


> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9210) Call getValue() only once in gauge serialization

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1658#comment-1658
 ] 

ASF GitHub Bot commented on FLINK-9210:
---

Github user Silven1117 commented on the issue:

https://github.com/apache/flink/pull/5875
  
Thanks, that would be great! I will assign myself next time I create issues.


> Call getValue() only once in gauge serialization
> 
>
> Key: FLINK-9210
> URL: https://issues.apache.org/jira/browse/FLINK-9210
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Jisu You
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0, 1.4.3
>
>
> MetricDumpSerialization.serializeGauge() calls gauge.getValue() twice during 
> gauge serialization. This is troublesome for gauges that reset their state in 
> getValue(). serializeGauge() should call getValue() only once.
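
For illustration, a minimal sketch of the intended change (the names {{name}}, 
{{buffer}} and {{writeString}} are simplified placeholders here, not the actual 
MetricDumpSerialization code): read the gauge value once and reuse it.

{code:java}
// Sketch only -- the surrounding serialization calls are placeholders.
Object value = gauge.getValue();          // call getValue() exactly once
if (value == null) {
	throw new NullPointerException("Value returned by gauge " + name + " was null");
}
String stringValue = value.toString();    // reuse the same value for serialization
writeString(stringValue, buffer);
{code}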



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5875: [FLINK-9210] Removed unnecessary getValue calls in serial...

2018-04-19 Thread Silven1117
Github user Silven1117 commented on the issue:

https://github.com/apache/flink/pull/5875
  
Thanks, that would be great! I will assign myself next time I create issues.


---


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1618#comment-1618
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

Github user sijie commented on the issue:

https://github.com/apache/flink/pull/5845
  
@tzulitai thank you for your comments. Glad to hear your opinions about the 
Pulsar connectors. I was the person who originally proposed the idea of Flink 
Pulsar connectors together with @XiaoZYang, and I am also on the Pulsar IPMC. 
Although Pulsar is a young project, it is developed very actively. We have 
committers from various companies and pretty good adoption. From the Pulsar 
community's perspective, we are very happy to commit to developing and 
maintaining the Pulsar connectors. I hope this helps clear up some of your 
concerns.

As the next step, I am happy to start the email thread on the Flink mailing 
list. Should this be a FLIP, or just a general discussion email thread?


> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-04-19 Thread sijie
Github user sijie commented on the issue:

https://github.com/apache/flink/pull/5845
  
@tzulitai thank you for your comments. Glad to hear your opinions about the 
Pulsar connectors. I was the person who originally proposed the idea of Flink 
Pulsar connectors together with @XiaoZYang, and I am also on the Pulsar IPMC. 
Although Pulsar is a young project, it is developed very actively. We have 
committers from various companies and pretty good adoption. From the Pulsar 
community's perspective, we are very happy to commit to developing and 
maintaining the Pulsar connectors. I hope this helps clear up some of your 
concerns.

As the next step, I am happy to start the email thread on the Flink mailing 
list. Should this be a FLIP, or just a general discussion email thread?


---


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444399#comment-16444399
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
Hi @fhueske, thanks for the review, all very good points. I will follow up 
with the next steps. Actually, @hequn8128 and I have already had some 
discussions regarding the follow-up in FLINK-8690, and I created two different 
approaches. Please kindly take a look when you have time :-)

Best,
Rong


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support in codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which currently does not 
> produce correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-04-19 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
Hi @fhueske, thanks for the review, all very good points. I will follow up 
with the next steps. Actually, @hequn8128 and I have already had some 
discussions regarding the follow-up in FLINK-8690, and I created two different 
approaches. Please kindly take a look when you have time :-)

Best,
Rong


---


[jira] [Created] (FLINK-9223) bufferConsumers should be closed in SpillableSubpartitionTest#testConsumeSpilledPartitionSpilledBeforeAdd

2018-04-19 Thread Ted Yu (JIRA)
Ted Yu created FLINK-9223:
-

 Summary: bufferConsumers should be closed in 
SpillableSubpartitionTest#testConsumeSpilledPartitionSpilledBeforeAdd
 Key: FLINK-9223
 URL: https://issues.apache.org/jira/browse/FLINK-9223
 Project: Flink
  Issue Type: Test
Reporter: Ted Yu


{code}
BufferConsumer[] bufferConsumers = Arrays.stream(bufferBuilders).map(
  BufferBuilder::createBufferConsumer
).toArray(BufferConsumer[]::new);
{code}
After the operation on bufferConsumers is done, the BufferConsumer instances 
in the array should be closed.
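
For illustration, a hedged sketch of the suggested cleanup (the body of the 
test is elided; assuming BufferConsumer#close() is the hook that releases the 
backing buffer):

{code:java}
BufferConsumer[] bufferConsumers = Arrays.stream(bufferBuilders)
	.map(BufferBuilder::createBufferConsumer)
	.toArray(BufferConsumer[]::new);
try {
	// ... exercise the spillable subpartition with the consumers ...
} finally {
	for (BufferConsumer bufferConsumer : bufferConsumers) {
		bufferConsumer.close(); // release the buffers even if the test fails
	}
}
{code}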



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-2685) TaskManager deadlock on NetworkBufferPool

2018-04-19 Thread Amit Jain (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444342#comment-16444342
 ] 

Amit Jain edited comment on FLINK-2685 at 4/19/18 4:42 PM:
---

[~NicoK] I've checked in the WebUI and found there is no progress at all.

This issue comes up randomly; we have also observed cases where jobs that only 
need to process a few MB of data still hang.

 
{noformat}
{
"nodes": [

{
"id": 5,
"type": "source",
"pact": "Data Source",
"contents": "at createInput(ExecutionEnvironment.java:553) 
(org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)",
"parallelism": "1",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" }],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" }  ]
},
{
"id": 4,
"type": "pact",
"pact": "Map",
"contents": "Data Source Parquet 
s3a://limeroad-logs/emr-testing/ldp_test/mongo/mongo_userDB.loves_loves/main/1524148386302/",
"parallelism": "1",
"predecessors": [
{"id": 5, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" }],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" }  ]
},
{
"id": 3,
"type": "pact",
"pact": "Map",
"contents": "Key Extractor",
"parallelism": "1",
"predecessors": [
{"id": 4, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" }],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" }  ]
},
{
"id": 11,
"type": "source",
"pact": "Data Source",
"contents": "at createInput(ExecutionEnvironment.java:553) 
(org.apache.flink.api.java.io.TextInputFormat)",
"parallelism": "1",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" }],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": 

[jira] [Closed] (FLINK-9206) CheckpointCoordinator log messages do not contain the job ID

2018-04-19 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9206.
---
Resolution: Fixed

master: 727370aacf63cefc6aed7c46dc2d63517e4b708d
1.5: 71c2ac313315cb6b2b5d269041788b9907e07de1
1.4:  e3433d9300ad4ae35693ff76c2de987a37724c2f

> CheckpointCoordinator log messages do not contain the job ID
> 
>
> Key: FLINK-9206
> URL: https://issues.apache.org/jira/browse/FLINK-9206
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> The {{CheckpointCoordinator}} exists per job, but several of its log messages 
> do not contain the job ID; thus, if multiple jobs exist, we cannot tell which 
> log message belongs to which job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-2685) TaskManager deadlock on NetworkBufferPool

2018-04-19 Thread Amit Jain (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444342#comment-16444342
 ] 

Amit Jain commented on FLINK-2685:
--

[~NicoK] I've checked in the WebUI and found there is no progress at all.

This issue comes up randomly; we have also observed cases where jobs that only 
need to process a few MB of data still hang.
 

> TaskManager deadlock on NetworkBufferPool
> -
>
> Key: FLINK-2685
> URL: https://issues.apache.org/jira/browse/FLINK-2685
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 0.10.0
>Reporter: Greg Hogan
>Assignee: Ufuk Celebi
>Priority: Major
> Attachments: job_manager_19_feb_15_30_running, 
> task_manager_19_feb_15_30_running
>
>
> This deadlock occurs intermittently. I have a {{join}} followed by a 
> {{chain}} followed by a {{reduceGroup}}. Stack traces and local 
> variables from one each of the {{join}} threads below.
> The {{join}}'s are waiting on a buffer to become available 
> ({{networkBufferPool.availableMemorySegments.count=0}}). Both 
> {{LocalBufferPool}}'s have been given extra capacity ({{currentPoolSize=60 > 
> numberOfRequiredMemorySegments=32}}). The first {{join}} is at full capacity 
> ({{currentPoolSize=numberOfRequestedMemorySegments=60}}) yet the second 
> {{join}} has not acquired any ({{numberOfRequestedMemorySegments=0}}).
> {{LocalBufferPool.returnExcessMemorySegments}} only recycles 
> {{MemorySegment}}'s from its {{availableMemorySegments}}, so any requested 
> {{Buffer}}'s will only be released when explicitly recycled.
> First join stack trace and variable values from 
> {{LocalBufferPool.requestBuffer}}:
> {noformat}
> owns: SpanningRecordSerializer  (id=723)   
> waiting for: ArrayDeque  (id=724)  
> Object.wait(long) line: not available [native method] 
> LocalBufferPool.requestBuffer(boolean) line: 163  
> LocalBufferPool.requestBufferBlocking() line: 133 
> RecordWriter.emit(T) line: 92  
> OutputCollector.collect(T) line: 65
> JoinOperator$ProjectFlatJoinFunction.join(T1, T2, Collector) 
> line: 1088   
> ReusingBuildSecondHashMatchIterator.callWithNextKey(FlatJoinFunction,
>  Collector) line: 137   
> JoinDriver.run() line: 208
> RegularPactTask.run() line: 489 
> RegularPactTask.invoke() line: 354  
> Task.run() line: 581  
> Thread.run() line: 745
> {noformat}
> {noformat}
> this  LocalBufferPool  (id=403)   
>   availableMemorySegments ArrayDeque  (id=398) 
>   elementsObject[16]  (id=422)
>   head14  
>   tail14  
>   currentPoolSize 60  
>   isDestroyed false   
>   networkBufferPool   NetworkBufferPool  (id=354) 
>   allBufferPools  HashSet  (id=424)
>   availableMemorySegments ArrayBlockingQueue  (id=427) 
>   count   0   
>   items   Object[10240]  (id=674) 
>   itrsnull
>   lockReentrantLock  (id=675) 
>   notEmpty
> AbstractQueuedSynchronizer$ConditionObject  (id=678)
>   notFull AbstractQueuedSynchronizer$ConditionObject  
> (id=679)
>   putIndex6954
>   takeIndex   6954
>   factoryLock Object  (id=430)
>   isDestroyed false   
>   managedBufferPools  HashSet  (id=431)
>   memorySegmentSize   32768   
>   numTotalRequiredBuffers 3226
>   totalNumberOfMemorySegments 10240   
>   numberOfRequestedMemorySegments 60  
>   numberOfRequiredMemorySegments  32  
>   owner   null
>   registeredListeners ArrayDeque  (id=421) 
>   elementsObject[16]  (id=685)
>   head0   
>   tail0   
> askToRecycle  false   
> isBlockingtrue
> {noformat}
> Second join stack trace and variable values from 
> {{SingleInputGate.getNextBufferOrEvent}}:
> {noformat}
> Unsafe.park(boolean, long) line: not available [native method]
> LockSupport.parkNanos(Object, long) line: 215 
> AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
> LinkedBlockingQueue.poll(long, TimeUnit) line: 467 
> SingleInputGate.getNextBufferOrEvent() line: 414  
> MutableRecordReader(AbstractRecordReader).getNextRecord(T) line: 79 
> MutableRecordReader.next(T) line: 34   
> ReaderIterator.next(T) line: 59
> MutableHashTable$ProbeIterator.next() line: 1581  
> 

[jira] [Assigned] (FLINK-8715) RocksDB does not propagate reconfiguration of serializer to the states

2018-04-19 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-8715:
--

Assignee: Tzu-Li (Gordon) Tai

> RocksDB does not propagate reconfiguration of serializer to the states
> --
>
> Key: FLINK-8715
> URL: https://issues.apache.org/jira/browse/FLINK-8715
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Arvid Heise
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Any changes to the serializer done in #ensureCompatibility are lost during 
> state creation.
> In particular, 
> [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L68]
>  always uses a fresh copy of the StateDescriptor.
> An easy fix is to pass the reconfigured serializer as an additional parameter 
> in 
> [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L1681]
>  , which can be retrieved through the side-output of getColumnFamily
> {code:java}
> kvStateInformation.get(stateDesc.getName()).f1.getStateSerializer()
> {code}
> I encountered it in 1.3.2 but the code in the master seems unchanged (hence 
> the pointer into master). I encountered it in ValueState, but I suspect the 
> same issue can be observed for all kinds of RocksDB states.
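
For illustration, a hedged sketch of the retrieval the description suggests (the 
variable names and the way the serializer is then handed to the state object 
are assumptions, not the actual code):

{code:java}
// Sketch only -- how the serializer is then passed on is left open here.
@SuppressWarnings("unchecked")
TypeSerializer<V> reconfiguredSerializer = (TypeSerializer<V>)
	kvStateInformation.get(stateDesc.getName()).f1.getStateSerializer();
// pass 'reconfiguredSerializer' into the state creation instead of the serializer
// from the freshly copied StateDescriptor, so the reconfiguration is not lost
{code}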



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9207) Client returns SUCCESS(0) return code for canceled job

2018-04-19 Thread Amit Jain (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444306#comment-16444306
 ] 

Amit Jain commented on FLINK-9207:
--

[~aljoscha] I believe many of us submit batch jobs via some scheduler, and the 
CLI job executor is expected to return an error in the mentioned case. 
I understand there are other ways to achieve the same. We are currently 
submitting our jobs in non-detached mode and ran into this problem. 

[~gjy] My intention was to make you aware of the issue. I would like to have it 
as Blocker because our current setup expects this behaviour. Please feel free 
to set the priority according to the merit of the problem.

> Client returns SUCCESS(0) return code for canceled job
> --
>
> Key: FLINK-9207
> URL: https://issues.apache.org/jira/browse/FLINK-9207
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
> Environment: Version: 1.5.0, Commit : 2af481a
>Reporter: Amit Jain
>Priority: Minor
> Fix For: 1.5.0
>
>
> The Flink client returns a zero return code when a job is deliberately 
> canceled. Steps to reproduce it:
> 1. bin/flink run -p 10 -m yarn-cluster -yjm 1024 -ytm 12288 WordCount.jar
> 2. The user externally canceled the job.
> 3. The JobManager marked the job as CANCELED.
> 4. Although the client code emits the following log, it still returns a zero 
> return code.
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Killed 
> application application_1523726493647_.
> A job scheduler like Airflow would have a hard time detecting whether the 
> submitted job was canceled or not. 
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9189) Add a SBT Quickstart

2018-04-19 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9189:
---
Summary: Add a SBT Quickstart  (was: Add a SBT Quickstarts)

> Add a SBT Quickstart
> 
>
> Key: FLINK-9189
> URL: https://issues.apache.org/jira/browse/FLINK-9189
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Stephan Ewen
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but I observed SBT users to 
> get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9222) Add a Gradle Quickstart

2018-04-19 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444291#comment-16444291
 ] 

Nico Kruber commented on FLINK-9222:


actually, what is wrong with the existing SBT quickstarts at 
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/scala_api_quickstart.html#sbt
 ?

> Add a Gradle Quickstart
> ---
>
> Key: FLINK-9222
> URL: https://issues.apache.org/jira/browse/FLINK-9222
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but there is none for Gradle, 
> and Gradle users get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-9222) Add a Gradle Quickstart

2018-04-19 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9222:
---
Comment: was deleted

(was: actually, what is wrong with the existing SBT quickstarts at 
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/scala_api_quickstart.html#sbt
 ?)

> Add a Gradle Quickstart
> ---
>
> Key: FLINK-9222
> URL: https://issues.apache.org/jira/browse/FLINK-9222
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but there is none for Gradle, 
> and Gradle users get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9189) Add a SBT Quickstarts

2018-04-19 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444292#comment-16444292
 ] 

Nico Kruber commented on FLINK-9189:


actually, what is wrong with the existing SBT quickstarts at 
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/scala_api_quickstart.html#sbt
 ?

> Add a SBT Quickstarts
> -
>
> Key: FLINK-9189
> URL: https://issues.apache.org/jira/browse/FLINK-9189
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Stephan Ewen
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but I observed SBT users to 
> get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9189) Add a SBT Quickstarts

2018-04-19 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9189:
---
Summary: Add a SBT Quickstarts  (was: Add a SBT and Gradle Quickstarts)

> Add a SBT Quickstarts
> -
>
> Key: FLINK-9189
> URL: https://issues.apache.org/jira/browse/FLINK-9189
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Stephan Ewen
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but I observed SBT and Gradle 
> users to get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9189) Add a SBT Quickstarts

2018-04-19 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9189:
---
Description: 
Having a proper project template helps a lot in getting dependencies right. For 
example, setting the core dependencies to "provided", the connector / library 
dependencies to "compile", etc.

The Maven quickstarts are in good shape by now, but I observed SBT users to get 
this wrong quite often.

  was:
Having a proper project template helps a lot in getting dependencies right. For 
example, setting the core dependencies to "provided", the connector / library 
dependencies to "compile", etc.

The Maven quickstarts are in good shape by now, but I observed SBT and Gradle 
users to get this wrong quite often.


> Add a SBT Quickstarts
> -
>
> Key: FLINK-9189
> URL: https://issues.apache.org/jira/browse/FLINK-9189
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Stephan Ewen
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but I observed SBT users to 
> get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9222) Add a Gradle Quickstart

2018-04-19 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9222:
---
Description: 
Having a proper project template helps a lot in getting dependencies right. For 
example, setting the core dependencies to "provided", the connector / library 
dependencies to "compile", etc.

The Maven quickstarts are in good shape by now, but there is none for Gradle, 
and Gradle users get this wrong quite often.

  was:
Having a proper project template helps a lot in getting dependencies right. For 
example, setting the core dependencies to "provided", the connector / library 
dependencies to "compile", etc.

The Maven quickstarts are in good shape by now, but I observed SBT and Gradle 
users to get this wrong quite often.


> Add a Gradle Quickstart
> ---
>
> Key: FLINK-9222
> URL: https://issues.apache.org/jira/browse/FLINK-9222
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but there is none for Gradle, 
> and Gradle users get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9222) Add a Gradle Quickstart

2018-04-19 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-9222:
--

 Summary: Add a Gradle Quickstart
 Key: FLINK-9222
 URL: https://issues.apache.org/jira/browse/FLINK-9222
 Project: Flink
  Issue Type: Improvement
  Components: Quickstarts
Reporter: Nico Kruber


Having a proper project template helps a lot in getting dependencies right. For 
example, setting the core dependencies to "provided", the connector / library 
dependencies to "compile", etc.

The Maven quickstarts are in good shape by now, but I observed SBT and Gradle 
users to get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9222) Add a Gradle Quickstart

2018-04-19 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber reassigned FLINK-9222:
--

Assignee: Nico Kruber

> Add a Gradle Quickstart
> ---
>
> Key: FLINK-9222
> URL: https://issues.apache.org/jira/browse/FLINK-9222
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but I observed SBT and Gradle 
> users to get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9202) AvroSerializer should not be serializing the target Avro type class

2018-04-19 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444261#comment-16444261
 ] 

Timo Walther commented on FLINK-9202:
-

I could reproduce the exception {{"local class incompatible: stream classdesc 
serialVersionUID = -5332488931363852176, local class serialVersionUID = 
-8084632352057382365"}}. I will work on a fix.

> AvroSerializer should not be serializing the target Avro type class
> ---
>
> Key: FLINK-9202
> URL: https://issues.apache.org/jira/browse/FLINK-9202
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Timo Walther
>Priority: Critical
>
> The {{AvroSerializer}} contains this field which is written when the 
> serializer is written into savepoints:
> [https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L78]
> This causes Avro schema evolution to not work properly, because Avro 
> generated classes have non-fixed serialVersionUIDs. Once a new Avro class is 
> generated with a new schema, that class can not be loaded on restore due to 
> incompatible UIDs, and thus the serializer can not be successfully 
> deserialized.
> A possible solution would be to only write the classname, and dynamically 
> load the class into a transient field.
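
For illustration, a hedged sketch of the direction the last paragraph suggests 
(field and method names are assumptions here, not the actual AvroSerializer 
code): keep the Avro type in a transient field and write only its class name, 
so changing serialVersionUIDs of generated classes cannot break deserialization 
of the serializer itself.

{code:java}
// Sketch only -- names are illustrative, not the actual Flink implementation.
private transient Class<T> type;   // not written into savepoints
private String typeClassName;      // written instead of the class itself

private void writeObject(ObjectOutputStream out) throws IOException {
	typeClassName = type.getName();
	out.defaultWriteObject();
}

@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
	in.defaultReadObject();
	type = (Class<T>) Class.forName(
		typeClassName, true, Thread.currentThread().getContextClassLoader());
}
{code}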



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-04-19 Thread Josh Lemer (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Lemer updated FLINK-9221:
--
Component/s: DataSet API

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>  Components: DataSet API, DataStream API
>Affects Versions: 1.5.0
>Reporter: Josh Lemer
>Priority: Minor
>  Labels: flink
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketer(new DateTimeBucketer("-MM-dd-HHmm"))
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!
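
For illustration, a hedged sketch of such an adapter in Java (this is not an 
existing Flink API, and in practice the mapping function would also have to be 
Serializable; the usual SinkFunction and java.util.function.Function imports 
are assumed):

{code:java}
// Sketch only -- not an existing Flink API.
public static <A, B> SinkFunction<B> contramap(SinkFunction<A> sink, Function<B, A> f) {
	return new SinkFunction<B>() {
		@Override
		public void invoke(B value) throws Exception {
			// map the incoming element first, then delegate to the wrapped sink
			sink.invoke(f.apply(value));
		}
	};
}
{code}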



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-04-19 Thread Josh Lemer (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444250#comment-16444250
 ] 

Josh Lemer commented on FLINK-9221:
---

Fixed!

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Josh Lemer
>Priority: Minor
>  Labels: flink
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketer(new DateTimeBucketer("-MM-dd-HHmm"))
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-04-19 Thread Josh Lemer (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Lemer updated FLINK-9221:
--
Labels: flink  (was: )

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Josh Lemer
>Priority: Minor
>  Labels: flink
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketer(new DateTimeBucketer("-MM-dd-HHmm"))
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-04-19 Thread Josh Lemer (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Lemer updated FLINK-9221:
--
Affects Version/s: 1.5.0

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Josh Lemer
>Priority: Minor
>  Labels: flink
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketer(new DateTimeBucketer("-MM-dd-HHmm"))
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9207) Client returns SUCCESS(0) return code for canceled job

2018-04-19 Thread Gary Yao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444252#comment-16444252
 ] 

Gary Yao commented on FLINK-9207:
-

[~amit.jain] Is the priority only _minor_ for you?

> Client returns SUCCESS(0) return code for canceled job
> --
>
> Key: FLINK-9207
> URL: https://issues.apache.org/jira/browse/FLINK-9207
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
> Environment: Version: 1.5.0, Commit : 2af481a
>Reporter: Amit Jain
>Priority: Minor
> Fix For: 1.5.0
>
>
> The Flink client returns a zero return code when a job is deliberately 
> canceled. Steps to reproduce it:
> 1. bin/flink run -p 10 -m yarn-cluster -yjm 1024 -ytm 12288 WordCount.jar
> 2. The user externally canceled the job.
> 3. The JobManager marked the job as CANCELED.
> 4. Although the client code emits the following log, it still returns a zero 
> return code.
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Killed 
> application application_1523726493647_.
> A job scheduler like Airflow would have a hard time detecting whether the 
> submitted job was canceled or not. 
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-04-19 Thread Josh Lemer (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Lemer updated FLINK-9221:
--
Component/s: DataStream API

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Josh Lemer
>Priority: Minor
>  Labels: flink
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketer(new DateTimeBucketer("-MM-dd-HHmm"))
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-04-19 Thread Josh Lemer (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Lemer updated FLINK-9221:
--
Description: 
Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
`SinkFunctions`, so that you can reuse existing complex sink functions, but 
with a different input type. For example:
{code}
val bucketingStringSink: SinkFunction[String] = 
  new BucketingSink[String]("...")
.setBucketer(new DateTimeBucketer("-MM-dd-HHmm"))

val bucketingIntListSink: SinkFunction[List[Int]] =
  bucketingStringSink.contramap[List[Int]](_.mkString(","))
{code}

For some more formal motivation behind this, 
https://typelevel.org/cats/typeclasses/contravariant.html is definitely a great 
place to start!

  was:
Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
`SinkFunctions`, so that you can reuse existing complex sink functions, but 
with a different input type. For example:
{code}
val bucketingStringSink: SinkFunction[String] = 
  new BucketingSink[String]("...")
.setBucketr(new DateTimeBucketer("-MM-dd-HHmm")

val bucketingIntListSink: SinkFunction[List[Int]] =
  bucketingStringSink.contramap[List[Int]](_.mkString(","))
{code}

For some more formal motivation behind this, 
https://typelevel.org/cats/typeclasses/contravariant.html is definitely a great 
place to start!


> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>Reporter: Josh Lemer
>Priority: Minor
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketer(new DateTimeBucketer("-MM-dd-HHmm")
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9211) Job submission via REST/dashboard does not work on Kubernetes

2018-04-19 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-9211:
---

Assignee: Aljoscha Krettek

> Job submission via REST/dashboard does not work on Kubernetes
> -
>
> Key: FLINK-9211
> URL: https://issues.apache.org/jira/browse/FLINK-9211
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Web Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>
> When setting up a cluster on Kubernets according to the documentation it is 
> possible to upload jar files but when trying to execute them you get an 
> exception like this:
> {code}
> org.apache.flink.runtime.rest.handler.RestHandlerException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$2(JarRunHandler.java:113)
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:196)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:356)
> ... 17 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
> ... 18 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: 
> connection timed out: flink-jobmanager/10.105.154.28:8081
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ... 15 more
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: 
> connection timed out: flink-jobmanager/10.105.154.28:8081
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
> ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-04-19 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444236#comment-16444236
 ] 

Ted Yu commented on FLINK-9221:
---

bq. .setBucketr

Should be {{.setBucketer}}

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>Reporter: Josh Lemer
>Priority: Minor
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketr(new DateTimeBucketer("-MM-dd-HHmm")
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444215#comment-16444215
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182219690
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +51,96 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
--- End diff --

I tested a few other queries with distinct aggregates. 
* Queries with non-windowed `DISTINCT` aggregations work, but they are 
translated without distinct aggregations. So the changes in this PR are 
not used.
* Queries with `DISTINCT` aggregates and `TUMBLE` or `HOP` windows fail 
with strange exceptions. This did not look related to these changes, but it 
would be good to check.

We don't have to fix these things in this PR (unless it is broken by these 
changes).

In general, I think it would be good to add unit tests for the 
`AggregationCodeGenerator`. We could generate `GeneratedAggregations` for 
different configurations, compile them, and check that the result is as 
expected. Actually, we should have done that already. This should also work 
for state-backend-backed views if we embed the test in a HarnessTest.


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444231#comment-16444231
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182479163
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -93,6 +97,8 @@ class AggregationCodeGenerator(
   aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
   aggFields: Array[Array[Int]],
   aggMapping: Array[Int],
+  distinctAggs: Array[Seq[DataViewSpec[_]]],
+  isStateBackedDataViews: Boolean,
--- End diff --

We should add a constructor check for `if (partialResults && 
isStateBackedDataViews)` and throw an exception if `true`. `partialResults` 
means that the `Row` with the accumulators has to be emitted, which won't work 
well for state-backed distinct maps that are probably too big.
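
For illustration, such a guard might look roughly like the sketch below 
(assuming the constructor keeps the parameter names used in this thread; this 
is not the code in the PR):

{code}
// Reject the unsupported combination early, in the constructor: with
// partialResults the accumulator Row (including any state-backed distinct
// maps) would have to be serialized and emitted as a partial result.
if (partialResults && isStateBackedDataViews) {
  throw TableException(
    "Partial results are not supported when data views are backed by state.")
}
{code}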


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444221#comment-16444221
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182478446
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -93,6 +97,8 @@ class AggregationCodeGenerator(
   aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
   aggFields: Array[Array[Int]],
   aggMapping: Array[Int],
+  distinctAggs: Array[Seq[DataViewSpec[_]]],
--- End diff --

I would make this an `Array[Boolean]` and rename to `isDistinctAgg`. We can 
build the `MapViewSpec`s in the method. We have all the information for that in 
the other parameters.
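
On the `AggregateUtil` side this could be as simple as the sketch below 
(`aggregateCalls` as used in the surrounding code; illustrative only):

{code}
// Flag distinct aggregate calls with a simple Boolean per call instead of
// passing MapViewSpecs around; the specs can be rebuilt in the code generator.
val isDistinctAggs: Array[Boolean] = aggregateCalls.map(_.isDistinct).toArray
{code}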



> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444230#comment-16444230
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182773359
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -417,13 +530,26 @@ class AggregationCodeGenerator(
 .stripMargin
   val create: String = {
 for (i <- aggs.indices) yield {
-  j"""
- |${accTypes(i)} acc$i = (${accTypes(i)}) 
${aggs(i)}.createAccumulator();
- |${genDataViewFieldSetter(s"acc$i", i)}
- |accs.setField(
- |  $i,
- |  acc$i);"""
-.stripMargin
+  if (isDistinctAggs(i)) {
+j"""
+   |${accTypes(i)} acc$i = (${accTypes(i)}) 
${aggs(i)}.createAccumulator();
+   |$distinctAccType distinctAcc$i = ($distinctAccType) 
new org.apache.flink.table.
+   |functions.aggfunctions.DistinctAccumulator(acc$i);
+   |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)}
--- End diff --

I think this and the `genAccDataViewFieldSetter` call (both from here and 
the non-distinct case) can be removed.


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444223#comment-16444223
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182496636
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
+
+  distinctAggs(index) = Seq(
--- End diff --

I would generate the `MapViewSpec` in the aggregation code generator


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-04-19 Thread Josh Lemer (JIRA)
Josh Lemer created FLINK-9221:
-

 Summary: Add method SinkFunction[A]#contramap[B](f: B => A): 
SinkFunction[B]
 Key: FLINK-9221
 URL: https://issues.apache.org/jira/browse/FLINK-9221
 Project: Flink
  Issue Type: Task
Reporter: Josh Lemer


Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
`SinkFunctions`, so that you can reuse existing complex sink functions, but 
with a different input type. For example:
{code}
val bucketingStringSink: SinkFunction[String] = 
  new BucketingSink[String]("...")
.setBucketr(new DateTimeBucketer("-MM-dd-HHmm")

val bucketingIntListSink: SinkFunction[List[Int]] =
  bucketingStringSink.contramap[List[Int]](_.mkString(","))
{code}

For some more formal motivation behind this, 
https://typelevel.org/cats/typeclasses/contravariant.html is definitely a great 
place to start!
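
For reference, a minimal sketch of such a combinator is shown below. This is 
an illustration only, not part of Flink's API: a naive wrapper like this does 
not forward `RichFunction` lifecycle methods or checkpointing interfaces of 
the wrapped sink (e.g. `BucketingSink`), which is part of why first-class 
support would be preferable.

{code}
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object SinkFunctionOps {
  // Pre-compose the sink with f: every incoming B is mapped to an A before
  // it is handed to the wrapped sink. Both `sink` and `f` must be serializable.
  def contramap[A, B](sink: SinkFunction[A])(f: B => A): SinkFunction[B] =
    new SinkFunction[B] {
      override def invoke(value: B): Unit = sink.invoke(f(value))
    }
}

// Usage, mirroring the example above:
// val bucketingIntListSink: SinkFunction[List[Int]] =
//   SinkFunctionOps.contramap(bucketingStringSink)(_.mkString(","))
{code}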



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444216#comment-16444216
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182226785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
--- End diff --

Reword error message to `"DISTINCT aggregations with multiple parameters 
not fully supported yet."`.


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444218#comment-16444218
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182254735
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
--- End diff --

We need to forward the distinct maps as well. `partialResults` is used when 
an operator needs to emit partial aggregation results, e.g. in a combine 
function in batch execution. So we don't need to distinguish the 
`isDistinctAggs(i)` case here.


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444222#comment-16444222
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182233134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
--- End diff --

add more comments for the `aggCall.isDistinct` branch


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444224#comment-16444224
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182753219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

If we change the input parameter, we have the `Array[Boolean]` already


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444229#comment-16444229
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182768726
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
+} else {
+  j"""
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+}
   } else {
-j"""
-   |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
-   |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
-   |${genDataViewFieldSetter(s"acc$i", i)}
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  baseClass$i.getValue(acc$i));""".stripMargin
+if (isDistinctAggs(i)) {
+  j"""
+ |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
+ |  
(org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |${genDistinctDataViewFieldSetter(s"distinctAcc$i", 
i)}
+ |${accTypes(i)} acc$i = (${accTypes(i)}) 
distinctAcc$i.getRealAcc();
+ |${genAccDataViewFieldSetter(s"acc$i", i)}
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  baseClass$i.getValue(acc$i));""".stripMargin
+} else {
--- End diff --

Both cases share a lot of code. We could retrieve `acc$i` differently and 
share the rest.
Does that make sense, or does it fragment the code too much?
The same would apply to `accumulate()` and `retract()`.
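
As an illustration, only the accumulator retrieval would differ (a sketch 
using the names from the generated code above, not the actual patch):

{code}
// Retrieve acc$i, unwrapping the DistinctAccumulator when necessary, and
// then share one code path for getValue()/accumulate()/retract().
val getAcc: String =
  if (isDistinctAggs(i)) {
    j"""
       |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i);
       |${accTypes(i)} acc$i = (${accTypes(i)}) distinctAcc$i.getRealAcc();""".stripMargin
  } else {
    j"""
       |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i);""".stripMargin
  }
{code}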


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444225#comment-16444225
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182496465
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

Does the approach also work for `null` values in both MapViews? If not, we 
can use a `Row(1)` that serializes a bitmask for `null` values.


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444226#comment-16444226
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182489141
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

+1


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444219#comment-16444219
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182469110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
 ---
@@ -70,7 +70,12 @@ trait OverAggregate {
 
 val aggStrings = namedAggregates.map(_.getKey).map(
   a => s"${a.getAggregation}(${
-if (a.getArgList.size() > 0) {
+val prefix = if (a.isDistinct) {
--- End diff --

In case we also want to support group-windowed DISTINCT aggregates, we 
would need to change `CommonAggregate.aggregationToString()` as well. 


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444228#comment-16444228
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182768521
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
+} else {
+  j"""
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+}
   } else {
-j"""
-   |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
-   |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
-   |${genDataViewFieldSetter(s"acc$i", i)}
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  baseClass$i.getValue(acc$i));""".stripMargin
+if (isDistinctAggs(i)) {
+  j"""
+ |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
+ |  
(org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |${genDistinctDataViewFieldSetter(s"distinctAcc$i", 
i)}
--- End diff --

we don't need this statement


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444220#comment-16444220
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182481625
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for the accumulator wrapper used when applying distinct 
aggregation.
+  * @param realAcc the actual accumulator which gets invoked after the 
distinct filter.
+  * @param mapView the [[MapView]] element used to store the distinct 
filter hash map.
+  * @tparam E the element type for the distinct filter hash map.
+  * @tparam ACC the accumulator type for the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: 
MapView[E, Integer]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, Integer]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, Integer]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
--- End diff --

+1
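
The diff above is cut off at the `contains()` check. For context, the 
reference-counted distinct filter that this class implements works roughly 
like the sketch below (an illustration of the idea, not the exact code under 
review; null handling is omitted here):

{code}
// add() returns true only for the first occurrence of an element, so the
// wrapped accumulator is updated exactly once per distinct value;
// remove() returns true only when the last occurrence disappears.
def add(element: E): Boolean = {
  val count = mapView.get(element)
  if (count == null) {
    mapView.put(element, 1)
    true
  } else {
    mapView.put(element, count + 1)
    false
  }
}

def remove(element: E): Boolean = {
  val count = mapView.get(element)
  if (count == 1) {
    mapView.remove(element)
    true
  } else {
    mapView.put(element, count - 1)
    false
  }
}
{code}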


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444217#comment-16444217
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182218521
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
--- End diff --

I would not create the `MapViewSpec`s here but do that in the code gen 
function. Here we should create an `Array[Boolean]` to flag distinct 
aggregations.


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444227#comment-16444227
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182753514
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Instead we can create the `DataViewSpecs` here.


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where *isDistinct* fields is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182496636
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
+
+  distinctAggs(index) = Seq(
--- End diff --

I would generate the `MapViewSpec` in the aggregation code generator


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182226785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
--- End diff --

Reword error message to `"DISTINCT aggregations with multiple parameters 
not fully supported yet."`.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182753514
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Instead we can create the `DataViewSpecs` here.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182478446
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -93,6 +97,8 @@ class AggregationCodeGenerator(
   aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
   aggFields: Array[Array[Int]],
   aggMapping: Array[Int],
+  distinctAggs: Array[Seq[DataViewSpec[_]]],
--- End diff --

I would make this an `Array[Boolean]` and rename to `isDistinctAgg`. We can 
build the `MapViewSpec`s in the method. We have all the information for that in 
the other parameters.



---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182481625
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for the accumulator wrapper used when applying distinct 
aggregation.
+  * @param realAcc the actual accumulator which gets invoked after the 
distinct filter.
+  * @param mapView the [[MapView]] element used to store the distinct 
filter hash map.
+  * @tparam E the element type for the distinct filter hash map.
+  * @tparam ACC the accumulator type for the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: 
MapView[E, Integer]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, Integer]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, Integer]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
--- End diff --

+1


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182479163
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -93,6 +97,8 @@ class AggregationCodeGenerator(
   aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
   aggFields: Array[Array[Int]],
   aggMapping: Array[Int],
+  distinctAggs: Array[Seq[DataViewSpec[_]]],
+  isStateBackedDataViews: Boolean,
--- End diff --

We should add a constructor check for `if (partialResults && 
isStateBackedDataViews)` and throw an exception if `true`. `partialResults` 
means that the `Row` with the accumulators has to be emitted, which won't work 
well for state-backed distinct maps that are probably too big.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182768521
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
+} else {
+  j"""
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+}
   } else {
-j"""
-   |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
-   |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
-   |${genDataViewFieldSetter(s"acc$i", i)}
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  baseClass$i.getValue(acc$i));""".stripMargin
+if (isDistinctAggs(i)) {
+  j"""
+ |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
+ |  
(org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |${genDistinctDataViewFieldSetter(s"distinctAcc$i", 
i)}
--- End diff --

we don't need this statement


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182218521
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
--- End diff --

I would not create the `MapViewSpec`s here but do that in the code gen 
function. Here we should create an `Array[Boolean]` to flag distinct 
aggregations.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182496465
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

Does the approach also work for `null` values in both MapViews? If not, we 
can use a `Row(1)` that serializes a bitmask for `null` values.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182773359
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -417,13 +530,26 @@ class AggregationCodeGenerator(
 .stripMargin
   val create: String = {
 for (i <- aggs.indices) yield {
-  j"""
- |${accTypes(i)} acc$i = (${accTypes(i)}) 
${aggs(i)}.createAccumulator();
- |${genDataViewFieldSetter(s"acc$i", i)}
- |accs.setField(
- |  $i,
- |  acc$i);"""
-.stripMargin
+  if (isDistinctAggs(i)) {
+j"""
+   |${accTypes(i)} acc$i = (${accTypes(i)}) 
${aggs(i)}.createAccumulator();
+   |$distinctAccType distinctAcc$i = ($distinctAccType) 
new org.apache.flink.table.
+   |functions.aggfunctions.DistinctAccumulator(acc$i);
+   |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)}
--- End diff --

I think this and the `genAccDataViewFieldSetter` call (both from here and 
the non-distinct case) can be removed.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182219690
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +51,96 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
--- End diff --

I tested a few other queries with distinct aggregates. 
* Queries with non-windowed `DISTINCT` aggregations work, but they are 
translated without distinct aggregations. So the changes in this PR are 
not used.
* Queries with `DISTINCT` aggregates and `TUMBLE` or `HOP` windows fail 
with strange exceptions. This did not look related to these changes, but it 
would be good to check.

We don't have to fix these things in this PR (unless it is broken by these 
changes).

In general, I think it would be good to add unit tests for the 
`AggregationCodeGenerator`. We could generate  `GeneratedAggregations` for 
different configurations, compile them, and check if the result is expected. 
Actually, we should have done that already. This should also work for 
state-backend backed views if we embed the test in a HarnessTest.
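
Hedged examples of the two kinds of queries referred to above; the table and field 
names (`MyTable`, `a`, `b`, `rowtime`) are assumptions made purely for illustration 
and are not taken from the PR's tests.

{code:java}
// Hypothetical example queries; table/field names are assumed.
public class DistinctQueryExamples {

    // Non-windowed DISTINCT aggregation: works today, but is translated without
    // the distinct support added in this PR.
    public static final String NON_WINDOWED =
        "SELECT a, COUNT(DISTINCT b) FROM MyTable GROUP BY a";

    // DISTINCT aggregate over a TUMBLE group window: reported to fail with
    // seemingly unrelated exceptions.
    public static final String TUMBLE_WINDOWED =
        "SELECT a, COUNT(DISTINCT b) "
            + "FROM MyTable "
            + "GROUP BY a, TUMBLE(rowtime, INTERVAL '1' MINUTE)";
}
{code}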


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182489141
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

+1


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182233134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
--- End diff --

add more comments for the `aggCall.isDistinct` branch


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182753219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

If we change the input parameter, we already have the `Array[Boolean]`.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182254735
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
--- End diff --

We need to forward the distinct maps as well. `partialResults` is used when 
an operator needs to emit partial aggregation results, for example in a combine 
function in batch execution. So we don't need to distinguish the 
`isDistinctAggs(i)` case here.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182768726
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
+} else {
+  j"""
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+}
   } else {
-j"""
-   |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
-   |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
-   |${genDataViewFieldSetter(s"acc$i", i)}
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  baseClass$i.getValue(acc$i));""".stripMargin
+if (isDistinctAggs(i)) {
+  j"""
+ |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
+ |  
(org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |${genDistinctDataViewFieldSetter(s"distinctAcc$i", 
i)}
+ |${accTypes(i)} acc$i = (${accTypes(i)}) 
distinctAcc$i.getRealAcc();
+ |${genAccDataViewFieldSetter(s"acc$i", i)}
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  baseClass$i.getValue(acc$i));""".stripMargin
+} else {
--- End diff --

Both cases share a lot of code. We could retrieve only `acc$i` differently and 
share the rest. Does that make sense, or would it fragment the code too much?
The same would apply to `accumulate()` and `retract()`.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r182469110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
 ---
@@ -70,7 +70,12 @@ trait OverAggregate {
 
 val aggStrings = namedAggregates.map(_.getKey).map(
   a => s"${a.getAggregation}(${
-if (a.getArgList.size() > 0) {
+val prefix = if (a.isDistinct) {
--- End diff --

In case we also want to support group-windowed DISTINCT aggregates, we 
would need to change `CommonAggregate.aggregationToString()` as well. 


---


[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444174#comment-16444174
 ] 

ASF GitHub Bot commented on FLINK-8836:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5880#discussion_r182777472
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
 ---
@@ -24,21 +24,71 @@
 import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.core.testutils.CheckedThread;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 import static org.junit.Assert.fail;
 
 /**
  * This tests that the {@link KryoSerializer} properly fails when accessed 
by two threads
- * concurrently.
+ * concurrently and that Kryo serializers are properly duplicated to use 
them in different threads.
  *
  * Important: This test only works if assertions are activated 
(-ea) on the JVM
  * when running tests.
  */
 public class KryoSerializerConcurrencyTest {
 
+   @Test
+   public void testDuplicateSerializerWithDefaultSerializerClass() {
--- End diff --

test names are mixed up, this and the next one should be switched


> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as following:
> {code:java}
> public KryoSerializer duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> Shortly put, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are directly provided to the new 
> {{KryoSerializer}} instance.
>  This causes the fact that those default serializers are shared across two 
> different {{KryoSerializer}} instances, and therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

2018-04-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5880#discussion_r182777472
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
 ---
@@ -24,21 +24,71 @@
 import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.core.testutils.CheckedThread;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 import static org.junit.Assert.fail;
 
 /**
  * This tests that the {@link KryoSerializer} properly fails when accessed 
by two threads
- * concurrently.
+ * concurrently and that Kryo serializers are properly duplicated to use 
them in different threads.
  *
  * Important: This test only works if assertions are activated 
(-ea) on the JVM
  * when running tests.
  */
 public class KryoSerializerConcurrencyTest {
 
+   @Test
+   public void testDuplicateSerializerWithDefaultSerializerClass() {
--- End diff --

test names are mixed up, this and the next one should be switched


---


[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444169#comment-16444169
 ] 

ASF GitHub Bot commented on FLINK-8836:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5880#discussion_r182776735
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class type, ExecutionConfig 
executionConfig){
 * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
 */
protected KryoSerializer(KryoSerializer toCopy) {
-   defaultSerializers = toCopy.defaultSerializers;
-   defaultSerializerClasses = toCopy.defaultSerializerClasses;
 
-   kryoRegistrations = toCopy.kryoRegistrations;
+   this.type = checkNotNull(toCopy.type, "Type class cannot be 
null.");
+   this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+   this.defaultSerializers = new 
LinkedHashMap<>(toCopy.defaultSerializers.size());
+   this.kryoRegistrations = new 
LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+   // deep copy the serializer instances in defaultSerializers
+   for (Map.Entry entry :
+   toCopy.defaultSerializers.entrySet()) {
 
-   type = toCopy.type;
-   if(type == null){
-   throw new NullPointerException("Type class cannot be 
null.");
+   this.defaultSerializers.put(entry.getKey(), 
deepCopySerializer(entry.getValue()));
+   }
+
+   // deep copy the serializer instances in kryoRegistrations
+   for (Map.Entry entry : 
toCopy.kryoRegistrations.entrySet()) {
--- End diff --

The problem is that we don't have the `ExecutionConfig` in the copy 
constructor.


> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as following:
> {code:java}
> public KryoSerializer duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> Shortly put, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are directly provided to the new 
> {{KryoSerializer}} instance.
>  This causes the fact that those default serializers are shared across two 
> different {{KryoSerializer}} instances, and therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

2018-04-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5880#discussion_r182776735
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class type, ExecutionConfig 
executionConfig){
 * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
 */
protected KryoSerializer(KryoSerializer toCopy) {
-   defaultSerializers = toCopy.defaultSerializers;
-   defaultSerializerClasses = toCopy.defaultSerializerClasses;
 
-   kryoRegistrations = toCopy.kryoRegistrations;
+   this.type = checkNotNull(toCopy.type, "Type class cannot be 
null.");
+   this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+   this.defaultSerializers = new 
LinkedHashMap<>(toCopy.defaultSerializers.size());
+   this.kryoRegistrations = new 
LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+   // deep copy the serializer instances in defaultSerializers
+   for (Map.Entry entry :
+   toCopy.defaultSerializers.entrySet()) {
 
-   type = toCopy.type;
-   if(type == null){
-   throw new NullPointerException("Type class cannot be 
null.");
+   this.defaultSerializers.put(entry.getKey(), 
deepCopySerializer(entry.getValue()));
+   }
+
+   // deep copy the serializer instances in kryoRegistrations
+   for (Map.Entry entry : 
toCopy.kryoRegistrations.entrySet()) {
--- End diff --

The problem is that we don't have the `ExecutionConfig` in the copy 
constructor.


---


[jira] [Updated] (FLINK-8979) Extend Kafka end-to-end tests to run with different versions

2018-04-19 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8979:

Fix Version/s: (was: 1.5.0)
   1.5.1
   1.6.0

> Extend Kafka end-to-end tests to run with different versions
> 
>
> Key: FLINK-8979
> URL: https://issues.apache.org/jira/browse/FLINK-8979
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> The current {{Kafka}} end-to-end test only runs with Kafka 0.10. We should 
> extend the test to also run with
> * Kafka 0.8
> * Kafka 0.9
> * Kafka 0.11
> Additionally we should change the test job to not be embarrassingly parallel 
> by introducing a shuffle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8982) End-to-end test: Queryable state

2018-04-19 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8982:

Fix Version/s: (was: 1.5.0)
   1.5.1
   1.6.0

> End-to-end test: Queryable state
> 
>
> Key: FLINK-8982
> URL: https://issues.apache.org/jira/browse/FLINK-8982
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> We should add an end-to-end test which verifies that {{Queryable State}} is 
> working.
> [~florianschmidt] and [~kkl0u] could you please provide more details for the 
> description.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8977) End-to-end test: Manually resume job after terminal failure

2018-04-19 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8977:

Fix Version/s: (was: 1.5.0)
   1.5.1
   1.6.0

> End-to-end test: Manually resume job after terminal failure
> ---
>
> Key: FLINK-8977
> URL: https://issues.apache.org/jira/browse/FLINK-8977
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> We should add an end-to-end test which verifies that a job can be resumed 
> manually after a terminal job failure if there is a checkpoint. In order to 
> do that we should 
> # run the general purpose testing job FLINK-8971 
> # wait for the completion of a checkpoint
> # Trigger a failure which leads to a terminal failure
> # Resume the job from the retained checkpoint
> This end-to-end test should run with all state backend combinations: RocksDB 
> (incremental/full, async/sync), FsStateBackend (async/sync).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8286) Investigate Flink-Yarn-Kerberos integration for flip-6

2018-04-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444149#comment-16444149
 ] 

Aljoscha Krettek commented on FLINK-8286:
-

[~suez1224] Is there any update on this?

> Investigate Flink-Yarn-Kerberos integration for flip-6
> --
>
> Key: FLINK-8286
> URL: https://issues.apache.org/jira/browse/FLINK-8286
> Project: Flink
>  Issue Type: Task
>  Components: Security
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We've found some issues with the Flink-Yarn-Kerberos integration in the 
> current deployment model, we will need to investigate and test it for flip-6 
> when it's ready.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-7431) test instability in JobManagerFailsITCase

2018-04-19 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7431.
---
Resolution: Cannot Reproduce

Closing because we haven't seen this in a while and also because it tests the 
legacy JobManager/TaskManager.

> test instability in JobManagerFailsITCase
> -
>
> Key: FLINK-7431
> URL: https://issues.apache.org/jira/browse/FLINK-7431
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> In a branch with unrelated changes:
> {code}
> A TaskManager should go into a clean state in case of a JobManager 
> failure(org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase)  
> Time elapsed: 121.105 sec  <<< FAILURE!
> java.lang.AssertionError: assertion failed: timeout (119126026463 
> nanoseconds) during expectMsg while waiting for Acknowledge
>   at scala.Predef$.assert(Predef.scala:170)
>   at akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:338)
>   at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:315)
>   at akka.testkit.TestKit.expectMsg(TestKit.scala:718)
>   at 
> org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerFailsITCase.scala:118)
>   at 
> org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply(JobManagerFailsITCase.scala:104)
>   at 
> org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply(JobManagerFailsITCase.scala:104)
>   at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
>   at akka.testkit.TestKit.within(TestKit.scala:718)
>   at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
>   at akka.testkit.TestKit.within(TestKit.scala:718)
>   at 
> org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply$mcV$sp(JobManagerFailsITCase.scala:104)
>   at 
> org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(JobManagerFailsITCase.scala:85)
>   at 
> org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(JobManagerFailsITCase.scala:85)
>   at 
> org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
>   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
>   at 
> org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase.withFixture(JobManagerFailsITCase.scala:37)
> {code}
> https://travis-ci.org/NicoK/flink/jobs/263422429
> full logs: https://transfer.sh/bRKz7/399.4.tar.gz (unfortunately not much 
> more info there, as far as I can see)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9220) Table program cannot be compiled

2018-04-19 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444092#comment-16444092
 ] 

Timo Walther commented on FLINK-9220:
-

Which package contains your function? According to the exception {{Cannot determine 
simple type name "com"}}, it seems that it can't find your package. Are you 
executing this program in your IDE or on a (local) Flink cluster? If on a cluster, 
are you sure that your function's class is in the jar that you submit to the 
cluster?

> Table program cannot be compiled
> 
>
> Key: FLINK-9220
> URL: https://issues.apache.org/jira/browse/FLINK-9220
> Project: Flink
>  Issue Type: Bug
>Reporter: Saurabh Garg
>Priority: Major
>
> Flink job gets failed with scalar UDF. I am using Flink 1.4. Issue came with 
> Scalar UDF
> Below is the error logs:
>  
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue. at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at 
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.codehaus.commons.compiler.CompileException: Line 6, Column 18: Cannot 
> determine simple type name "com" at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156) at 
> org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
>  at 
> org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
>  at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754) at 
> org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059) at 
> org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753) at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
> org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456) at 
> org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082)
>  at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077)
>  at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438) at 
> org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
>  at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)
>  at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
> 

[jira] [Commented] (FLINK-9155) Provide message context information in DeserializationSchema

2018-04-19 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444080#comment-16444080
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-9155:


I think we can implement both logging and metrics to track this.
For metrics, we should be able to use the user-variable functionality to expose 
skip counters that can be grouped by topic / partition. This should allow more 
prompt alerting on skipped messages, and users can then refer to the logs for 
details on the offending record.
For logging, the information should contain the topic, partition, and offset for 
debugging.
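
A rough sketch, not the actual connector code, of how such a skip counter with user 
variables and the corresponding log line could look; the class name and the wiring 
of topic/partition/offset are assumptions made for illustration.

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SkippedRecordsTracker {

    private static final Logger LOG = LoggerFactory.getLogger(SkippedRecordsTracker.class);

    private final Counter skippedRecords;
    private final String topic;
    private final int partition;

    public SkippedRecordsTracker(MetricGroup consumerMetricGroup, String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
        // addGroup(key, value) registers "topic" and "partition" as user variables,
        // so the counter can be grouped/filtered by them in the metrics backend.
        this.skippedRecords = consumerMetricGroup
            .addGroup("topic", topic)
            .addGroup("partition", String.valueOf(partition))
            .counter("skippedRecords");
    }

    // Called whenever the DeserializationSchema returns null for a record.
    public void recordSkipped(long offset) {
        skippedRecords.inc();
        LOG.warn("Skipped corrupted record: topic={}, partition={}, offset={}",
            topic, partition, offset);
    }
}
{code}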

> Provide message context information in DeserializationSchema
> 
>
> Key: FLINK-9155
> URL: https://issues.apache.org/jira/browse/FLINK-9155
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alex Smirnov
>Priority: Minor
>
> There's no way to retrieve more information about corrupted message in the 
> DeserializationSchema class. It is only possible to return null, which is a 
> signal to skip the message, and to throw an exception, which will cause job 
> failure.
> For investigation purposes it would be good to have more information, like:
>  * kafka topic from which the message came from
>  ** in Flink 1.4, it is possible to subscribe using Pattern, so topic name is 
> not always evident
>  * kafka topic offset
> The idea is to write this information into the log file for further analysis. 
> Having topic name and offset allows to retrieve the message and investigate 
> it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9204) Improve visibility of records being skipped by the `DeserializationSchema` in the Kafka / Kinesis connectors

2018-04-19 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-9204.

Resolution: Duplicate

> Improve visibility of records being skipped by the `DeserializationSchema` in 
> the Kafka / Kinesis connectors
> 
>
> Key: FLINK-9204
> URL: https://issues.apache.org/jira/browse/FLINK-9204
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector, Kinesis Connector, Metrics
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.6.0
>
>
> Currently, users have to write some log for visibility if they are skipping 
> some record (by returning {{null}} from the deserialization schema).
> We should have better support for this natively in the connectors, either by 
> adding logs if {{null}} is returned by the user deserialization schema, or 
> adding counter metrics to track how many records were skipped so far.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9220) Table program cannot be compiled

2018-04-19 Thread Saurabh Garg (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444072#comment-16444072
 ] 

Saurabh Garg commented on FLINK-9220:
-

I registered this ScalarFunction on the StreamTableEnvironment. The Flink job 
starts, goes to the running state, and then fails.

> Table program cannot be compiled
> 
>
> Key: FLINK-9220
> URL: https://issues.apache.org/jira/browse/FLINK-9220
> Project: Flink
>  Issue Type: Bug
>Reporter: Saurabh Garg
>Priority: Major
>
> Flink job gets failed with scalar UDF. I am using Flink 1.4. Issue came with 
> Scalar UDF
> Below is the error logs:
>  
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue. at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at 
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.codehaus.commons.compiler.CompileException: Line 6, Column 18: Cannot 
> determine simple type name "com" at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156) at 
> org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
>  at 
> org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
>  at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754) at 
> org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059) at 
> org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753) at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
> org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456) at 
> org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082)
>  at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077)
>  at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438) at 
> org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
>  at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)
>  at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at 
> 

[jira] [Commented] (FLINK-9220) Table program cannot be compiled

2018-04-19 Thread Saurabh Garg (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444070#comment-16444070
 ] 

Saurabh Garg commented on FLINK-9220:
-

Query:  select id, HashCode(id) as hashcode from input1

The function is similar to the one provided in the documentation:

import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {
    private int factor = 12;

    public HashCode(int factor) {
        this.factor = factor;
    }

    public HashCode() {}

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}
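
For completeness, a hedged sketch (not the reporter's actual code) of how such a UDF 
is typically registered and used with the query above; the environment setup and the 
prior registration of the `input1` table are assumptions.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class HashCodeExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // "input1" with a field "id" is assumed to have been registered beforehand,
        // e.g. via tableEnv.registerDataStream("input1", stream, "id").
        tableEnv.registerFunction("HashCode", new HashCode(12));

        Table result = tableEnv.sqlQuery("SELECT id, HashCode(id) AS hashcode FROM input1");
        // result would then be converted back to a DataStream and the job executed.
    }
}
{code}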

> Table program cannot be compiled
> 
>
> Key: FLINK-9220
> URL: https://issues.apache.org/jira/browse/FLINK-9220
> Project: Flink
>  Issue Type: Bug
>Reporter: Saurabh Garg
>Priority: Major
>
> Flink job gets failed with scalar UDF. I am using Flink 1.4. Issue came with 
> Scalar UDF
> Below is the error logs:
>  
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue. at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at 
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.codehaus.commons.compiler.CompileException: Line 6, Column 18: Cannot 
> determine simple type name "com" at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156) at 
> org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
>  at 
> org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
>  at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754) at 
> org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059) at 
> org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753) at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
> org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456) at 
> org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082)
>  at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077)
>  at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at 
> org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at 
> org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438) at 
> org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212) at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
>  at 
> org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)
>  at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
> org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
> 

[jira] [Updated] (FLINK-9220) Table program cannot be compiled

2018-04-19 Thread Saurabh Garg (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saurabh Garg updated FLINK-9220:

Description: 
Flink job gets failed with scalar UDF. I am using Flink 1.4. Issue came with 
Scalar UDF

Below is the error logs:

 

org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue. at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at 
org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
 at 
org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
 at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at 
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.codehaus.commons.compiler.CompileException: Line 6, Column 18: Cannot 
determine simple type name "com" at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at 
org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156) at 
org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212) at 
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
 at 
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
 at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754) at 
org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059) at 
org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052) at 
org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753) at 
org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212) at 
org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844) at 
org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456) at 
org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212) at 
org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082)
 at 
org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077)
 at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136) at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at 
org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at 
org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438) at 
org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212) at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
 at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)
 at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050) at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at 
org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at 
org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at 
org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at 
org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8591) at 
org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4708) at 
org.codehaus.janino.UnitCompiler.access$8200(UnitCompiler.java:212) at 
org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4071)
 at 
org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4044)
 at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4874) at 

[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444068#comment-16444068
 ] 

ASF GitHub Bot commented on FLINK-8836:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5880#discussion_r182750453
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class type, ExecutionConfig 
executionConfig){
 * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
 */
protected KryoSerializer(KryoSerializer toCopy) {
-   defaultSerializers = toCopy.defaultSerializers;
-   defaultSerializerClasses = toCopy.defaultSerializerClasses;
 
-   kryoRegistrations = toCopy.kryoRegistrations;
+   this.type = checkNotNull(toCopy.type, "Type class cannot be 
null.");
+   this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+   this.defaultSerializers = new 
LinkedHashMap<>(toCopy.defaultSerializers.size());
+   this.kryoRegistrations = new 
LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+   // deep copy the serializer instances in defaultSerializers
+   for (Map.Entry entry :
+   toCopy.defaultSerializers.entrySet()) {
 
-   type = toCopy.type;
-   if(type == null){
-   throw new NullPointerException("Type class cannot be 
null.");
+   this.defaultSerializers.put(entry.getKey(), 
deepCopySerializer(entry.getValue()));
+   }
+
+   // deep copy the serializer instances in kryoRegistrations
+   for (Map.Entry entry : 
toCopy.kryoRegistrations.entrySet()) {
--- End diff --

One alternative approach to this loop (though I'm not sure it would be 
better) is that in the `buildKryoRegistrations` method we always make a copy of the 
`ExecutionConfig.SerializableSerializer` when instantiating its corresponding 
`KryoRegistration`.
See 
https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537.
 Here we can make a copy already when building the registrations.

Then, when duplicating the `KryoSerializer`, for duplicating the 
registrations, this would only be a matter of calling `buildKryoRegistrations` 
again with the execution config because that method would handle stateful 
serializer registrations properly.
IMO, this seems like a cleaner solution. What do you think?


> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as following:
> {code:java}
> public KryoSerializer duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> Shortly put, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are directly provided to the new 
> {{KryoSerializer}} instance.
>  This causes the fact that those default serializers are shared across two 
> different {{KryoSerializer}} instances, and therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

2018-04-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5880#discussion_r182750453
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class type, ExecutionConfig 
executionConfig){
 * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
 */
protected KryoSerializer(KryoSerializer toCopy) {
-   defaultSerializers = toCopy.defaultSerializers;
-   defaultSerializerClasses = toCopy.defaultSerializerClasses;
 
-   kryoRegistrations = toCopy.kryoRegistrations;
+   this.type = checkNotNull(toCopy.type, "Type class cannot be 
null.");
+   this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+   this.defaultSerializers = new 
LinkedHashMap<>(toCopy.defaultSerializers.size());
+   this.kryoRegistrations = new 
LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+   // deep copy the serializer instances in defaultSerializers
+   for (Map.Entry entry :
+   toCopy.defaultSerializers.entrySet()) {
 
-   type = toCopy.type;
-   if(type == null){
-   throw new NullPointerException("Type class cannot be 
null.");
+   this.defaultSerializers.put(entry.getKey(), 
deepCopySerializer(entry.getValue()));
+   }
+
+   // deep copy the serializer instances in kryoRegistrations
+   for (Map.Entry entry : 
toCopy.kryoRegistrations.entrySet()) {
--- End diff --

One alternative approach to this loop (though I'm not sure it would be 
better) is that in the `buildKryoRegistrations` method we always make a copy of the 
`ExecutionConfig.SerializableSerializer` when instantiating its corresponding 
`KryoRegistration`.
See 
https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537.
 Here we can make a copy already when building the registrations.

Then, when duplicating the `KryoSerializer`, for duplicating the 
registrations, this would only be a matter of calling `buildKryoRegistrations` 
again with the execution config because that method would handle stateful 
serializer registrations properly.
IMO, this seems like a cleaner solution. What do you think?


---


[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444066#comment-16444066
 ] 

ASF GitHub Bot commented on FLINK-8836:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5880#discussion_r182749960
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class type, ExecutionConfig 
executionConfig){
 * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
 */
protected KryoSerializer(KryoSerializer toCopy) {
-   defaultSerializers = toCopy.defaultSerializers;
-   defaultSerializerClasses = toCopy.defaultSerializerClasses;
 
-   kryoRegistrations = toCopy.kryoRegistrations;
+   this.type = checkNotNull(toCopy.type, "Type class cannot be 
null.");
+   this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+   this.defaultSerializers = new 
LinkedHashMap<>(toCopy.defaultSerializers.size());
+   this.kryoRegistrations = new 
LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+   // deep copy the serializer instances in defaultSerializers
+   for (Map.Entry entry :
+   toCopy.defaultSerializers.entrySet()) {
 
-   type = toCopy.type;
-   if(type == null){
-   throw new NullPointerException("Type class cannot be 
null.");
+   this.defaultSerializers.put(entry.getKey(), 
deepCopySerializer(entry.getValue()));
+   }
+
+   // deep copy the serializer instances in kryoRegistrations
+   for (Map.Entry entry : 
toCopy.kryoRegistrations.entrySet()) {
--- End diff --

One alternative approach to this loop (though I'm not sure it would be 
better) is that in the `buildKryoRegistrations` method we always make a copy of the 
`ExecutionConfig.SerializableSerializer` when instantiating its corresponding 
`KryoRegistration`.
See 
https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537.
 Here we can make a copy already when building the registrations.

Then, when duplicating the `KryoSerializer`, for duplicating the 
registrations, this would only be a matter of calling `buildKryoRegistrations` 
again from the execution config because that method would handle stateful 
serializer registrations properly.
IMO, this seems like a cleaner solution. What do you think?




> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as following:
> {code:java}
> public KryoSerializer duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> Shortly put, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are directly provided to the new 
> {{KryoSerializer}} instance.
>  This causes the fact that those default serializers are shared across two 
> different {{KryoSerializer}} instances, and therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

2018-04-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5880#discussion_r182749960
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
 * Copy-constructor that does not copy transient fields. They will be initialized once required.
 */
protected KryoSerializer(KryoSerializer<T> toCopy) {
-   defaultSerializers = toCopy.defaultSerializers;
-   defaultSerializerClasses = toCopy.defaultSerializerClasses;
 
-   kryoRegistrations = toCopy.kryoRegistrations;
+   this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+   this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+   this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+   this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+   // deep copy the serializer instances in defaultSerializers
+   for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
+   toCopy.defaultSerializers.entrySet()) {
 
-   type = toCopy.type;
-   if(type == null){
-   throw new NullPointerException("Type class cannot be null.");
+   this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+   }
+
+   // deep copy the serializer instances in kryoRegistrations
+   for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
--- End diff --

One alternative approach to this loop (though I'm not sure it would be better) is to always make a copy of the `ExecutionConfig.SerializableSerializer` in the `buildKryoRegistrations` method, when instantiating its corresponding `KryoRegistration`.
See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537. Here we could already make the copy when building the registrations.

Then, when duplicating the `KryoSerializer`, duplicating the registrations would only be a matter of calling `buildKryoRegistrations` again from the execution config, because that method would handle stateful serializer registrations properly.
IMO, this seems like a cleaner solution. What do you think?
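
For illustration, roughly what I mean (just a sketch of the idea, not code from this PR or the Flink sources; `copySerializableSerializer` would be a new, hypothetical helper):

```java
import java.io.IOException;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.util.InstantiationUtil;

// Hypothetical helper: deep-copies a registered SerializableSerializer via a
// Java-serialization round trip, so that a KryoRegistration built from it no
// longer shares the stateful Serializer instance with the original registration.
final class SerializableSerializerCopies {

	static ExecutionConfig.SerializableSerializer<?> copySerializableSerializer(
			ExecutionConfig.SerializableSerializer<?> original) {
		try {
			return InstantiationUtil.clone(original, original.getClass().getClassLoader());
		} catch (IOException | ClassNotFoundException e) {
			throw new RuntimeException("Could not deep-copy the registered serializer", e);
		}
	}

	private SerializableSerializerCopies() {}
}
```

If such a copy happens where the `KryoRegistration` is instantiated, then `duplicate()` could simply rebuild the registrations via `buildKryoRegistrations` instead of deep-copying the map entry by entry.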




---


[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444055#comment-16444055
 ] 

ASF GitHub Bot commented on FLINK-8785:
---

Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5877
  
@zentol ok, I thought it was a small change before. So you mean we can make more changes to the exception messages and have them reported more properly?
I will test all the cases in the JobSubmissionFailsITCase.


> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: buptljy
>Priority: Critical
>  Labels: flip-6
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}}, fails with a {{JobSubmissionException}}, the {{JobSubmitHandler}} returns "Internal server error" instead of signaling the failed job submission.
> This can occur, for example, if the transmitted execution graph is faulty, as tested by the {{JobSubmissionFailsITCase}}.
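
A rough sketch of the direction such a fix could take (illustrative only; the exact wiring inside the handler, the chosen HTTP status, and the constructor used here are assumptions, not the actual patch):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

// Illustrative helper: unwrap the failure of the submission future and rethrow it
// as a RestHandlerException, so the client sees the real submission error instead
// of a generic "Internal server error".
final class SubmissionErrorMapping {

	static <T> CompletableFuture<T> reportSubmissionErrors(CompletableFuture<T> submissionResult) {
		return submissionResult.exceptionally(throwable -> {
			Throwable cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
			throw new CompletionException(
				new RestHandlerException(
					"Job submission failed: " + cause.getMessage(),
					HttpResponseStatus.BAD_REQUEST));
		});
	}

	private SubmissionErrorMapping() {}
}
{code}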



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5877: [FLINK-8785][Job-Submission]Handle JobSubmissionException...

2018-04-19 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5877
  
@zentol ok, I thought it was a small change before. So you mean we can make more changes to the exception messages and have them reported more properly?
I will test all the cases in the JobSubmissionFailsITCase.


---


[GitHub] flink issue #5813: [FLINK-8980] [e2e] Add a BucketingSink end-to-end test

2018-04-19 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5813
  
Sounds good @twalthr !


---


[jira] [Commented] (FLINK-8980) End-to-end test: BucketingSink

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444044#comment-16444044
 ] 

ASF GitHub Bot commented on FLINK-8980:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5813
  
Sounds good @twalthr !


> End-to-end test: BucketingSink
> --
>
> Key: FLINK-8980
> URL: https://issues.apache.org/jira/browse/FLINK-8980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In order to verify the {{BucketingSink}}, we should add an end-to-end test which verifies that the {{BucketingSink}} does not lose data under failures.
> An idea would be to have a CountUp job which simply increments a persisted counter. The emitted values will be written to disk by the {{BucketingSink}}. We should then randomly kill Flink processes (cluster entrypoint and TaskExecutors) to simulate failures. Even after these failures, the written files should contain the correct sequence of numbers.
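
A rough sketch of what such a CountUp job could look like (the class names, sink path, and emission rate below are illustrative assumptions, using the 1.5-era DataStream and {{BucketingSink}} APIs):

{code:java}
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class CountUpJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// checkpoint frequently so the counter state survives the injected failures
		env.enableCheckpointing(1000L);

		env.addSource(new CountUpSource())
			.addSink(new BucketingSink<Long>("/tmp/bucketing-sink-e2e-test"));

		env.execute("CountUp");
	}

	/**
	 * Emits an ever-increasing counter and checkpoints its value, so that after a
	 * restart the emitted sequence continues where the last checkpoint left off.
	 */
	public static class CountUpSource implements SourceFunction<Long>, ListCheckpointed<Long> {

		private volatile boolean running = true;
		private long counter;

		@Override
		public void run(SourceContext<Long> ctx) throws Exception {
			while (running) {
				synchronized (ctx.getCheckpointLock()) {
					ctx.collect(counter++);
				}
				Thread.sleep(1L);
			}
		}

		@Override
		public void cancel() {
			running = false;
		}

		@Override
		public List<Long> snapshotState(long checkpointId, long timestamp) {
			return Collections.singletonList(counter);
		}

		@Override
		public void restoreState(List<Long> state) {
			for (Long value : state) {
				counter = value;
			}
		}
	}
}
{code}

After repeatedly killing cluster processes while this job runs, the files written under the sink path can then be checked for the expected consecutive sequence of numbers.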



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

