[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125272#comment-16125272
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
@bowenli86 What is it failing on? AFAIK, the only serialization changes in 
1.3.2 compared to 1.3.0 are in the serialization of some specific `TypeSerializer`s.


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more of them to make full use of KinesisProducer and to 
> make it fault tolerant (e.g., by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values. They make Flink write too fast to 
> Kinesis, which fails the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params in order to slow down 
> Flink's write rate to Kinesis.
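
For illustration, a hedged sketch of how the settings listed above might be passed 
to FlinkKinesisProducer through the producer Properties. The KPL property names 
(RecordMaxBufferedTime, MaxConnections, RequestTimeout, RecordTtl, RateLimit) come 
from the AWS sample config linked above; whether FlinkKinesisProducer forwards them 
to the KPL this way is exactly what this issue proposes, so treat the snippet as an 
assumption about the eventual API, not the current one:

```
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisSinkConfigSketch {

    public static FlinkKinesisProducer<String> createSink() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
        producerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
        producerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

        // Assumption: the producer forwards these keys to the KPL once this issue is done.
        // Property names follow the KPL sample default_config.properties linked above.
        producerConfig.setProperty("RecordMaxBufferedTime", "100");  // ms
        producerConfig.setProperty("MaxConnections", "24");
        producerConfig.setProperty("RequestTimeout", "6000");        // ms
        producerConfig.setProperty("RecordTtl", "30000");            // ms
        producerConfig.setProperty("RateLimit", "100");              // % of the per-shard limit

        return new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    }
}
```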



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-08-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
@bowenli86 What is it failing on? AFAIK, the only serialization changes in 
1.3.2 compared to 1.3.0 are in the serialization of some specific `TypeSerializer`s.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125270#comment-16125270
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
@tzulitai Thanks Gordon for the quick response! You are right, I made the 
schema in the unit test static and it passes.

As for my Flink job: when I build it against Flink 1.3.0, it runs well on 
EMR; when I build it against Flink 1.3.2, it fails to run. Has anything related 
to serialization changed since Flink 1.3.0?


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more of them to make full use of KinesisProducer and to 
> make it fault tolerant (e.g., by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values. They make Flink write too fast to 
> Kinesis, which fails the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-08-13 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
@tzulitai Thanks Gordon for the quick response! You are right, I made the 
schema in the unit test static and it passes.

As for my Flink job: when I build it against Flink 1.3.0, it runs well on 
EMR; when I build it against Flink 1.3.2, it fails to run. Has anything related 
to serialization changed since Flink 1.3.0?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125264#comment-16125264
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
@bowenli86 On the other hand, in the constructors of 
`FlinkKinesisProducer`, we should probably add eager safety checks on the 
serializability of the provided `KinesisSerializationSchema`, with good 
error messages in case it isn't serializable. I'll open a separate JIRA / PR 
for that.
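
A minimal sketch of what such an eager check might look like, reusing the existing 
ClosureCleaner utility; the helper class and method names here are made up for 
illustration and are not the actual FlinkKinesisProducer code:

```
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.java.ClosureCleaner;

// Sketch of the proposed eager check; names are illustrative only.
final class SerializationChecks {

    static void checkSerializable(Object schema) {
        try {
            // Existing Flink utility: tries to serialize the object and fails if it cannot.
            ClosureCleaner.ensureSerializable(schema);
        } catch (InvalidProgramException e) {
            throw new IllegalArgumentException(
                "The provided KinesisSerializationSchema is not serializable. "
                    + "Anonymous classes capture a reference to their enclosing class; "
                    + "use a static (or top-level) implementation instead.", e);
        }
    }
}
```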


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more of them to make full use of KinesisProducer and to 
> make it fault tolerant (e.g., by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values. They make Flink write too fast to 
> Kinesis, which fails the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-08-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
@bowenli86 On the other hand, in the constructors of 
`FlinkKinesisProducer`, we should probably add eager safety checks on the 
serializability of the provided `KinesisSerializationSchema`, with good 
error messages in case it isn't serializable. I'll open a separate JIRA / PR 
for that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-08-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
@bowenli86 I think the test you posted above is failing because the 
provided `KinesisSerializationSchema` is not serializable. It is an anonymous 
class, which contains a reference to its enclosing class (which I guess is the 
test class), and is hence not serializable.

To make it work, you either need to make the test class serializable, or 
define a static `KinesisSerializationSchema` subclass instead of using an 
anonymous class.
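
A hedged sketch of the second option, i.e. a static (top-level) serialization 
schema instead of an anonymous class; the class name and the Object element type 
are illustrative only:

```
import java.nio.ByteBuffer;

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

// A static, top-level schema: it carries no hidden reference to an enclosing test class,
// so it serializes on its own.
public class DummyKinesisSerializationSchema implements KinesisSerializationSchema<Object> {

    @Override
    public ByteBuffer serialize(Object element) {
        return ByteBuffer.wrap(new byte[0]);
    }

    @Override
    public String getTargetStream(Object element) {
        return null; // fall back to the default stream configured on the producer
    }
}
```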


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125262#comment-16125262
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
@bowenli86 I think the test you posted above is failing because the 
provided `KinesisSerializationSchema` is not serializable. It is an anonymous 
class, which contains a reference to its enclosing class (which I guess is the 
test class), and is hence not serializable.

To make it work, you either need to make the test class serializable, or 
define a static `KinesisSerializationSchema` subclass instead of using an 
anonymous class.


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more of them to make full use of KinesisProducer and to 
> make it fault tolerant (e.g., by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values. They make Flink write too fast to 
> Kinesis, which fails the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125252#comment-16125252
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
hmmm... I was deploying our Flink job with this change. The Flink job 
failed to start, and the log reports:

```
The implementation of the RichSinkFunction is not serializable. The object 
probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
```

Here the implementation of the RichSinkFunction is FlinkKinesisProducer. The 
only field I added to FlinkKinesisProducer is a KinesisProducerConfiguration. 
So I made the KinesisProducerConfiguration field `transient` and reran the 
Flink job, but the job still fails.

Thus I suspected that FlinkKinesisProducer is already not serializable. To 
verify that, I created a test against the current FlinkKinesisProducer in 
master, which doesn't have my PR change. The unit test is:

```
@Test
public void testProducerSerializable() {
    Properties testConfig = new Properties();
    testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
    testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
    KinesisConfigUtil.validateAwsConfiguration(testConfig);

    FlinkKinesisProducer producer = new FlinkKinesisProducer(new KinesisSerializationSchema() {
        @Override
        public ByteBuffer serialize(Object element) {
            return null;
        }

        @Override
        public String getTargetStream(Object element) {
            return null;
        }
    }, testConfig);

    ClosureCleaner.ensureSerializable(producer);
}
```

And it fails. Thus, I think something in FlinkKinesisProducer might not be 
serializable and already breaks FlinkKinesisProducer.

@tzulitai @aljoscha  Any insights on this?


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more of them to make full use of KinesisProducer and to 
> make it fault tolerant (e.g., by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values. They make Flink write too fast to 
> Kinesis, which fails the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-08-13 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
hmmm... I was deploying our Flink job with this change. The Flink job 
failed to start, and the log reports:

```
The implementation of the RichSinkFunction is not serializable. The object 
probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
```

Here the implementation of the RichSinkFunction is FlinkKinesisProducer. The 
only field I added to FlinkKinesisProducer is a KinesisProducerConfiguration. 
So I made the KinesisProducerConfiguration field `transient` and reran the 
Flink job, but the job still fails.

Thus I suspected that FlinkKinesisProducer is already not serializable. To 
verify that, I created a test against the current FlinkKinesisProducer in 
master, which doesn't have my PR change. The unit test is:

```
@Test
public void testProducerSerializable() {
    Properties testConfig = new Properties();
    testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
    testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
    KinesisConfigUtil.validateAwsConfiguration(testConfig);

    FlinkKinesisProducer producer = new FlinkKinesisProducer(new KinesisSerializationSchema() {
        @Override
        public ByteBuffer serialize(Object element) {
            return null;
        }

        @Override
        public String getTargetStream(Object element) {
            return null;
        }
    }, testConfig);

    ClosureCleaner.ensureSerializable(producer);
}
```

And it fails. Thus, I think something in FlinkKinesisProducer might not be 
serializable and already breaks FlinkKinesisProducer.

@tzulitai @aljoscha  Any insights on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1234) Make Hadoop2 profile default

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125243#comment-16125243
 ] 

ASF GitHub Bot commented on FLINK-1234:
---

GitHub user zhuganghuaonnet opened a pull request:

https://github.com/apache/flink/pull/4535

Eventhubs-support read from and write to Azure eventhubs

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-1234] [component] Title of 
the pull request", where *FLINK-1234* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull 

[GitHub] flink pull request #4535: Eventhubs-support read from and write to Azure eve...

2017-08-13 Thread zhuganghuaonnet
GitHub user zhuganghuaonnet opened a pull request:

https://github.com/apache/flink/pull/4535

Eventhubs-support read from and write to Azure eventhubs

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-1234] [component] Title of 
the pull request", where *FLINK-1234* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhuganghuaonnet/flink eventhubs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4535.patch

To close 

[jira] [Updated] (FLINK-7439) Support variable arguments for UDTF in SQL

2017-08-13 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-7439:
---
Description: 
Currently, both UDF and UDAF support variable parameters, but UDTF does not. 

FLINK-5882 added variable arguments support for UDTF in the Table API only, but missed SQL.

  was:Currently, both UDF and UDAF support variable parameters, but UDTF not. 


> Support variable arguments for UDTF in SQL
> --
>
> Key: FLINK-7439
> URL: https://issues.apache.org/jira/browse/FLINK-7439
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF does not. 
> FLINK-5882 added variable arguments support for UDTF in the Table API only, but missed SQL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7439) Support variable arguments for UDTF in SQL

2017-08-13 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-7439:
---
Summary: Support variable arguments for UDTF in SQL  (was: Support variable 
arguments for UDTF)

> Support variable arguments for UDTF in SQL
> --
>
> Key: FLINK-7439
> URL: https://issues.apache.org/jira/browse/FLINK-7439
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF does not. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-13 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132867483
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+	private void discardComputationStatesAccordingToStrategy(Queue computationStates,
+			Collection matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		Set discardEvents = new HashSet<>();
+		switch (afterMatchSkipStrategy.getStrategy()) {
+			case SKIP_TO_LAST:
+				for (Map resultMap : matchedResult) {
+					for (Map.Entry keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_TO_FIRST:
+				for (Map resultMap : matchedResult) {
+					for (Map.Entry keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_PAST_LAST_EVENT:
+				for (Map resultMap : matchedResult) {
+					for (List eventList : resultMap.values()) {
+						discardEvents.addAll(eventList);
+					}
+				}
+				break;
+		}
+		if (!discardEvents.isEmpty()) {
+			List discardStates = new ArrayList<>();
+			for (ComputationState computationState : computationStates) {
+				Map partialMatch = extractCurrentMatches(computationState);
+				for (List list : partialMatch.values()) {
+					for (T e : list) {
+						if (discardEvents.contains(e)) {
+							// discard the computation state.
+							eventSharedBuffer.release(
+								NFAStateNameHandler.getOriginalNameFromInternal(
+									computationState.getState().getName()),
+								computationState.getEvent(),
+								computationState.getTimestamp(),
+								computationState.getCounter());
+							discardStates.add(computationState);
--- End diff --

Should add **break;** after **discardStates.add(computationState);**, right?
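
For clarity, here is the innermost loop from the diff above with the suggested 
`break;` applied (a sketch of the reviewer's suggestion, not the merged code):

```
for (T e : list) {
    if (discardEvents.contains(e)) {
        // discard the computation state
        eventSharedBuffer.release(
            NFAStateNameHandler.getOriginalNameFromInternal(
                computationState.getState().getName()),
            computationState.getEvent(),
            computationState.getTimestamp(),
            computationState.getCounter());
        discardStates.add(computationState);
        break; // suggested: stop scanning this partial match once the state is discarded
    }
}
```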


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125189#comment-16125189
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132867483
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+	private void discardComputationStatesAccordingToStrategy(Queue computationStates,
+			Collection matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		Set discardEvents = new HashSet<>();
+		switch (afterMatchSkipStrategy.getStrategy()) {
+			case SKIP_TO_LAST:
+				for (Map resultMap : matchedResult) {
+					for (Map.Entry keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_TO_FIRST:
+				for (Map resultMap : matchedResult) {
+					for (Map.Entry keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_PAST_LAST_EVENT:
+				for (Map resultMap : matchedResult) {
+					for (List eventList : resultMap.values()) {
+						discardEvents.addAll(eventList);
+					}
+				}
+				break;
+		}
+		if (!discardEvents.isEmpty()) {
+			List discardStates = new ArrayList<>();
+			for (ComputationState computationState : computationStates) {
+				Map partialMatch = extractCurrentMatches(computationState);
+				for (List list : partialMatch.values()) {
+					for (T e : list) {
+						if (discardEvents.contains(e)) {
+							// discard the computation state.
+							eventSharedBuffer.release(
+								NFAStateNameHandler.getOriginalNameFromInternal(
+									computationState.getState().getName()),
+								computationState.getEvent(),
+								computationState.getTimestamp(),
+								computationState.getCounter());
+							discardStates.add(computationState);
--- End diff --

Should add **break;** after **discardStates.add(computationState);**, right?


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support 

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-08-13 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125182#comment-16125182
 ] 

Xingcan Cui commented on FLINK-6233:


Hi [~fhueske], thanks for the previous work by you and yuhong; the 
implementation is going well. Nevertheless, I've got some minor 
questions/ideas.

# Considering that the core logic of the rowtime inner join and the proctime 
inner join is almost the same, can I extract an abstract {{TimeWindowInnerJoin}} 
class and let {{ProcTimeWindowInnerJoin}} and {{RowTimeWindowInnerJoin}} 
extend it?
# The clean-up of the cached data is triggered by ProcessingTimeTimers 
in {{ProcTimeWindowInnerJoin}}. For {{RowTimeWindowInnerJoin}}, I think 
this process could be triggered directly by the watermarks, without registering 
an EventTimeTimer, right?
# Since the collections provided by the state backend are simple, it may be 
inefficient to search for the expired records. I think the current 
"short-circuit" code (as shown below) cannot clean all the expired data.
{code:java}
while (keyIter.hasNext && !validTimestamp) {
  val recordTime = keyIter.next
  if (recordTime < expiredTime) {
    removeList.add(recordTime)
  } else {
    // we found a timestamp that is still valid
    validTimestamp = true
  }
}
{code}
To cope with that, I plan to split the "cache window" into contiguous 
static panes and expire one pane as a whole (see the sketch after this list). 
By doing so, we may store some extra records, whose time interval is at most 
the static span of a pane, but we can remove the expired data efficiently.
# I'd like to introduce an extra {{allowLateness}} parameter (which can be set 
in the {{StreamQueryConfig}}) to the join function. But for now, I'll give it a 
default value of {{0L}}.
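
A rough, hedged sketch of the pane idea from point 3 above, in plain Java and 
independent of the actual state-backend API; all names ({{PanedCache}}, 
{{paneSize}}) are hypothetical:
{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: cached records are grouped into fixed-size time panes so that expiration
// can drop whole panes instead of scanning individual timestamps.
final class PanedCache<T> {

    private final long paneSize; // static span of one pane, in ms
    private final TreeMap<Long, List<T>> panes = new TreeMap<>(); // pane start -> records

    PanedCache(long paneSize) {
        this.paneSize = paneSize;
    }

    void add(long timestamp, T record) {
        long paneStart = timestamp - (timestamp % paneSize);
        panes.computeIfAbsent(paneStart, k -> new ArrayList<>()).add(record);
    }

    // Drops every pane that ends at or before the expiration time (e.g. watermark - windowSize).
    // Up to one pane of extra records may be retained, as described in point 3 above.
    void expire(long expiredTime) {
        Iterator<Map.Entry<Long, List<T>>> it = panes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<T>> pane = it.next();
            if (pane.getKey() + paneSize <= expiredTime) {
                it.remove(); // remove the whole pane in one step
            } else {
                break; // panes are sorted; the remaining ones are still valid
            }
        }
    }
}
{code}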

> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute. The time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}; unbounded ranges 
> like {{o.rowtime  s.rowtime}} are not supported, and the condition should 
> include both streams' rowtime attributes. {{o.rowtime between rowtime () and 
> rowtime () + 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because 
> inserting a row into a sorted order would shift all other computations. 
> This would be too expensive to maintain. Therefore, we will throw an error if 
> a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125145#comment-16125145
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates:

- Create the fixed-size `LocalBufferPool` for floating buffers
- Assign the exclusive buffers to `InputChannel`s directly
- The proposed `BufferPoolListener` will be included in the next PR


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * (number of channels) + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for 
> {{SingleInputGate}}, and the exclusive buffers are assigned to 
> {{InputChannel}}s directly.
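
A tiny worked example of the sizing described above; the numbers and the 
class/method names are purely illustrative and not Flink internals:
{code:java}
// Illustrative arithmetic only; not Flink's actual buffer-management classes.
final class BufferBudget {

    // Today: one rebalancing LocalBufferPool bounded by a * numChannels + b.
    static int currentPoolLimit(int numChannels, int exclusivePerChannel, int floating) {
        return exclusivePerChannel * numChannels + floating;
    }

    // Proposal: a fixed-size (non-rebalancing) pool holding only the b floating buffers;
    // the a exclusive buffers per channel are handed to each InputChannel directly.
    static int proposedFloatingPoolSize(int floating) {
        return floating;
    }

    public static void main(String[] args) {
        // e.g. 4 channels, 2 exclusive buffers per channel, 8 floating buffers
        System.out.println(currentPoolLimit(4, 2, 8));   // 16
        System.out.println(proposedFloatingPoolSize(8)); // 8
    }
}
{code}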



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-13 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates:

- Create the fixed-size `LocalBufferPool` for floating buffers
- Assign the exclusive buffers to `InputChannel`s directly
- The proposed `BufferPoolListener` will be included in the next PR


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125142#comment-16125142
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132863952
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
 ---
@@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 
 /**
-  * Function that materializes a time attribute to the metadata timestamp. 
After materialization
-  * the result can be used in regular arithmetical calculations.
+  * Function that materializes a processing time attribute.
+  * After materialization the result can be used in regular arithmetical 
calculations.
   */
-object TimeMaterializationSqlFunction
+object ProctimeSqlFunction
--- End diff --

Should we move this object to `org.apache.flink.table.functions.sql` 
package? 


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132863952
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
 ---
@@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 
 /**
-  * Function that materializes a time attribute to the metadata timestamp. 
After materialization
-  * the result can be used in regular arithmetical calculations.
+  * Function that materializes a processing time attribute.
+  * After materialization the result can be used in regular arithmetical 
calculations.
   */
-object TimeMaterializationSqlFunction
+object ProctimeSqlFunction
--- End diff --

Should we move this object to `org.apache.flink.table.functions.sql` 
package? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7439) Support variable arguments for UDTF

2017-08-13 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-7439:
---
Component/s: Table API & SQL

> Support variable arguments for UDTF
> ---
>
> Key: FLINK-7439
> URL: https://issues.apache.org/jira/browse/FLINK-7439
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF does not. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7439) Support variable arguments for UDTF

2017-08-13 Thread Jark Wu (JIRA)
Jark Wu created FLINK-7439:
--

 Summary: Support variable arguments for UDTF
 Key: FLINK-7439
 URL: https://issues.apache.org/jira/browse/FLINK-7439
 Project: Flink
  Issue Type: Improvement
Reporter: Jark Wu
Assignee: Jark Wu


Currently, both UDF and UDAF support variable parameters, but UDTF does not. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125091#comment-16125091
 ] 

ASF GitHub Bot commented on FLINK-7358:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/4534

[FLINK-7358][table]Add implicitly converts support for User-defined function

In this PR I have added implicit conversion support for user-defined functions.
* Update the UDF documentation.
* Add implicit conversion for user-defined ScalarFunction and TableFunction.
* Add test cases for the changes.


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-1234] [component] Title of 
the pull request", where *FLINK-1234* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does 

[GitHub] flink pull request #4534: [FLINK-7358][table]Add implicitly converts support...

2017-08-13 Thread sunjincheng121
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/4534

[FLINK-7358][table]Add implicitly converts support for User-defined function

In this PR I have added implicit conversion support for user-defined functions.
* Update the UDF documentation.
* Add implicit conversion for user-defined ScalarFunction and TableFunction.
* Add test cases for the changes.


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-1234] [component] Title of 
the pull request", where *FLINK-1234* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? ( docs )



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink 

[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-13 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125083#comment-16125083
 ] 

sunjincheng commented on FLINK-7358:


[~fhueske] Yes, I agree with you. I have opened the PR in Calcite and had some 
communication and discussion with Julian. 
BTW, you are welcome to join the discussion. :-)

> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, if a user defines a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF `Func('a, 'b)`. So
> I want to add implicit conversion when we call a UDF. The implicit conversion 
> rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> In this JIRA, only for the Table API; SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]
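
To make the proposed rule concrete, here is a minimal, dependency-free Scala sketch 
of the widening chain (an illustration only, not the actual Table API implementation; 
all names are made up):

{code:scala}
object WideningSketch {
  // Proposed widening chain: BYTE -> SHORT -> INT -> LONG -> FLOAT -> DOUBLE
  val wideningOrder: Seq[Class[_]] =
    Seq(classOf[Byte], classOf[Short], classOf[Int], classOf[Long], classOf[Float], classOf[Double])

  // An argument of type `from` may be passed to a parameter of type `to`
  // if `from` appears at or before `to` in the chain.
  def implicitlyConvertible(from: Class[_], to: Class[_]): Boolean = {
    val (f, t) = (wideningOrder.indexOf(from), wideningOrder.indexOf(to))
    f >= 0 && t >= 0 && f <= t
  }

  def main(args: Array[String]): Unit = {
    // A schema field b: Int passed to eval(b: Long) would be accepted ...
    println(implicitlyConvertible(classOf[Int], classOf[Long]))  // true
    // ... while narrowing (Long -> Int) would still be rejected.
    println(implicitlyConvertible(classOf[Long], classOf[Int]))  // false
  }
}
{code}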



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-13 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125083#comment-16125083
 ] 

sunjincheng edited comment on FLINK-7358 at 8/13/17 11:30 PM:
--

[~fhueske] Yes, I agree with you. I have opened the JIRA in 
Calcite (https://issues.apache.org/jira/browse/CALCITE-1908) and have had some 
discussion with Julian about it. 
BTW, you are welcome to join the discussion. :-)


was (Author: sunjincheng121):
[~fhueske] Yes, I agree with you. I have opened the PR in 
Calcite (https://issues.apache.org/jira/browse/CALCITE-1908) and have had some 
discussion with Julian about it. 
BTW, you are welcome to join the discussion. :-)

> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently if user defined a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF as `Func('a, 'b)`. So
> I want to add implicit conversion when calling a UDF. The implicit conversion 
> rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> This JIRA covers only the Table API; SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-13 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125083#comment-16125083
 ] 

sunjincheng edited comment on FLINK-7358 at 8/13/17 11:30 PM:
--

[~fhueske] Yes, I agree with you. I have opened the PR in 
Calcite (https://issues.apache.org/jira/browse/CALCITE-1908) and have had some 
discussion with Julian about it. 
BTW, you are welcome to join the discussion. :-)


was (Author: sunjincheng121):
[~fhueske] Yes, I agree with you. I have opened the PR in Calcite and have had 
some discussion with Julian about it. 
BTW, you are welcome to join the discussion. :-)

> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently if user defined a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF as `Func('a, 'b)`. So
> I want to add implicit conversion when calling a UDF. The implicit conversion 
> rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> This JIRA covers only the Table API; SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7396) Don't put multiple directories in HADOOP_CONF_DIR in config.sh

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125069#comment-16125069
 ] 

ASF GitHub Bot commented on FLINK-7396:
---

Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/4511


> Don't put multiple directories in HADOOP_CONF_DIR in config.sh
> --
>
> Key: FLINK-7396
> URL: https://issues.apache.org/jira/browse/FLINK-7396
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Fang Yong
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In config.sh we do this:
> {code}
> # Check if deprecated HADOOP_HOME is set.
> if [ -n "$HADOOP_HOME" ]; then
> # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
> if [ -d "$HADOOP_HOME/conf" ]; then
> # its a Hadoop 1.x
> HADOOP_CONF_DIR="$HADOOP_CONF_DIR:$HADOOP_HOME/conf"
> fi
> if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
> # Its Hadoop 2.2+
> HADOOP_CONF_DIR="$HADOOP_CONF_DIR:$HADOOP_HOME/etc/hadoop"
> fi
> fi
> {code}
> while our {{HadoopFileSystem}} actually treats this value as a single 
> path, not as a colon-separated list of paths: 
> https://github.com/apache/flink/blob/854b05376a459a6197e41e141bb28a9befe481ad/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java#L236
> I also think that other tools don't assume multiple paths in there and at 
> least one user ran into the problem on their setup.
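
A minimal Scala sketch of the failure mode (the paths are hypothetical): consumers 
read HADOOP_CONF_DIR as one directory, so a colon-joined value never resolves to an 
existing location.

{code:scala}
import java.io.File

object HadoopConfDirSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical value produced by the current config.sh logic.
    val hadoopConfDir = "/opt/hadoop/conf:/opt/hadoop/etc/hadoop"

    // The variable is used as a single directory, not as a search path,
    // so configuration files are never found under the concatenated string.
    val coreSite = new File(hadoopConfDir, "core-site.xml")
    println(coreSite.exists()) // false, even if either directory alone contains the file
  }
}
{code}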



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7396) Don't put multiple directories in HADOOP_CONF_DIR in config.sh

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125068#comment-16125068
 ] 

ASF GitHub Bot commented on FLINK-7396:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4511
  
Thank you for merging it


> Don't put multiple directories in HADOOP_CONF_DIR in config.sh
> --
>
> Key: FLINK-7396
> URL: https://issues.apache.org/jira/browse/FLINK-7396
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Fang Yong
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In config.sh we do this:
> {code}
> # Check if deprecated HADOOP_HOME is set.
> if [ -n "$HADOOP_HOME" ]; then
> # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
> if [ -d "$HADOOP_HOME/conf" ]; then
> # its a Hadoop 1.x
> HADOOP_CONF_DIR="$HADOOP_CONF_DIR:$HADOOP_HOME/conf"
> fi
> if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
> # Its Hadoop 2.2+
> HADOOP_CONF_DIR="$HADOOP_CONF_DIR:$HADOOP_HOME/etc/hadoop"
> fi
> fi
> {code}
> while our {{HadoopFileSystem}} actually treats this value as a single 
> path, not as a colon-separated list of paths: 
> https://github.com/apache/flink/blob/854b05376a459a6197e41e141bb28a9befe481ad/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java#L236
> I also think that other tools don't assume multiple paths in there and at 
> least one user ran into the problem on their setup.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4511: [FLINK-7396] Don't put multiple directories in HAD...

2017-08-13 Thread zjureel
Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/4511


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4511: [FLINK-7396] Don't put multiple directories in HADOOP_CON...

2017-08-13 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4511
  
Thank you for merging it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125061#comment-16125061
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132854814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] 
timestamp.
+  */
+class OutputRowtimeProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
--- End diff --

In fact, the internal data type is changed precisely to keep the external data 
type unchanged. A time indicator is externally represented and treated as a 
`TIMESTAMP` field and only internally handled as a `LONG`. Therefore, we need to 
convert it back into a `TIMESTAMP` once the result is converted into a `DataStream`.

You are right that we need to convert all time indicators to `TIMESTAMP` and not 
only one. This is currently enforced by the exception that you observed: users 
have to cast all but one time indicator attribute to `TIMESTAMP`, which also 
converts them from `long` to `Timestamp`.
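
A minimal Scala sketch of that workaround (table and field names are hypothetical): 
keep one rowtime attribute as the time indicator and cast the others to `TIMESTAMP` 
before converting the `Table` to a `DataStream`.

{code:scala}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object SingleRowtimeConversion {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Assumes a table "Orders" registered with two rowtime attributes,
    // 'rowtimeA and 'rowtimeB (hypothetical names).
    val orders: Table = tEnv.scan("Orders")

    // Keep 'rowtimeA as the StreamRecord timestamp; casting 'rowtimeB to
    // TIMESTAMP turns it into a plain field, so the conversion is unambiguous.
    val result = orders
      .select('rowtimeA, 'rowtimeB.cast(Types.SQL_TIMESTAMP) as 'tsB, 'amount)

    val stream: DataStream[Row] = result.toAppendStream[Row]
    stream.print()
    env.execute()
  }
}
{code}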


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132854814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] 
timestamp.
+  */
+class OutputRowtimeProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
--- End diff --

In fact, the internal data type is changed precisely to keep the external data 
type unchanged. A time indicator is externally represented and treated as a 
`TIMESTAMP` field and only internally handled as a `LONG`. Therefore, we need to 
convert it back into a `TIMESTAMP` once the result is converted into a `DataStream`.

You are right that we need to convert all time indicators to `TIMESTAMP` and not 
only one. This is currently enforced by the exception that you observed: users 
have to cast all but one time indicator attribute to `TIMESTAMP`, which also 
converts them from `long` to `Timestamp`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125053#comment-16125053
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132854238
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I think we should avoid implicit defaults such as using the timestamp 
attribute of the left-most table (the left-most table might not have a time 
indicator attribute, and join order optimization could change the order of tables), 
as well as special cases for queries like `SELECT *`. 

When a `Table` is converted into a `DataStream` it is likely that the 
resulting stream is further processed by logic that cannot be expressed in SQL 
/ Table API. If a `Table` has multiple timestamp attributes, IMO a user should 
be forced to make a choice for the `StreamRecord` timestamp, because the 
semantics of any subsequent time-based operations will depend on that. I see 
two ways to do that:
- ensure that only one attribute is a time indicator by casting the others 
to `TIMESTAMP`
- let the user specify which field should be used as timestamp as an 
additional parameter of the `toAppendStream` and `toRetractStream` methods.

We could also do both.

I agree with @wuchong that we do not need this restriction when we emit a 
`Table` to a `TableSink`.


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132854238
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I think we should avoid implicit defaults such as using the timestamp 
attribute of the left-most table (the left-most table might not have a time 
indicator attribute, and join order optimization could change the order of tables), 
as well as special cases for queries like `SELECT *`. 

When a `Table` is converted into a `DataStream` it is likely that the 
resulting stream is further processed by logic that cannot be expressed in SQL 
/ Table API. If a `Table` has multiple timestamp attributes, IMO a user should 
be forced to make a choice for the `StreamRecord` timestamp, because the 
semantics of any subsequent time-based operations will depend on that. I see 
two ways to do that:
- ensure that only one attribute is a time indicator by casting the others 
to `TIMESTAMP`
- let the user specify which field should be used as timestamp as an 
additional parameter of the `toAppendStream` and `toRetractStream` methods.

We could also do both.

I agree with @wuchong that we do not need this restriction when we emit a 
`Table` to a `TableSink`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125010#comment-16125010
 ] 

ASF GitHub Bot commented on FLINK-7423:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4525
  
`MutableObjectIterator#next` allows/requires a `null` result: "@return The 
object or null if the iterator is exhausted.". The documentation 
could be improved, e.g. in `MergeIterator` the returned object is not 
necessarily the immediately passed argument.

`DataSourceTask` looks to be the only `InputFormat` consumer re-passing a 
reuse object.

It seems the reason to allow returning `null` is handling, for example, a 
bad record at the end of a file, such that `reachedEnd` would have returned 
`false` but there is no record to return.

Object reuse is always optional so is not an issue for immutable types.
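
A small, self-contained Scala sketch of that contract (the trait and names are 
hypothetical, not Flink's `InputFormat`): the caller keeps re-passing the same reuse 
instance and treats a `null` result as "no record for this call" rather than as end 
of input. Whether a consumer should continue or break on `null` is exactly the 
ambiguity discussed here; the loop below is only one interpretation.

{code:scala}
// Hypothetical, simplified reader contract used only for illustration.
trait SimpleFormat[T <: AnyRef] {
  def reachedEnd(): Boolean
  // May return the reuse object, a different object, or null (e.g. for a
  // bad trailing record), mirroring the contract discussed above.
  def nextRecord(reuse: T): T
}

object ReuseLoopSketch {
  // Always hand in the same reuse instance; skip null results and let
  // reachedEnd() decide when to stop.
  def consume[T <: AnyRef](format: SimpleFormat[T], reuse: T)(collect: T => Unit): Unit = {
    while (!format.reachedEnd()) {
      val record = format.nextRecord(reuse)
      if (record != null) {
        collect(record) // with object reuse, `record` may be `reuse` itself
      }
    }
  }
}
{code}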


> Always reuse an instance  to get elements from the inputFormat 
> ---
>
> Key: FLINK-7423
> URL: https://issues.apache.org/jira/browse/FLINK-7423
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> In InputFormatSourceFunction.java:
> {code:java}
> OUT nextElement = serializer.createInstance();
> while (isRunning) {
>     format.open(splitIterator.next());
>     // for each element we also check if cancel
>     // was called by checking the isRunning flag
>     while (isRunning && !format.reachedEnd()) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             ctx.collect(nextElement);
>         } else {
>             break;
>         }
>     }
>     format.close();
>     completedSplitsCounter.inc();
>     if (isRunning) {
>         isRunning = splitIterator.hasNext();
>     }
> }
> {code}
> The format may return a different element or null from nextRecord, which may 
> cause an exception.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...

2017-08-13 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4525
  
`MutableObjectIterator#next` allows/requires a `null` result: "@return The 
object or null if the iterator is exhausted.". The documentation 
could be improved, e.g. in `MergeIterator` the returned object is not 
necessarily the immediately passed argument.

`DataSourceTask` looks to be the only `InputFormat` consumer re-passing a 
reuse object.

It seems the reason to allow returning `null` is handling, for example, a 
bad record at the end of a file, such that `reachedEnd` would have returned 
`false` but there is no record to return.

Object reuse is always optional so is not an issue for immutable types.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-13 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-7378:

Description: 
Currently the number of network buffers in {{LocalBufferPool}} for 
{{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a is 
the number of exclusive buffers for each channel and b is the number of 
floating buffers shared by all channels.

Considering the credit-based flow control feature, we want to create a fixed-size 
buffer pool used to manage the floating buffers for {{SingleInputGate}}. The 
exclusive buffers are assigned to {{InputChannel}}s directly.

  was:
Currently the number of network buffers in {{LocalBufferPool}} for 
{{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a is 
the number of exclusive buffers for each channel and b is the number of 
floating buffers shared by all channels.

Considering the credit-based flow control feature, we want to implement a new 
fixed size buffer pool type used to manage the floating buffers for 
{{SingleInputGate}}.

Compared with {{LocalBufferPool}}, this is a non-rebalancing buffer pool which 
will not participate in redistributing the left available buffers in 
{{NetworkBufferPool}}.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fixed-size 
> buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> The exclusive buffers are assigned to {{InputChannel}}s directly.
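
A quick worked example of that bound, with purely hypothetical numbers:

{code:scala}
// a * <number of channels> + b, as described above (all numbers are made up).
val exclusiveBuffersPerChannel = 2   // a
val numberOfChannels = 100
val floatingBuffers = 8              // b
val maxBuffers = exclusiveBuffersPerChannel * numberOfChannels + floatingBuffers
println(maxBuffers)                  // 2 * 100 + 8 = 208
{code}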



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-13 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-7378:

Summary: Create a fix size (non rebalancing) buffer pool type for the 
floating buffers  (was: Implement the FixedBufferPool for floating buffers of 
SingleInputGate)

> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to implement a new 
> fixed size buffer pool type used to manage the floating buffers for 
> {{SingleInputGate}}.
> Compared with {{LocalBufferPool}}, this is a non-rebalancing buffer pool 
> which will not participate in redistributing the left available buffers in 
> {{NetworkBufferPool}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132843982
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link #setParallelism(int)}.
+*
+* @param configuration
+*  Pass a custom configuration into the cluster
+* @return A local execution environment with the specified parallelism.
+*/
+   public 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124952#comment-16124952
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132843982
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import 
org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import 
org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import 
org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * The PythonStreamExecutionEnvironment is the context in which a 
streaming program is executed.
+ * 
+ *
+ * The environment provides methods to control the job execution (such 
as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with 
the outside world
+ * (data access).
+ */
+@PublicEvolving
+public class PythonStreamExecutionEnvironment {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
+   private final StreamExecutionEnvironment env;
+
+   /**
+* A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
+* care for required Jython serializers registration.
+*
+* @return The python execution environment of the context in which the 
program is
+* executed.
+*/
+   public static PythonStreamExecutionEnvironment 
get_execution_environment() {
+   return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+   }
+
+   /**
+* Creates a {@link LocalStreamEnvironment}. The local execution 
environment
+* will run the program in a multi-threaded fashion in the same JVM as 
the
+* environment was created in. The default parallelism of the local
+* environment is the number of hardware contexts (CPU cores / threads),
+* unless it was specified differently by {@link 

[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124929#comment-16124929
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132842811
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not 
converted to array
--- End diff --

In my local environment, `toArray` also seems to be redundant.


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132842811
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not 
converted to array
--- End diff --

In my local environment, `toArray` also seems to be redundant.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124928#comment-16124928
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132842489
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

I agree that the function names here are a bit confusing. In essence, this 
function locates a single test file, while the function called on the next code 
line, `getFilesInFolder`, collects files that start with `test_`, so the main test 
file `run_all_tests.py` is filtered out and not included. 
To make this more readable and robust, I changed `getFilesInFolder` to take an 
additional `excludes` argument and to pass the main test file in it.


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132842489
  
--- Diff: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+   final private static String defaultPythonScriptName = 
"run_all_tests.py";
+   final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+   final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+   public PythonStreamBinderTest() {
+   }
+
+   public static void main(String[] args) throws Exception {
+   if (args.length == 0) {
+   args = prepareDefaultArgs();
+   } else {
+   args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+   }
+   PythonStreamBinder.main(args);
+   }
+
+   @Override
+   public void testProgram() throws Exception {
+   this.main(new String[]{});
+   }
+
+   private static String[] prepareDefaultArgs() throws Exception {
+   File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

I agree that the function names here are a bit confusing. In essence, this 
function locates a single test file, while the function called on the next code 
line, `getFilesInFolder`, collects files that start with `test_`, so the main test 
file `run_all_tests.py` is filtered out and not included. 
To make this more readable and robust, I changed `getFilesInFolder` to take an 
additional `excludes` argument and to pass the main test file in it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124914#comment-16124914
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132841381
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.4-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_${scala.binary.version}
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   
jar-with-dependencies
+   
+   
+   
+   
true
+   
org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   
flink-runtime_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.python
+   jython-standalone
+   2.7.0
+   
+   
+   org.apache.flink
+   flink-connector-kafka-0.9_2.10
--- End diff --

Done for the Scala version. 

As for not being able to run the tests:
1. I fixed the issue with the ```TypeError: object of type 
'java.lang.Class' has no len()```.
2. I still can't reproduce the main issue, concerning an import of java 
class from the Python module:
File: 
```flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py```
Line:19: ```from org.apache.flink.api.java.utils import ParameterTool```

The given class (ParameterTool) resides in a different project, `flink-java`, 
and the Jython module cannot find it. This probably has to do with the 
CLASSPATH. 

Any suggestion for how to reproduce it?


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-08-13 Thread zohar-mizrahi
Github user zohar-mizrahi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r132841381
  
--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.4-SNAPSHOT
+   ..
+   
+
+   flink-streaming-python_${scala.binary.version}
+   flink-streaming-python
+   jar
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   
+   
+   
jar-with-dependencies
+   
+   
+   
+   
true
+   
org.apache.flink.streaming.python.api.PythonStreamBinder
+   
+   
+   
+   
+   
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   flink-java
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.apache.flink
+   
flink-runtime_${scala.binary.version}
+   ${project.version}
+provided
+   
+   
+   org.python
+   jython-standalone
+   2.7.0
+   
+   
+   org.apache.flink
+   flink-connector-kafka-0.9_2.10
--- End diff --

Done for the Scala version. 

As for not being able to run the tests:
1. I fixed the issue with the ```TypeError: object of type 
'java.lang.Class' has no len()```.
2. I still can't reproduce the main issue, concerning an import of java 
class from the Python module:
File: 
```flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py```
Line:19: ```from org.apache.flink.api.java.utils import ParameterTool```

The given class (ParameterTool) resides in a different project, `flink-java`, 
and the Jython module cannot find it. This probably has to do with the 
CLASSPATH. 

Any suggestion for how to reproduce it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124908#comment-16124908
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132834552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
+s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+  s"the table that should be converted to a DataStream.\n" +
+  s"Please select the rowtime field that should be used as 
event-time timestamp for the " +
+  s"DataStream by casting all other fields to TIMESTAMP.")
+} else if (rowtimeFields.size == 1) {
+  val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType
+  val convFieldTypes = origRowType.getFieldTypes.map { t =>
+if (FlinkTypeFactory.isRowtimeIndicatorType(t)) {
--- End diff --

.


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124909#comment-16124909
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132839934
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala
 ---
@@ -16,32 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime
+package org.apache.flink.table.runtime.conversion
 
 import java.lang.{Boolean => JBool}
 
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
-  * Convert [[CRow]] to a [[JTuple2]]
+  * Convert [[CRow]] to a [[JTuple2]].
   */
-class CRowInputJavaTupleOutputMapRunner(
+class CRowToJavaTupleMapRunner(
 name: String,
 code: String,
 @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
   extends RichMapFunction[CRow, Any]
   with ResultTypeQueryable[JTuple2[JBool, Any]]
   with Compiler[MapFunction[Row, Any]] {
--- End diff --

indent


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124911#comment-16124911
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132841043
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I'm fine with both the current approach and using the time indicator of the left 
table as the default. However, I think no exception should be thrown when writing 
a Table to a TableSink; currently, both cases share the same exception code path.
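
As a side note, a minimal sketch (not from this PR) of the workaround that the
exception message above points to, assuming a hypothetical Table `joined` with
two rowtime attributes 'orderTime and 'shipTime and a regular field 'orderId:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    // keep 'orderTime as the single event-time attribute and materialize
    // 'shipTime as a plain SQL TIMESTAMP before converting to a DataStream
    val singleRowtime = joined.select(
      'orderTime,
      'shipTime.cast(Types.SQL_TIMESTAMP) as 'shipTs,
      'orderId)

    // the conversion no longer hits the "more than one rowtime field" check
    val result: DataStream[Row] = singleRowtime.toAppendStream[Row]

Whether that same check should also apply on the TableSink path is exactly the
question raised above.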


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124910#comment-16124910
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132840719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
--- End diff --

It builds successfully in my local environment when I remove the `toArray`. 


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132840719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
--- End diff --

It builds successfully in my local environment when I remove the `toArray`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132841043
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I'm fine with both the current approach and using the time indicator of the left 
table as the default. However, I think no exception should be thrown when writing 
a Table to a TableSink; currently, both cases share the same exception code path.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132834552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
+s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+  s"the table that should be converted to a DataStream.\n" +
+  s"Please select the rowtime field that should be used as 
event-time timestamp for the " +
+  s"DataStream by casting all other fields to TIMESTAMP.")
+} else if (rowtimeFields.size == 1) {
+  val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType
+  val convFieldTypes = origRowType.getFieldTypes.map { t =>
+if (FlinkTypeFactory.isRowtimeIndicatorType(t)) {
--- End diff --

.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132839934
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala
 ---
@@ -16,32 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime
+package org.apache.flink.table.runtime.conversion
 
 import java.lang.{Boolean => JBool}
 
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
-  * Convert [[CRow]] to a [[JTuple2]]
+  * Convert [[CRow]] to a [[JTuple2]].
   */
-class CRowInputJavaTupleOutputMapRunner(
+class CRowToJavaTupleMapRunner(
 name: String,
 code: String,
 @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
   extends RichMapFunction[CRow, Any]
   with ResultTypeQueryable[JTuple2[JBool, Any]]
   with Compiler[MapFunction[Row, Any]] {
--- End diff --

indent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-08-13 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-7435:
-

Assignee: Kostas Kloudas

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, using CEP with pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end); state stored with FsStateBackend 
> with the incremental option enabled. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-08-13 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124875#comment-16124875
 ] 

Kostas Kloudas commented on FLINK-7435:
---

The problem here seems to be in the {{copy()}} method of the {{NFA}}, which 
currently returns {{this}} rather than an actual copy.
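
For context, a minimal, generic sketch (not the eventual fix; the object and
method names are illustrative) of how a serializer can produce a real deep copy
through a serialize/deserialize round trip instead of returning the same instance:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

    object SerializerCopySketch {

      // deep-copies a record by writing it out and reading it back with the
      // given serializer, so the value handed out by CopyOnWriteStateTable
      // cannot alias the instance kept in state
      def deepCopy[T](serializer: TypeSerializer[T], record: T): T = {
        val buffer = new ByteArrayOutputStream()
        serializer.serialize(record, new DataOutputViewStreamWrapper(buffer))
        val input = new ByteArrayInputStream(buffer.toByteArray)
        serializer.deserialize(new DataInputViewStreamWrapper(input))
      }
    }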

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, using CEP with pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end); state stored with FsStateBackend 
> with the incremental option enabled. 
>Reporter: daiqing
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124831#comment-16124831
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132833668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+  */
+class OutputRowtimeProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
--- End diff --

It seems that this function only changes the data type of the rowtime field 
from Long to Timestamp. Shall we consider making the `rowtimeIdx` an array? 
Besides, as @wuchong suggested, I also think a query should keep the data type 
unchanged.
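
To make that concrete, a rough sketch (not the PR code; the class name is made
up, and the StreamRecord timestamping done by the actual operator is omitted) of
the Long-to-Timestamp conversion for a single rowtimeIdx:

    import org.apache.calcite.runtime.SqlFunctions
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.util.Collector

    class RowtimeConversionSketch[OUT](
        wrapped: MapFunction[CRow, OUT],
        rowtimeIdx: Int)
      extends ProcessFunction[CRow, OUT] {

      override def processElement(
          in: CRow,
          ctx: ProcessFunction[CRow, OUT]#Context,
          out: Collector[OUT]): Unit = {
        // replace the internal Long rowtime with a java.sql.Timestamp before
        // handing the row to the wrapped conversion mapper
        val rowtime = in.row.getField(rowtimeIdx).asInstanceOf[Long]
        in.row.setField(rowtimeIdx, SqlFunctions.internalToTimestamp(rowtime))
        out.collect(wrapped.map(in))
      }
    }

If `rowtimeIdx` were turned into an array as suggested, the same conversion
would simply loop over all indexes.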


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132833668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+  */
+class OutputRowtimeProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
--- End diff --

It seems that this function only changes the data type of the rowtime field 
from Long to Timestamp. Shall we consider making the `rowtimeIdx` an array? 
Besides, as @wuchong suggested, I also think a query should keep the data type 
unchanged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---