[GitHub] flink pull request #5677: [hotfix] update doc of InternalTimerService.regist...

2018-03-09 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5677

[hotfix] update doc of InternalTimerService.registerEventTimeTimer()

## What is the purpose of the change

update doc of InternalTimerService.registerEventTimeTimer()

## Brief change log

update doc of InternalTimerService.registerEventTimeTimer()

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5677.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5677


commit 410601674a532f01268acd37c9c043b39d9ae6b1
Author: Bowen Li 
Date:   2018-03-10T07:35:15Z

[hotfix] update doc of InternalTimerService.registerEventTimeTimer()




---


[jira] [Comment Edited] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394069#comment-16394069
 ] 

Hequn Cheng edited comment on FLINK-8690 at 3/10/18 6:53 AM:
-

Renaming the current {{FlinkLogicalAggregateConverter}} to 
{{FlinkLogicalAggregateDataSetConverter}} will bring no change for the DataSet side. 
Calcite can only find a plan generated by AggregateExpandDistinctAggregatesRule.


was (Author: hequn8128):
Rename current FlinkLogicalAggregateConverter to 
FlinkLogicalAggregateDataSetConverter will bring no change for dataset. Calcite 
can only find a plan generated by AggregateExpandDistinctAggregatesRule.

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394069#comment-16394069
 ] 

Hequn Cheng commented on FLINK-8690:


Renaming the current FlinkLogicalAggregateConverter to 
FlinkLogicalAggregateDataSetConverter will bring no change for the DataSet side. Calcite 
can only find a plan generated by AggregateExpandDistinctAggregatesRule.

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394069#comment-16394069
 ] 

Hequn Cheng edited comment on FLINK-8690 at 3/10/18 6:53 AM:
-

Renaming the current {{FlinkLogicalAggregateConverter}} to 
{{FlinkLogicalAggregateDataSetConverter}} will bring no change for the DataSet side. 
Calcite can only find a plan generated by 
{{AggregateExpandDistinctAggregatesRule}}.


was (Author: hequn8128):
Rename current {{FlinkLogicalAggregateConverter}} to 
{{FlinkLogicalAggregateDataSetConverter}} will bring no change for dataset. 
Calcite can only find a plan generated by AggregateExpandDistinctAggregatesRule.

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394068#comment-16394068
 ] 

Rong Rong commented on FLINK-8690:
--

You are right, I hadn't thought about that. I will verify the plan cost. 
However, I am less worried about the DataStream side, as the Calcite rule will 
generate 2 operations that require a state backend while using MapView will 
generate only 1. My worry is on the DataSet side: as per my initial 
implementation of splitting FlinkLogicalAggregate into 2, both of them ignore 
the plan generated by {{AggregateExpandDistinctAggregatesRule}}.

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394064#comment-16394064
 ] 

Hequn Cheng commented on FLINK-8690:


No need to worry about {{AggregateExpandDistinctAggregatesRule}}: the cost of 
the plan generated by it will be bigger than the cost of the plan generated by 
{{FlinkLogicalAggregateDataStreamConverter}}, and Calcite will choose the best 
plan (i.e., the plan with the smallest cost).

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394046#comment-16394046
 ] 

Rong Rong edited comment on FLINK-8690 at 3/10/18 5:54 AM:
---

That should resolve our problem partially. The real reason why I introduced 
another node before the logical plan is that 
**AggregateExpandDistinctAggregatesRule** is actually Calcite-specific and will 
apply globally regardless of whether it is on DataSet or DataStream, and that 
is the rule we want to avoid applying in the DataStream API.

Basically it converts 

{code:java}
COUNT (DISTINCT f1)
{code}

Into 

{code:java}
COUNT (DIST_f1)
FROM (SELECT f1 AS DIST_f1 GROUP BY f1)
{code}

There's always a way to unite these two operators together using 
FlinkLogicalAggregateConverter, but it seems counter-intuitive to first split 
them and then unite them again. 

Another possibility is to introduce the logical-plan RuleSet based on whether 
it's on stream or batch, but that seems to disagree with the purpose of a 
"logical" plan.
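
For illustration, on a hypothetical table {{clicks(userId, product)}} the 
rewrite performed by that rule looks roughly like this (schematic SQL, not 
taken from the issue):

{code:sql}
-- original query with a distinct aggregate
SELECT userId, COUNT(DISTINCT product) AS cnt
FROM clicks
GROUP BY userId;

-- roughly what AggregateExpandDistinctAggregatesRule expands it into:
-- an inner de-duplicating aggregate plus an outer non-distinct aggregate
SELECT userId, COUNT(product) AS cnt
FROM (SELECT userId, product FROM clicks GROUP BY userId, product)
GROUP BY userId;
{code}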


was (Author: walterddr):
That should resolve our problem partially. The real reason why I introduced 
another node before logical plan is because 
**AggregateExpandDistinctAggregatesRule** is actually calcite specific and will 
apply globally regardless of whether it is on DataSet or DataStream and that's 
the rule we want to avoid applying in DataStream API.

Basically it converts 

{code:java}
COUNT (DISTINCT f1)
{code}

Into 

{code:java}
COUNT (DIST_f1)
FROM (SELECT f1 AS DIST_f1 GROUP BY f1)
{code}

There's always a way to unite these two operators together using 
FlinkLogicalAggregateConverter. But it seems counter-intuitive first split then 
unite them together. 

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394046#comment-16394046
 ] 

Rong Rong edited comment on FLINK-8690 at 3/10/18 5:52 AM:
---

That should resolve our problem partially. The real reason why I introduced 
another node before the logical plan is that 
**AggregateExpandDistinctAggregatesRule** is actually Calcite-specific and will 
apply globally regardless of whether it is on DataSet or DataStream, and that 
is the rule we want to avoid applying in the DataStream API.

Basically it converts 

{code:java}
COUNT (DISTINCT f1)
{code}

Into 

{code:java}
COUNT (DIST_f1)
FROM (SELECT f1 AS DIST_f1 GROUP BY f1)
{code}

There's always a way to unite these two operators together using 
FlinkLogicalAggregateConverter, but it seems counter-intuitive to first split 
them and then unite them again. 


was (Author: walterddr):
That should resolve our problem partially. The real reason why I introduced 
another node before logical plan is because 
**AggregateExpandDistinctAggregatesRule** is actually calcite specific and will 
apply globally regardless of whether it is on DataSet or DataStream and that's 
the rule we want to avoid applying in DataStream API.

Basically it converts 

{code:java}
/COUNT (DISTINCT f1)
{code}

Into 

{code:java}
COUNT (DIST_A)
FROM (SELECT A AS DIST_A GROUP BY A)
{code}

There's always a way to unite these two operators together using 
FlinkLogicalAggregateConverter. But it seems counter-intuitive first split then 
unite them together. 

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394046#comment-16394046
 ] 

Rong Rong commented on FLINK-8690:
--

That should resolve our problem partially. The real reason why I introduced 
another node before the logical plan is that 
**AggregateExpandDistinctAggregatesRule** is actually Calcite-specific and will 
apply globally regardless of whether it is on DataSet or DataStream, and that 
is the rule we want to avoid applying in the DataStream API.

Basically it converts 

{code:java}
COUNT (DISTINCT f1)
{code}

Into 

{code:java}
COUNT (DIST_A)
FROM (SELECT A AS DIST_A GROUP BY A)
{code}

There's always a way to unite these two operators together using 
FlinkLogicalAggregateConverter, but it seems counter-intuitive to first split 
them and then unite them again. 

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-03-09 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394042#comment-16394042
 ] 

Rong Rong commented on FLINK-8863:
--

Sounds good. We can deal with that in the future. Thanks for the quick reply.

> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue.
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to:
> {code}
> functions:
>   - name: testFunction
> from: class   <-- optional, default: class
> class: org.my.MyScalarFunction
> constructor:  <-- optional, needed for 
> certain types of functions
>   - 42.0
>   - class: org.my.Class  <-- possibility to create 
> objects via properties
> constructor: 
>   - 1
>   - true
>   - false
>   - "whatever"
>   - type: INT
> value: 1
> {code}
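
For illustration, a class like the {{org.my.MyScalarFunction}} referenced above 
could look as follows. This is a hypothetical UDF, only meant to show how the 
{{constructor:}} value ({{42.0}}) and the {{eval()}} convention line up; it is 
not part of this issue:

{code:java}
package org.my;

import org.apache.flink.table.functions.ScalarFunction;

public class MyScalarFunction extends ScalarFunction {

    private final double factor;

    // corresponds to the single "42.0" entry under "constructor:" in the YAML above
    public MyScalarFunction(double factor) {
        this.factor = factor;
    }

    // eval() is the method the Table API resolves for scalar functions
    public double eval(double value) {
        return value * factor;
    }
}
{code}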



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394012#comment-16394012
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user Bekreth commented on the issue:

https://github.com/apache/flink/pull/5538
  
Previously, I was trying to leverage the current annotation parsers. 

It would be possible to pass a keyspace more directly into the 
AnnotationParser in the shaded Datastax component, but I'm against this as it 
requires maintaining a version of Datastax that differs from the open source 
version.

It could be possible to replace the entire annotation via reflection instead 
of just editing one property, but that still requires some futzing with 
reflection that I think would be best to avoid.

Another option could be to extend the necessary Datastax classes within the 
CassandraPojoSink to facilitate the alterations needed to pass in the keyspace 
dynamically.  I'm thinking this is the most favorable option.


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
>  This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names.  I propose adding a 
> `defaultKeyspace(String keyspace)` to the ClusterBuilder.  POJOs lacking a 
> definitive keyspace would attempt to be loaded to the provided default.
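
For illustration, usage of the proposed method could look roughly like the 
sketch below. {{defaultKeyspace(...)}} does not exist yet (it is what this 
ticket proposes to add to {{ClusterBuilder}}), and {{pojoStream}} stands for an 
assumed {{DataStream}} of annotated POJOs; the rest is the existing connector 
API:

{code:java}
import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// hypothetical call proposed by this ticket: POJOs whose annotations do not
// pin a keyspace would fall back to this one at runtime
clusterBuilder.defaultKeyspace("staging");

CassandraSink.addSink(pojoStream)
    .setClusterBuilder(clusterBuilder)
    .build();
{code}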



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5538: [FLINK-8655] [DataSink] Added default keyspace to Cassand...

2018-03-09 Thread Bekreth
Github user Bekreth commented on the issue:

https://github.com/apache/flink/pull/5538
  
Previously, I was trying to leverage the current annotation parsers. 

It would be possible to pass a keyspace more directly into the 
AnnotationParser in the shaded Datastax component, but I'm against this as it 
requires maintaining a version of Datastax that differs from the open source 
version.

It could be possible to replace the entire annotation via reflection instead 
of just editing one property, but that still requires some futzing with 
reflection that I think would be best to avoid.

Another option could be to extend the necessary Datastax classes within the 
CassandraPojoSink to facilitate the alterations needed to pass in the keyspace 
dynamically.  I'm thinking this is the most favorable option.


---


[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-09 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394002#comment-16394002
 ] 

Hequn Cheng commented on FLINK-8690:


Hi, we don't have to create a new logical RelNode. As I said in the design doc, 
we can rename the current FlinkLogicalAggregateConverter to 
FlinkLogicalAggregateDataSetConverter and add another 
FlinkLogicalAggregateDataStreamConverter which supports distinct aggregates. 
Both FlinkLogicalAggregateDataSetConverter and 
FlinkLogicalAggregateDataStreamConverter convert a LogicalAggregate to a 
FlinkLogicalAggregate. This may be neater. What do you think?
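
For illustration, the guard the two converters could differ in might be as 
small as the following hypothetical helper (it only relies on Calcite's 
{{Aggregate}}/{{AggregateCall}} API and is not taken from the Flink sources):

{code:java}
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;

public final class DistinctAggChecks {

    // The DataSet converter would keep rejecting aggregates for which this
    // returns true, so Calcite falls back to the plan produced by
    // AggregateExpandDistinctAggregatesRule; the DataStream converter would
    // accept them and rely on the MapView-based distinct aggregate codegen.
    public static boolean containsDistinctAgg(Aggregate agg) {
        return agg.getAggCallList().stream().anyMatch(AggregateCall::isDistinct);
    }
}
{code}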

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-03-09 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393990#comment-16393990
 ] 

Xingcan Cui commented on FLINK-8863:


Yes, I agree that there may be conflicts among different JARs specified with 
the {{-jar}} parameters. Maybe we can start with a coarse-grained policy for 
resolving libraries and make it fine-grained (e.g., specifying different JARs 
for different uses) in the future.

> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue.
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to:
> {code}
> functions:
>   - name: testFunction
> from: class   <-- optional, default: class
> class: org.my.MyScalarFunction
> constructor:  <-- optional, needed for 
> certain types of functions
>   - 42.0
>   - class: org.my.Class  <-- possibility to create 
> objects via properties
> constructor: 
>   - 1
>   - true
>   - false
>   - "whatever"
>   - type: INT
> value: 1
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393985#comment-16393985
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5662
  
Thanks for the explanation, @twalthr! I'll update the PR and resolve the 
conflicts caused.


> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -
>
> Key: FLINK-8854
> URL: https://issues.apache.org/jira/browse/FLINK-8854
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not 
> correct.
> It should not only include all fields of the table schema, but also all 
> fields of the format schema (mapped to themselves). Otherwise, it is not 
> possible to use a timestamp extractor on a field that is not in the table 
> schema. For example, this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
> schema:
>   - name: rideId
> type: LONG
>   - name: rowTime
> type: TIMESTAMP
> rowtime:
>   timestamps:
> type: "from-field"
> from: "rideTime"
>   watermarks:
> type: "periodic-bounded"
> delay: "6"
> connector:
>   
> format:
>   property-version: 1
>   type: json
>   schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5662: [FLINK-8854] [table] Fix schema mapping with time attribu...

2018-03-09 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5662
  
Thanks for the explanation, @twalthr! I'll update the PR and resolve the 
conflicts caused.


---


[jira] [Commented] (FLINK-8895) Job failed when one kafka broker shutdown

2018-03-09 Thread godfrey johnson (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393973#comment-16393973
 ] 

godfrey johnson commented on FLINK-8895:


[~StephanEwen] After the partition leader is changed, the consumer can get all 
the records by connecting to the other brokers. So, it is possible to avoid 
the Flink job failure by filtering out the failed broker, right?

> Job failed when one kafka broker shutdown
> -
>
> Key: FLINK-8895
> URL: https://issues.apache.org/jira/browse/FLINK-8895
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.1
>Reporter: godfrey johnson
>Priority: Major
>
> I used a FlinkKafkaConsumer08 to get records from Kafka, but the job failed 
> when a broker shut down.
>  
> I want to know whether it is possible to filter out the failed broker and get 
> the records from the other brokers, which would require modifying Flink's 
> source code.
>  
> And I get the following error:
> {code:java}
> // code placeholder
> java.net.SocketTimeoutException at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) at 
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) at 
> kafka.utils.Utils$.read(Utils.scala:380) at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>  at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) at 
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at 
> kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) at 
> kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) at 
> org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:220)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-03-09 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393966#comment-16393966
 ] 

Rong Rong commented on FLINK-8863:
--

Thanks [~xccui] for the quick reply. I am assuming users would have to provide 
the JAR files when launching the SQL client. 

One question I have is what happens if multiple versions of the same class, or 
multiple function signature lists, are found across several different JARs. 
This can clearly be avoided if we limit the search for functions to a single 
UDF JAR file.

Another question is related to our use case in FLINK-7373, where we have 
dynamic UDF / JAR file declarations in SQL itself, so I was wondering if there 
could be an optional JAR file field to specify which JAR file a function should 
be loaded from. But this use case could also be categorized as "Functions that 
are implemented in SQL" as per [~twalthr]'s description of the JIRA. What do 
you think?

> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue.
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to:
> {code}
> functions:
>   - name: testFunction
> from: class   <-- optional, default: class
> class: org.my.MyScalarFunction
> constructor:  <-- optional, needed for 
> certain types of functions
>   - 42.0
>   - class: org.my.Class  <-- possibility to create 
> objects via properties
> constructor: 
>   - 1
>   - true
>   - false
>   - "whatever"
>   - type: INT
> value: 1
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2018-03-09 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351986#comment-16351986
 ] 

Ted Yu edited comment on FLINK-5486 at 3/10/18 1:57 AM:


Can this get more review, please ?


was (Author: yuzhih...@gmail.com):
Can this get more review, please?

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
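
The change the description asks for amounts to moving the call inside the 
synchronized block, roughly (sketch, not the merged patch):

{code}
synchronized (bucketState.pendingFilesPerCheckpoint) {
  handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
  bucketState.pendingFilesPerCheckpoint.clear();
}
{code}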



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6924) ADD LOG(X) supported in TableAPI

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393960#comment-16393960
 ] 

ASF GitHub Bot commented on FLINK-6924:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5638#discussion_r173607312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }
 
+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+    Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+    new Log(antilogarithm)
--- End diff --

Never mind, I think it might be too weird to write something like 
`base.log(antilogarithm)` in the Table API.
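
For reference, with the {{log}} object added in this PR the call sites would 
look roughly like this (sketch with a made-up {{orders}} table and {{'amount}} 
column):

{code:scala}
import org.apache.flink.table.api.scala._

// one-argument form: LOG(x)
val t1 = orders.select(log('amount))

// two-argument form: LOG(base, x)
val t2 = orders.select(log(10.0, 'amount))
{code}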


> ADD LOG(X) supported in TableAPI
> 
>
> Key: FLINK-6924
> URL: https://issues.apache.org/jira/browse/FLINK-6924
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: zjuwangg
>Priority: Major
>  Labels: starter
>
> See FLINK-6891 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...

2018-03-09 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5638#discussion_r173607312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }
 
+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+    Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+    new Log(antilogarithm)
--- End diff --

Never mind, I think it might be too weird to write something like 
`base.log(antilogarithm)` in the Table API.


---


[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-09 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393956#comment-16393956
 ] 

yanxiaobin edited comment on FLINK-8794 at 3/10/18 1:53 AM:


Regarding point 1: what I described above is that this situation occurs even 
when there is no failure in the job.
 
I think I've found the problem.

I found through the logs that the filesystem's rename method was executed 
without any exception, but the filename hasn't changed, so I think it should 
be an S3 problem. This should not be a problem with Flink.


was (Author: backlight):
About : 1.What I described above is that there will be such a situation when 
there is no failure in this job.

I found through log that filesystem's rename method has been executed without 
any exception, but the filename hasn't changed, so I think it should be S3's 
problem. This should not be a problem with Flink.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-09 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393956#comment-16393956
 ] 

yanxiaobin commented on FLINK-8794:
---

Regarding point 1: what I described above is that this situation occurs even 
when there is no failure in the job.

I found through the logs that the filesystem's rename method was executed 
without any exception, but the filename hasn't changed, so I think it should 
be an S3 problem. This should not be a problem with Flink.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-03-09 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393957#comment-16393957
 ] 

Xingcan Cui commented on FLINK-8863:


Hi [~walterddr], commonly, all the required classes are supposed to be found in 
the provided JAR files, including the UDFs. Do you have some other ideas for 
that?

> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue.
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to:
> {code}
> functions:
>   - name: testFunction
> from: class   <-- optional, default: class
> class: org.my.MyScalarFunction
> constructor:  <-- optional, needed for 
> certain types of functions
>   - 42.0
>   - class: org.my.Class  <-- possibility to create 
> objects via properties
> constructor: 
>   - 1
>   - true
>   - false
>   - "whatever"
>   - type: INT
> value: 1
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-09 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8794:
--
Issue Type: Improvement  (was: Bug)

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6924) ADD LOG(X) supported in TableAPI

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393949#comment-16393949
 ] 

ASF GitHub Bot commented on FLINK-6924:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5638#discussion_r173606447
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }
 
+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+    Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+    new Log(antilogarithm)
--- End diff --

Isn't this just `new Ln(antilogarithm)`? We might be able to restructure them 
together.


> ADD LOG(X) supported in TableAPI
> 
>
> Key: FLINK-6924
> URL: https://issues.apache.org/jira/browse/FLINK-6924
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: zjuwangg
>Priority: Major
>  Labels: starter
>
> See FLINK-6891 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...

2018-03-09 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5638#discussion_r173606447
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }
 
+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+    Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+    new Log(antilogarithm)
--- End diff --

Isn't this just `new Ln(antilogarithm)`? We might be able to restructure them 
together.


---


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-03-09 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393942#comment-16393942
 ] 

Rong Rong commented on FLINK-8863:
--

Hi [~twalthr], in the task description there's no specification regarding 
where the UDF is loaded from. According to FLIP-24, it seems only MyToolBox.jar 
is described as containing UDFs. Are we always going to assume UDFs are 
contained in a pre-defined JAR file? 

> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue.
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to:
> {code}
> functions:
>   - name: testFunction
> from: class   <-- optional, default: class
> class: org.my.MyScalarFunction
> constructor:  <-- optional, needed for 
> certain types of functions
>   - 42.0
>   - class: org.my.Class  <-- possibility to create 
> objects via properties
> constructor: 
>   - 1
>   - true
>   - false
>   - "whatever"
>   - type: INT
> value: 1
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393756#comment-16393756
 ] 

ASF GitHub Bot commented on FLINK-6206:
---

Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/5399
  
Just to be clear, we would revert all the changes, except the ones for 
`flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java`, 
right?
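
For clarity, the part kept in `Task.java` would be along these lines (a sketch 
of the direction discussed, not the merged diff):

{code:java}
if (cause == null) {
    LOG.info("{} ({}) switched from {} to {}.",
        taskNameWithSubtask, executionId, currentState, newState);
} else {
    // failure transitions become visible at ERROR level
    LOG.error("{} ({}) switched from {} to {}.",
        taskNameWithSubtask, executionId, currentState, newState, cause);
}
{code}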


> Log task state transitions as warn/error for FAILURE scenarios
> --
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at a warn 
> or an error level.  currently its info
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState 
> newState, Throwable cause) {
>   if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>   if (cause == null) {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState);
>   } else {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState, cause);
>   }
>   return true;
>   } else {
>   return false;
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...

2018-03-09 Thread casidiablo
Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/5399
  
Just to be clear, we would revert all the changes, except the ones for 
`flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java`, 
right?


---


[jira] [Commented] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393725#comment-16393725
 ] 

ASF GitHub Bot commented on FLINK-:
---

Github user kailashhd commented on the issue:

https://github.com/apache/flink/pull/5663
  
Currently the Flink connector depends only on aws-sdk-kinesis, not on 
aws-java-sdk-bundle, and it does not depend on kinesisvideo either. So by 
default the kinesisvideo dependency is not included in the connector, which 
means we don't have to exclude any dependencies. I also verified that there are 
no unwanted netty dependencies by running mvn dependency:tree. The only 
instance of netty is `[INFO] |  +- 
org.apache.flink:flink-shaded-netty:jar:4.0.27.Final-2.0:provided`, in 
accordance with the value in the flink-parent pom.
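
For anyone repeating the check, a narrowed-down variant of that command could 
be (illustrative; run inside flink-connectors/flink-connector-kinesis):

{code}
mvn dependency:tree -Dincludes=io.netty,com.amazonaws
{code}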


> Upgrade AWS SDK in flink-connector-kinesis
> --
>
> Key: FLINK-
> URL: https://issues.apache.org/jira/browse/FLINK-
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Bump up the java aws sdk version to 1.11.272. Evaluate also the impact of 
> this version upgrade for KCL and KPL versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...

2018-03-09 Thread kailashhd
Github user kailashhd commented on the issue:

https://github.com/apache/flink/pull/5663
  
Currently the Flink connector depends only on aws-sdk-kinesis, not on 
aws-java-sdk-bundle, and it does not depend on kinesisvideo either. So by 
default the kinesisvideo dependency is not included in the connector, which 
means we don't have to exclude any dependencies. I also verified that there are 
no unwanted netty dependencies by running mvn dependency:tree. The only 
instance of netty is `[INFO] |  +- 
org.apache.flink:flink-shaded-netty:jar:4.0.27.Final-2.0:provided`, in 
accordance with the value in the flink-parent pom.


---


[jira] [Updated] (FLINK-8913) RocksDB state backend crashes in alpine image

2018-03-09 Thread Joshua Griffith (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshua Griffith updated FLINK-8913:
---
Summary: RocksDB state backend crashes in alpine image  (was: RocksDB state 
backend crashes in hadoop28-scala_2.11-alpine image)

> RocksDB state backend crashes in alpine image
> -
>
> Key: FLINK-8913
> URL: https://issues.apache.org/jira/browse/FLINK-8913
> Project: Flink
>  Issue Type: Bug
>  Components: Docker, State Backends, Checkpointing
>Affects Versions: 1.4.1
> Environment: {{~> minikube version}}
> {{minikube version: v0.25.0}}
> {{~> minikube config view}}
> {{- cpus: 4}}
> {{- kubernetes-version: v1.8.0}}
> {{- memory: 8192}}
> {{- vm-driver: hyperkit}}
> {{- WantReportError: true}}
>Reporter: Joshua Griffith
>Priority: Major
>
> Running the word count example jar with the RocksDB state backend enabled 
> (via job manager configuration) with the Flink hadoop28-scala_2.11-alpine 
> image crashes with the following error:
>  
> {{2018-03-09 21:09:12,928 INFO}}{{2018-03-09 21:09:12,892 INFO 
> org.apache.flink.runtime.taskmanager.Task - Source: Collection Source -> Flat 
> Map (1/1) (38365cd6326de6df92b72d1acbda0f72) switched from RUNNING to 
> FINISHED.}}
> {{2018-03-09 21:09:12,892 INFO org.apache.flink.runtime.taskmanager.Task - 
> Freeing task resources for Source: Collection Source -> Flat Map (1/1) 
> (38365cd6326de6df92b72d1acbda0f72).}}
> {{2018-03-09 21:09:12,894 INFO org.apache.flink.runtime.taskmanager.Task - 
> Ensuring all FileSystem streams are closed for task Source: Collection Source 
> -> Flat Map (1/1) (38365cd6326de6df92b72d1acbda0f72) [FINISHED]}}
> {{2018-03-09 21:09:12,897 INFO 
> org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and 
> sending final execution state FINISHED to JobManager for task Source: 
> Collection Source -> Flat Map (38365cd6326de6df92b72d1acbda0f72)}}
> {{2018-03-09 21:09:12,902 INFO 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Successfully 
> loaded RocksDB native 
> libraryorg.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 
> Initializing RocksDB keyed state backend from snapshot.}}
> {{#}}
> {{# A fatal error has been detected by the Java Runtime Environment:}}
> {{#}}
> {{# SIGSEGV (0xb) at pc=0x001432b6, pid=1, tid=0x7fc4036e1ae8}}
> {{#}}
> {{# JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 
> 1.8.0_151-b12)}}
> {{# Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 
> compressed oops)}}
> {{# Derivative: IcedTea 3.6.0}}
> {{# Distribution: Custom build (Tue Nov 21 11:22:36 GMT 2017)}}
> {{# Problematic frame:}}
> {{# C 0x001432b6}}
> {{#}}
> {{# Core dump written. Default location: /opt/flink/core or core.1}}
> {{#}}
> {{# An error report file with more information is saved as:}}
> {{# /opt/flink/hs_err_pid1.log}}
> {{#}}
> {{# If you would like to submit a bug report, please include}}
> {{# instructions on how to reproduce the bug and visit:}}
> {{# http://icedtea.classpath.org/bugzilla}}
>  
> Switching to the Debian image fixes this issue. I imagine it has to do with 
> alpine's alternative libc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8913) RocksDB state backend crashes in hadoop28-scala_2.11-alpine image

2018-03-09 Thread Joshua Griffith (JIRA)
Joshua Griffith created FLINK-8913:
--

 Summary: RocksDB state backend crashes in 
hadoop28-scala_2.11-alpine image
 Key: FLINK-8913
 URL: https://issues.apache.org/jira/browse/FLINK-8913
 Project: Flink
  Issue Type: Bug
  Components: Docker, State Backends, Checkpointing
Affects Versions: 1.4.1
 Environment: {{~> minikube version}}
{{minikube version: v0.25.0}}
{{~> minikube config view}}
{{- cpus: 4}}
{{- kubernetes-version: v1.8.0}}
{{- memory: 8192}}
{{- vm-driver: hyperkit}}
{{- WantReportError: true}}
Reporter: Joshua Griffith


Running the word count example jar with the RocksDB state backend enabled (via 
job manager configuration) with the Flink hadoop28-scala_2.11-alpine image 
crashes with the following error:

 

{{2018-03-09 21:09:12,928 INFO}}{{2018-03-09 21:09:12,892 INFO 
org.apache.flink.runtime.taskmanager.Task - Source: Collection Source -> Flat 
Map (1/1) (38365cd6326de6df92b72d1acbda0f72) switched from RUNNING to 
FINISHED.}}
{{2018-03-09 21:09:12,892 INFO org.apache.flink.runtime.taskmanager.Task - 
Freeing task resources for Source: Collection Source -> Flat Map (1/1) 
(38365cd6326de6df92b72d1acbda0f72).}}
{{2018-03-09 21:09:12,894 INFO org.apache.flink.runtime.taskmanager.Task - 
Ensuring all FileSystem streams are closed for task Source: Collection Source 
-> Flat Map (1/1) (38365cd6326de6df92b72d1acbda0f72) [FINISHED]}}
{{2018-03-09 21:09:12,897 INFO org.apache.flink.runtime.taskmanager.TaskManager 
- Un-registering task and sending final execution state FINISHED to JobManager 
for task Source: Collection Source -> Flat Map 
(38365cd6326de6df92b72d1acbda0f72)}}
{{2018-03-09 21:09:12,902 INFO 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Successfully 
loaded RocksDB native 
libraryorg.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 
Initializing RocksDB keyed state backend from snapshot.}}
{{#}}
{{# A fatal error has been detected by the Java Runtime Environment:}}
{{#}}
{{# SIGSEGV (0xb) at pc=0x001432b6, pid=1, tid=0x7fc4036e1ae8}}
{{#}}
{{# JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 
1.8.0_151-b12)}}
{{# Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 
compressed oops)}}
{{# Derivative: IcedTea 3.6.0}}
{{# Distribution: Custom build (Tue Nov 21 11:22:36 GMT 2017)}}
{{# Problematic frame:}}
{{# C 0x001432b6}}
{{#}}
{{# Core dump written. Default location: /opt/flink/core or core.1}}
{{#}}
{{# An error report file with more information is saved as:}}
{{# /opt/flink/hs_err_pid1.log}}
{{#}}
{{# If you would like to submit a bug report, please include}}
{{# instructions on how to reproduce the bug and visit:}}
{{# http://icedtea.classpath.org/bugzilla}}

 

Switching to the Debian image fixes this issue. I imagine it has to do with 
alpine's alternative libc.
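
As a workaround until the alpine crash is understood, a deployment can point at 
the glibc-based (Debian) image variant instead; the tag names below follow the 
flink Docker Hub naming convention and may need adjusting:

{code}
# crashes with the RocksDB backend (musl-based alpine image):
#   FROM flink:1.4.1-hadoop28-scala_2.11-alpine
# works (Debian-based image):
FROM flink:1.4.1-hadoop28-scala_2.11
{code}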



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393392#comment-16393392
 ] 

ASF GitHub Bot commented on FLINK-6206:
---

Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/5399
  
Sure, that makes sense!


> Log task state transitions as warn/error for FAILURE scenarios
> --
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at a warn 
> or an error level.  currently its info
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState 
> newState, Throwable cause) {
>   if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>   if (cause == null) {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState);
>   } else {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState, cause);
>   }
>   return true;
>   } else {
>   return false;
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...

2018-03-09 Thread casidiablo
Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/5399
  
Sure, that makes sense!


---


[jira] [Closed] (FLINK-8786) SpillableSubpartitionView#getNextBuffer always sets isMoreAvailable to false when switching from spillable to spilled

2018-03-09 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8786.
---

> SpillableSubpartitionView#getNextBuffer always sets isMoreAvailable to false 
> when switching from spillable to spilled
> -
>
> Key: FLINK-8786
> URL: https://issues.apache.org/jira/browse/FLINK-8786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> When processing the last in-memory buffer in 
> {{SpillableSubpartitionView#getNextBuffer}}, we always set the 
> {{isMoreAvailable}} flag of the returned {{BufferAndBacklog}} to {{false}} 
> irrespective of what may be in the spill writer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8786) SpillableSubpartitionView#getNextBuffer always sets isMoreAvailable to false when switching from spillable to spilled

2018-03-09 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8786.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in
  - 1.5.0 via 835adcc373ce169f202055e9b4f9dc3fb9123772 and 
d1a969f7ad018ef44f40f974eb49ba004494fcdf
  - 1.6.0 via 112c54fb07e2a3f33322ba99a9d59c1a8dbc and 
c19df9ff670c06aeb381339c244bbd22fe13fd4d

> SpillableSubpartitionView#getNextBuffer always sets isMoreAvailable to false 
> when switching from spillable to spilled
> -
>
> Key: FLINK-8786
> URL: https://issues.apache.org/jira/browse/FLINK-8786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> When processing the last in-memory buffer in 
> {{SpillableSubpartitionView#getNextBuffer}}, we always set the 
> {{isMoreAvailable}} flag of the returned {{BufferAndBacklog}} to {{false}} 
> irrespective of what may be in the spill writer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393374#comment-16393374
 ] 

ASF GitHub Bot commented on FLINK-8755:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5581


> SpilledSubpartitionView wrongly relys on the backlog for determining whether 
> more data is available
> ---
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, 
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog, 
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there 
> are only events left in the buffer queue, their buffers are not included in 
> the backlog counting and therefore, {{isMoreAvailable}} will be wrongly 
> {{false}} here.
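
As a clarifying sketch of the fix's direction (names are illustrative, not the actual patch): derive {{isMoreAvailable}} from what actually remains queued or spilled, events included, rather than from the backlog count alone.

{code:java}
// Sketch only: the backlog counts data buffers, not events, so "more available"
// must be derived from the total number of remaining entries instead.
final class BufferAndBacklogSketch {

    final Object buffer;            // stand-in for the network Buffer type
    final boolean isMoreAvailable;
    final int buffersInBacklog;
    final boolean nextBufferIsEvent;

    private BufferAndBacklogSketch(
            Object buffer, boolean isMoreAvailable, int buffersInBacklog, boolean nextBufferIsEvent) {
        this.buffer = buffer;
        this.isMoreAvailable = isMoreAvailable;
        this.buffersInBacklog = buffersInBacklog;
        this.nextBufferIsEvent = nextBufferIsEvent;
    }

    static BufferAndBacklogSketch fromRemaining(
            Object current, int newBacklog, long remainingEntries, boolean nextBufferIsEvent) {
        // remainingEntries counts all queued and spilled entries, data buffers and events alike.
        return new BufferAndBacklogSketch(current, remainingEntries > 0, newBacklog, nextBufferIsEvent);
    }
}
{code}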



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available

2018-03-09 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8755.
---

> SpilledSubpartitionView wrongly relys on the backlog for determining whether 
> more data is available
> ---
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, 
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog, 
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there 
> are only events left in the buffer queue, their buffers are not included in 
> the backlog counting and therefore, {{isMoreAvailable}} will be wrongly 
> {{false}} here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available

2018-03-09 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8755.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in
  - 1.5.0 via 5c7457aa2aece89b77e9a9402cc514f90e083a69 and 
d1a969f7ad018ef44f40f974eb49ba004494fcdf
  - 1.6.0 via 18b75e32bb8f4f155f729574b2d377459104471e and 
c19df9ff670c06aeb381339c244bbd22fe13fd4d

> SpilledSubpartitionView wrongly relys on the backlog for determining whether 
> more data is available
> ---
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, 
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog, 
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there 
> are only events left in the buffer queue, their buffers are not included in 
> the backlog counting and therefore, {{isMoreAvailable}} will be wrongly 
> {{false}} here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5581: [FLINK-8755][FLINK-8786][network] fix two bugs in ...

2018-03-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5581


---


[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393357#comment-16393357
 ] 

ASF GitHub Bot commented on FLINK-6206:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5399
  
Do you think it would be a good approach to promote that to ERROR logging 
on the TaskManager (who actually encountered the exception) and leave the 
JobManager at INFO (who only handles a regular failure/recovery cycle)?


> Log task state transitions as warn/error for FAILURE scenarios
> --
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at a warn 
> or an error level. Currently it's info.
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState 
> newState, Throwable cause) {
>   if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>   if (cause == null) {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState);
>   } else {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState, cause);
>   }
>   return true;
>   } else {
>   return false;
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5399
  
Do you think it would be a good approach to promote that to ERROR logging 
on the TaskManager (who actually encountered the exception) and leave the 
JobManager at INFO (who only handles a regular failure/recovery cycle)?


---


[jira] [Commented] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393327#comment-16393327
 ] 

ASF GitHub Bot commented on FLINK-8888:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5663
  
Is your test Flink job both reading from and writing to Kinesis, i.e. are 
both KCL and KPL tested?

If so, +1


> Upgrade AWS SDK in flink-connector-kinesis
> --
>
> Key: FLINK-8888
> URL: https://issues.apache.org/jira/browse/FLINK-8888
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Bump up the java aws sdk version to 1.11.272. Evaluate also the impact of 
> this version upgrade for KCL and KPL versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...

2018-03-09 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5663
  
Is your test Flink job both reading from and writing to Kinesis, i.e. are 
both KCL and KPL tested?

If so, +1


---


[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393314#comment-16393314
 ] 

ASF GitHub Bot commented on FLINK-8364:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5356
  
hmmm I think you are right, this actually might be a non-issue in the 
first place


> Add iterator() to ListState which returns empty iterator when it has no value
> -
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Add iterator() to ListState which returns empty iterator when it has no value



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5356: [FLINK-8364][state backend] Add iterator() to ListState w...

2018-03-09 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5356
  
hmmm I think you are right, this actually might be a non-issue in the 
first place


---


[jira] [Commented] (FLINK-8515) update RocksDBMapState to replace deprecated remove() with delete()

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393312#comment-16393312
 ] 

ASF GitHub Bot commented on FLINK-8515:
---

Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/5365


> update RocksDBMapState to replace deprecated remove() with delete()
> ---
>
> Key: FLINK-8515
> URL: https://issues.apache.org/jira/browse/FLINK-8515
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> Currently in RocksDBMapState:
> {code:java}
> @Override
>   public void remove(UK userKey) throws IOException, RocksDBException {
>   byte[] rawKeyBytes = 
> serializeUserKeyWithCurrentKeyAndNamespace(userKey);
>   backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
>   }
> {code}
> remove() is actually deprecated. Should be replaced with delete()
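
For reference, a minimal standalone sketch of the replacement call, assuming the {{delete(ColumnFamilyHandle, WriteOptions, byte[])}} overload of the RocksDB Java API; the surrounding state-backend plumbing is omitted.

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

final class RocksDbDeleteSketch {

    // delete() supersedes the deprecated remove() in the RocksDB Java API.
    static void removeKey(
            RocksDB db,
            ColumnFamilyHandle columnFamily,
            WriteOptions writeOptions,
            byte[] rawKeyBytes) throws RocksDBException {
        db.delete(columnFamily, writeOptions, rawKeyBytes);
    }
}
{code}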



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5365: [FLINK-8515] update RocksDBMapState to replace dep...

2018-03-09 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/5365


---


[jira] [Assigned] (FLINK-8897) Rowtime materialization causes "mismatched type" AssertionError

2018-03-09 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-8897:
---

Assignee: Timo Walther

> Rowtime materialization causes "mismatched type" AssertionError
> ---
>
> Key: FLINK-8897
> URL: https://issues.apache.org/jira/browse/FLINK-8897
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> As raised in [this 
> thread|https://lists.apache.org/thread.html/e2ea38aa7ae224d7481145334955d84243690e9aad10d58310bdb8e7@%3Cuser.flink.apache.org%3E],
>  the query created by the following code will throw a calcite "mismatch type" 
> ({{Timestamp(3)}} and {{TimeIndicator}}) exception.
> {code:java}
> String sql1 = "select id, eventTs as t1, count(*) over (partition by id order 
> by eventTs rows between 100 preceding and current row) as cnt1 from myTable1";
> String sql2 = "select distinct id as r_id, eventTs as t2, count(*) over 
> (partition by id order by eventTs rows between 50 preceding and current row) 
> as cnt2 from myTable2";
> Table left = tableEnv.sqlQuery(sql1);
> Table right = tableEnv.sqlQuery(sql2);
> left.join(right).where("id === r_id && t1 === t2").select("id, 
> t1").writeToSink(...)
> {code}
> The logical plan is as follows.
> {code}
> LogicalProject(id=[$0], t1=[$1])
>   LogicalFilter(condition=[AND(=($0, $3), =($1, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalAggregate(group=[{0, 1, 2}])
> LogicalWindow(window#0=[window(partition {0} order by [1] rows 
> between $2 PRECEDING and CURRENT ROW aggs [COUNT()])])
>   LogicalProject(id=[$0], eventTs=[$3])
> LogicalTableScan(table=[[_DataStreamTable_0]])
>   LogicalAggregate(group=[{0, 1, 2}])
> LogicalWindow(window#0=[window(partition {0} order by [1] rows 
> between $2 PRECEDING and CURRENT ROW aggs [COUNT()])])
>   LogicalProject(id=[$0], eventTs=[$3])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> {code}
> That is because the rowtime field after an aggregation will be 
> materialized while the {{RexInputRef}} type for the filter's operands ({{t1 
> === t2}}) is still {{TimeIndicator}}. We should unify them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8903) Error calculation based on rolling window in table API and SQL API

2018-03-09 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-8903.

Resolution: Not A Problem

Hi [~lilizhao], processing time is inherently non-deterministic, i.e., 
different executions produce different results. You have to use event time if 
you want more accurate results.

Closing this issue as "Not a Problem"

> Error calculation based on rolling window in table API and SQL API
> --
>
> Key: FLINK-8903
> URL: https://issues.apache.org/jira/browse/FLINK-8903
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.1
>Reporter: lilizhao
>Priority: Critical
> Fix For: 1.5.0
>
> Attachments: TableAndSQLTest.java
>
>
> Error calculation based on rolling window in table API and SQL API
> The variance of the calculation is equal to the average.
> 1 The test code is detailed in the appendix 
> 2 The test data are as follows
> 1 li
> 100 li
> 3 The Table API test result as follows
> (true,50.5,101.0,50.5,li,2018-03-09 09:11:00.0)
> 4 The SQL API test result as follows
> (true,50.5,2,101.0,50.5,li,2018-03-09 09:21:00.0)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393157#comment-16393157
 ] 

ASF GitHub Bot commented on FLINK-6206:
---

Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/5399
  
The reason I proposed the change is that any unexpected behavior, even when 
you can recover from it, can be indicative of a potential bug/misconfiguration. 
INFO logs are by far noisier than errors, which means it is tough to identify 
problems unless you log them with ERROR (or at least WARN).

I'd say we at least demote this to WARN. When one is analyzing logs, trying 
to find leads to current problems, it's common to filter out all INFO logs 
(otherwise it's mayhem); so anything that caused an exception to be thrown 
should be highlighted.


> Log task state transitions as warn/error for FAILURE scenarios
> --
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at a warn 
> or an error level. Currently it's info.
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState 
> newState, Throwable cause) {
>   if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>   if (cause == null) {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState);
>   } else {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState, cause);
>   }
>   return true;
>   } else {
>   return false;
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...

2018-03-09 Thread casidiablo
Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/5399
  
The reason I proposed the change is that any unexpected behavior, even when 
you can recover from it, can be indicative of a potential bug/misconfiguration. 
INFO logs are by far noisier than errors, which means it is tough to identify 
problems unless you log them with ERROR (or at least WARN).

I'd say we at least demote this to WARN. When one is analyzing logs, trying 
to find leads to current problems, it's common to filter out all INFO logs 
(otherwise it's mayhem); so anything that caused an exception to be thrown 
should be highlighted.


---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393140#comment-16393140
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5676
  
CC @aljoscha 


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393139#comment-16393139
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/5676

[FLINK-8910][tests] Automated end-to-end test for local recovery and sticky 
scheduling

This PR adds an automated end-to-end test for the local recovery feature, 
which also includes sticky allocation. We expose the allocation id through the 
(non-public) `StreamingRuntimeContext`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink automated-test-finish

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5676.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5676


commit 7de3d9d8ddaa84684e8c757285201387ad556ef2
Author: Stefan Richter 
Date:   2018-03-08T18:20:32Z

[FLINK-8910][tests] Expose allocation id through runtime ctx

commit d8e740484a340f20277b9ed1e2d22b9d96897937
Author: Stefan Richter 
Date:   2018-03-06T09:35:44Z

[FLINK-8910][tests] Automated test for local recovery (including sticky 
allocation)




> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5676: [FLINK-8910][tests] Automated end-to-end test for local r...

2018-03-09 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5676
  
CC @aljoscha 


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-09 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/5676

[FLINK-8910][tests] Automated end-to-end test for local recovery and sticky 
scheduling

This PR adds an automated end-to-end test for the local recovery feature, 
which also includes sticky allocation. We expose the allocation id through the 
(non-public) `StreamingRuntimeContext`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink automated-test-finish

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5676.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5676


commit 7de3d9d8ddaa84684e8c757285201387ad556ef2
Author: Stefan Richter 
Date:   2018-03-08T18:20:32Z

[FLINK-8910][tests] Expose allocation id through runtime ctx

commit d8e740484a340f20277b9ed1e2d22b9d96897937
Author: Stefan Richter 
Date:   2018-03-06T09:35:44Z

[FLINK-8910][tests] Automated test for local recovery (including sticky 
allocation)




---


[jira] [Updated] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8740:

Priority: Critical  (was: Blocker)

> Job-level metrics lost during job re-submission in HA mode
> --
>
> Key: FLINK-8740
> URL: https://issues.apache.org/jira/browse/FLINK-8740
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Joshua DeWald
>Priority: Critical
> Fix For: 1.5.0
>
>
> When Flink is running in high-availability mode and a leader re-election 
> results in the same job manager, the job is unable to register the job-level 
> metrics due to a name collision. 
> This may occur even if a different Job Manager is elected, but as it is a 
> local JobManagerMetricsGroup which emits the error, that is unlikely to be 
> the case.
>  
> *Expected Behavior*
> When a job is forced to re-submit due to Job Manager re-election, job-level 
> metrics should be available in the new instance of the job (uptime, 
> checkpoints size, checkpoint duration, etc)
> *Actual Behavior*
> When the job gets re-submitted, it is unable to register job-level metrics due 
> to a collision in the JobManagerMetricGroup, which leads to a situation where, 
> even though the job is running, the metrics around checkpoints and uptime are 
> not available
> *Steps to reproduce*
>  # Start up Flink in HA mode using ZooKeeper, single node is fine
>  # Submit a job to the cluster
>  # Stop and restart ZooKeeper
>  # In Job Manager logs you will see the following errors:
>  # 
> {noformat}
> 79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'totalNumberOfCheckpoints'. Metric will not be reported
> 79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfInProgressCheckpoints'. Metric will not be reported
> 79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfCompletedCheckpoints'. Metric will not be reported{noformat}
> *Proposed Solution*
> I suspect that there may be other related issues than just the metrics, but a 
> code change that seems to fix the issue is, during recovery, to remove 
> the existing registered Job Metrics:
> {code:java}
> if (isRecovery) {
>log.info(s"Removing metrics for $jobId, new will be added during recover")
>jobManagerMetricGroup.removeJob(jobId)
> }{code}
> I'd be happy to submit this in a PR if that is acceptable to open up the 
> discussion, but I am not sure the consequences of not closing the previous 
> JMMG or perhaps simply not re-registering job-level metrics during recovery. 
> Doing this would seem to entail informing lower levels about the recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8740:

Priority: Blocker  (was: Critical)

> Job-level metrics lost during job re-submission in HA mode
> --
>
> Key: FLINK-8740
> URL: https://issues.apache.org/jira/browse/FLINK-8740
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Joshua DeWald
>Priority: Blocker
> Fix For: 1.5.0
>
>
> When Flink is running in high-availability mode and a leader re-election 
> results in the same job manager, the job is unable to register the job-level 
> metrics due to a name collision. 
> This may occur even if a different Job Manager is elected, but as it is a 
> local JobManagerMetricsGroup which emits the error, that is unlikely to be 
> the case.
>  
> *Expected Behavior*
> When a job is forced to re-submit due to Job Manager re-election, job-level 
> metrics should be available in the new instance of the job (uptime, 
> checkpoints size, checkpoint duration, etc)
> *Actual Behavior*
> When the job gets re-submitted, it is unable to register job-level metrics due 
> to a collision in the JobManagerMetricGroup, which leads to a situation where, 
> even though the job is running, the metrics around checkpoints and uptime are 
> not available
> *Steps to reproduce*
>  # Start up Flink in HA mode using ZooKeeper, single node is fine
>  # Submit a job to the cluster
>  # Stop and restart ZooKeeper
>  # In Job Manager logs you will see the following errors:
>  # 
> {noformat}
> 79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'totalNumberOfCheckpoints'. Metric will not be reported
> 79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfInProgressCheckpoints'. Metric will not be reported
> 79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfCompletedCheckpoints'. Metric will not be reported{noformat}
> *Proposed Solution*
> I suspect that there may be other related issues than just the metrics, but a 
> code change that seems to fix the issue is, during recovery, to remove 
> the existing registered Job Metrics:
> {code:java}
> if (isRecovery) {
>log.info(s"Removing metrics for $jobId, new will be added during recover")
>jobManagerMetricGroup.removeJob(jobId)
> }{code}
> I'd be happy to submit this in a PR if that is acceptable to open up the 
> discussion, but I am not sure the consequences of not closing the previous 
> JMMG or perhaps simply not re-registering job-level metrics during recovery. 
> Doing this would seem to entail informing lower levels about the recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393051#comment-16393051
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5371
  
Two more questions about this:

  1. There is code in the shell scripts that rotates the log files each time 
you start / stop the cluster, with .0/.1/.2/etc. suffixes on the log files. 
Have you tested whether that still works with the changed setup?

  2. I think rolling by time is something that users expect more commonly 
than rolling by size. What do you think here?



> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5371: [FLINK-8357] [conf] Enable rolling in default log setting...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5371
  
Two more questions about this:

  1. There is code in the shell scripts that rotates the log files each time 
you start / stop the cluster, with .0/.1/.2/etc. suffixes on the log files. 
Have you tested whether that still works with the changed setup?

  2. I think rolling by time is something that users expect more commonly 
than rolling by size. What do you think here?



---


[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393028#comment-16393028
 ] 

ASF GitHub Bot commented on FLINK-8755:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5581
  
Thanks for the patch.

Looks good, well tested and reviewed (thanks, @pnowojski).

Merging this...


> SpilledSubpartitionView wrongly relys on the backlog for determining whether 
> more data is available
> ---
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, 
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog, 
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there 
> are only events left in the buffer queue, their buffers are not included in 
> the backlog counting and therefore, {{isMoreAvailable}} will be wrongly 
> {{false}} here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5581: [FLINK-8755][FLINK-8786][network] fix two bugs in spilled...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5581
  
Thanks for the patch.

Looks good, well tested and reviewed (thanks, @pnowojski).

Merging this...


---


[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393002#comment-16393002
 ] 

ASF GitHub Bot commented on FLINK-8599:
---

Github user ChengzhiZhao commented on the issue:

https://github.com/apache/flink/pull/5521
  
@StephanEwen @kl0u Thanks for your feedback, I will add an option for the user 
to choose


> Improve the failure behavior of the FileInputFormat for bad files
> -
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Chengzhi Zhao
>Priority: Major
>
> So we have an S3 path that Flink is monitoring to see when new files become 
> available.
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, 
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1)  {code}
>  
> I am doing both internal and external checkpointing. Let's say a bad file 
> (for example, one with a different schema) is dropped into this folder and 
> Flink does several retries. I want to set those bad files aside 
> and let the process continue. However, since the file path persists in the 
> checkpoint, when I try to resume from the external checkpoint, it throws the 
> following error because the file can no longer be found.
>  
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No 
> such file or directory: s3a://myfile{code}
>  
> As [~fhue...@gmail.com] suggested, we could check whether a path exists before 
> trying to read a file, and ignore the input split instead of throwing an 
> exception and causing a failure.
>  
> Also, I am thinking about adding an error output for bad files as an option for 
> users. If any bad files exist, we could move them to a separate 
> path and do further analysis. 
>  
> Not sure how people feel about it, but I'd like to contribute to it if people 
> think this can be an improvement. 
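
A rough sketch of the "check before reading" idea, using Flink's FileSystem API; the helper below is illustrative only, and the real change would live in the input format or reader operator.

{code:java}
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

final class SkipMissingSplitsSketch {

    /**
     * Returns true if the split's file still exists and should be read,
     * false if it has disappeared (e.g. was moved away) and the split should be ignored.
     */
    static boolean splitIsReadable(Path filePath) {
        try {
            FileSystem fs = filePath.getFileSystem();
            return fs.exists(filePath);
        } catch (IOException e) {
            // Treating an unreachable file system like a missing file keeps the sketch simple;
            // a real implementation would likely distinguish the two cases.
            return false;
        }
    }
}
{code}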



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5521: [FLINK-8599] Improve the failure behavior of the FileInpu...

2018-03-09 Thread ChengzhiZhao
Github user ChengzhiZhao commented on the issue:

https://github.com/apache/flink/pull/5521
  
@StephanEwen @kl0u Thanks for your feedback, I will add an option for the user 
to choose


---


[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392999#comment-16392999
 ] 

ASF GitHub Bot commented on FLINK-6206:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5399
  
I am unsure about this change, so let's discuss the pros and cons a bit.

This purposefully logs on *INFO*, because from the 
JobManager's perspective, a task failing and recovering is not an erroneous 
situation. It conveys the assumption that failures and recoveries are perfectly 
expected as part of the Job life cycle.

The assumption is: something that is logged on "ERROR" is something a 
user may want to dig into to see whether they should do something about it.

Flink does not follow this perspective perfectly in all parts, but that was 
the reasoning behind using *info* for these state changes.

Happy to hear other thoughts on this.


> Log task state transitions as warn/error for FAILURE scenarios
> --
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at a warn 
> or an error level. Currently it's info.
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState 
> newState, Throwable cause) {
>   if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>   if (cause == null) {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState);
>   } else {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState, cause);
>   }
>   return true;
>   } else {
>   return false;
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5399
  
I am unsure about this change, so let's discuss the pros and cons a bit.

This purposefully logs on *INFO*, because from the 
JobManager's perspective, a task failing and recovering is not an erroneous 
situation. It conveys the assumption that failures and recoveries are perfectly 
expected as part of the Job life cycle.

The assumption is: something that is logged on "ERROR" is something a 
user may want to dig into to see whether they should do something about it.

Flink does not follow this perspective perfectly in all parts, but that was 
the reasoning behind using *info* for these state changes.

Happy to hear other thoughts on this.


---


[jira] [Commented] (FLINK-8515) update RocksDBMapState to replace deprecated remove() with delete()

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392978#comment-16392978
 ] 

ASF GitHub Bot commented on FLINK-8515:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5365
  
This change has already gotten in via another commit.

@bowenli86 can you close this PR?
Thank you for the contribution!


> update RocksDBMapState to replace deprecated remove() with delete()
> ---
>
> Key: FLINK-8515
> URL: https://issues.apache.org/jira/browse/FLINK-8515
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> Currently in RocksDBMapState:
> {code:java}
> @Override
>   public void remove(UK userKey) throws IOException, RocksDBException {
>   byte[] rawKeyBytes = 
> serializeUserKeyWithCurrentKeyAndNamespace(userKey);
>   backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
>   }
> {code}
> remove() is actually deprecated. Should be replaced with delete()



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5365: [FLINK-8515] update RocksDBMapState to replace deprecated...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5365
  
This change has already gotten in via another commit.

@bowenli86 can you close this PR?
Thank you for the contribution!


---


[jira] [Closed] (FLINK-8898) Materialize time indicators in conditions of LogicalFilter

2018-03-09 Thread Hequn Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-8898.
--
Resolution: Duplicate

> Materialize time indicators in conditions of LogicalFilter
> --
>
> Key: FLINK-8898
> URL: https://issues.apache.org/jira/browse/FLINK-8898
> Project: Flink
>  Issue Type: Bug
>Reporter: Hequn Cheng
>Priority: Major
>
> Currently, {{RelTimeIndicatorConverter}} does not materialize time indicators 
> in the conditions of a LogicalFilter, which leads to type mismatch exceptions. 
> We can reproduce the exception with the following test case.
> {code:java}
> @Test
> def reproduceTypeMissmatch(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> env.setStateBackend(getStateBackend)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> StreamITCase.clear
> val data1 = new mutable.MutableList[(Int, Long, Int, Long)]
> data1.+=((1, 1L, 1, 1L))
> data1.+=((1, 2L, 1, 1L))
> val t1 = env.fromCollection(data1)
> .assignTimestampsAndWatermarks(new Row5WatermarkExtractor)
> .toTable(tEnv, 'id, 'ip, 'type, 'eventTs.rowtime)
> tEnv.registerTable("myTable", t1)
> val sql1 = "select distinct id, eventTs as eventTs, count(*) over (partition 
> by id order by eventTs rows" +
> " between 100 preceding and current row) as cnt1 from myTable"
> val sql2 = "select distinct id as r_id, eventTs as r_eventTs, count(*) over 
> (partition by id " +
> "order by eventTs rows between 50 preceding and current row) as cnt2 from 
> myTable"
> val left = tEnv.sqlQuery(sql1)
> val right = tEnv.sqlQuery(sql2)
> left.join(right).where("id = r_id && eventTs === r_eventTs").select('id)
> .writeToSink(new TestRetractSink, queryConfig)
> env.execute()
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8898) Materialize time indicators in conditions of LogicalFilter

2018-03-09 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392975#comment-16392975
 ] 

Hequn Cheng commented on FLINK-8898:


Duplicate of [FLINK-8897|https://issues.apache.org/jira/browse/FLINK-8897]

> Materialize time indicators in conditions of LogicalFilter
> --
>
> Key: FLINK-8898
> URL: https://issues.apache.org/jira/browse/FLINK-8898
> Project: Flink
>  Issue Type: Bug
>Reporter: Hequn Cheng
>Priority: Major
>
> Currently, {{RelTimeIndicatorConverter}} does not materialize time indicators 
> in the conditions of a LogicalFilter, which leads to type mismatch exceptions. 
> We can reproduce the exception with the following test case.
> {code:java}
> @Test
> def reproduceTypeMissmatch(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> env.setStateBackend(getStateBackend)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> StreamITCase.clear
> val data1 = new mutable.MutableList[(Int, Long, Int, Long)]
> data1.+=((1, 1L, 1, 1L))
> data1.+=((1, 2L, 1, 1L))
> val t1 = env.fromCollection(data1)
> .assignTimestampsAndWatermarks(new Row5WatermarkExtractor)
> .toTable(tEnv, 'id, 'ip, 'type, 'eventTs.rowtime)
> tEnv.registerTable("myTable", t1)
> val sql1 = "select distinct id, eventTs as eventTs, count(*) over (partition 
> by id order by eventTs rows" +
> " between 100 preceding and current row) as cnt1 from myTable"
> val sql2 = "select distinct id as r_id, eventTs as r_eventTs, count(*) over 
> (partition by id " +
> "order by eventTs rows between 50 preceding and current row) as cnt2 from 
> myTable"
> val left = tEnv.sqlQuery(sql1)
> val right = tEnv.sqlQuery(sql2)
> left.join(right).where("id = r_id && eventTs === r_eventTs").select('id)
> .writeToSink(new TestRetractSink, queryConfig)
> env.execute()
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] Optionally store elements of...

2018-03-09 Thread je-ik
Github user je-ik commented on the issue:

https://github.com/apache/flink/pull/5185
  
@StephanEwen I think it should be configurable. As Aljoscha pointed out, we 
need to ensure that these two representations have the same serialized 
form in checkpoints, because that way users can switch back and forth between 
the implementations across application restarts. Unfortunately, I didn't have time 
to dive into that so far. :-(


---


[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392976#comment-16392976
 ] 

ASF GitHub Bot commented on FLINK-8297:
---

Github user je-ik commented on the issue:

https://github.com/apache/flink/pull/5185
  
@StephanEwen I think it should be configurable. As Aljoscha pointed out, we 
need to ensure that these two representations have the same serialized 
form in checkpoints, because that way users can switch back and forth between 
the implementations across application restarts. Unfortunately, I didn't have time 
to dive into that so far. :-(


> RocksDBListState stores whole list in single byte[]
> ---
>
> Key: FLINK-8297
> URL: https://issues.apache.org/jira/browse/FLINK-8297
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Jan Lukavský
>Priority: Major
>
> RocksDBListState currently keeps the whole list of data in a single RocksDB 
> key-value pair, which implies that the list actually must fit into memory. 
> Larger lists are not supported and end up with an OOME or other error. The 
> RocksDBListState could be modified so that individual items in the list are 
> stored under separate keys in RocksDB and can then be iterated over. A simple 
> implementation could reuse the existing RocksDBMapState, with the key as an 
> index into the list and a single RocksDBValueState keeping track of how many 
> items have already been added to the list. Because this implementation might 
> be less efficient in some cases, it would be good to make it opt-in via a construct 
> like
> {{new RocksDBStateBackend().enableLargeListsPerKey()}}
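
To make the proposed layout concrete, a very rough sketch with plain Java collections standing in for RocksDBMapState and RocksDBValueState; it shows only the index-plus-count bookkeeping, with no serialization or RocksDB access.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the proposed layout: one "map state" entry per element, keyed by
// its index, plus a single "value state" holding the element count.
final class LargeListSketch<T> {

    private final Map<Long, T> elementsByIndex = new HashMap<>(); // would be RocksDBMapState
    private long count;                                           // would be RocksDBValueState

    void add(T element) {
        elementsByIndex.put(count, element);
        count++;
    }

    List<T> get() {
        // Iteration touches one entry at a time, so the whole list never has to
        // be materialized in a single byte[].
        List<T> result = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            result.add(elementsByIndex.get(i));
        }
        return result;
    }
}
{code}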



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6214) WindowAssigners do not allow negative offsets

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392966#comment-16392966
 ] 

ASF GitHub Bot commented on FLINK-6214:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5376
  
Ah, one problem: What happens if the offset is larger (by absolute value) 
than the window length? Then the offset would still be negative with this 
change.
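
One way to handle that edge case (a sketch of the idea, not necessarily what the PR does) is to normalize any offset into the range [0, size) before computing the window start:

{code:java}
final class WindowOffsetSketch {

    /**
     * Normalizes an arbitrary offset (possibly negative, possibly larger than the
     * window size in absolute value) into the range [0, size).
     */
    static long normalizeOffset(long offset, long size) {
        long normalized = offset % size;
        return normalized < 0 ? normalized + size : normalized;
    }

    /** Window-start computation using the normalized offset. */
    static long windowStart(long timestamp, long offset, long size) {
        long off = normalizeOffset(offset, size);
        return timestamp - (timestamp - off + size) % size;
    }
}
{code}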


> WindowAssigners do not allow negative offsets
> -
>
> Key: FLINK-6214
> URL: https://issues.apache.org/jira/browse/FLINK-6214
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Timo Walther
>Priority: Major
>
> Both the website and the JavaDoc promotes 
> ".window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) For 
> example, in China you would have to specify an offset of Time.hours(-8)". But 
> both the sliding and tumbling event time assigners do not allow offset to be 
> negative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5376: [FLINK-6214] WindowAssigners do not allow negative offset...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5376
  
Ah, one problem: What happens if the offset is larger (by absolute value) 
than the window length? Then the offset would still be negative with this 
change.


---


[jira] [Commented] (FLINK-6214) WindowAssigners do not allow negative offsets

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392962#comment-16392962
 ] 

ASF GitHub Bot commented on FLINK-6214:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5376
  
If we decide that we want to support negative offsets, this looks like a 
good implementation.

@aljoscha what is your take, should we support negative offsets?


> WindowAssigners do not allow negative offsets
> -
>
> Key: FLINK-6214
> URL: https://issues.apache.org/jira/browse/FLINK-6214
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Timo Walther
>Priority: Major
>
> Both the website and the JavaDoc promotes 
> ".window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) For 
> example, in China you would have to specify an offset of Time.hours(-8)". But 
> both the sliding and tumbling event time assigners do not allow offset to be 
> negative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5376: [FLINK-6214] WindowAssigners do not allow negative offset...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5376
  
If we decide that we want to support negative offsets, this looks like a 
good implementation.

@aljoscha what is your take, should we support negative offsets?


---


[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392961#comment-16392961
 ] 

ASF GitHub Bot commented on FLINK-8364:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5356
  
I am wondering whether this discussion is a bit confused.

All state facing the user in the APIs already has the behavior that there 
is no `null`, but only empty iterators. That's because all state is wrapped 
into a `UserFacingListState` in the `DefaultKeyedStateStore`.

So, is this a non-issue, actually? Something that may only affect test 
implementations of `ListState`?
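
For readers who have not seen that wrapper, a stripped-down sketch of what such a user-facing wrapper does; the types below are illustrative, not the actual Flink classes.

{code:java}
import java.util.Collections;

// Illustrative only: a null result from the wrapped state becomes an empty iterable,
// so user code never has to null-check list state contents.
final class UserFacingListStateSketch<T> {

    interface RawListState<E> {
        Iterable<E> get();
    }

    private final RawListState<T> wrapped;

    UserFacingListStateSketch(RawListState<T> wrapped) {
        this.wrapped = wrapped;
    }

    Iterable<T> get() {
        Iterable<T> original = wrapped.get();
        return original != null ? original : Collections.<T>emptyList();
    }
}
{code}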



> Add iterator() to ListState which returns empty iterator when it has no value
> -
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Add iterator() to ListState which returns empty iterator when it has no value



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392957#comment-16392957
 ] 

ASF GitHub Bot commented on FLINK-8297:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5185
  
Is the general thought to always have list state as a map, or only fall back 
to that layout if the lists cross a certain threshold?

Since list state backs many common operations (non aggregating windows) we 
have to be super careful with the performance implications of this.


> RocksDBListState stores whole list in single byte[]
> ---
>
> Key: FLINK-8297
> URL: https://issues.apache.org/jira/browse/FLINK-8297
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Jan Lukavský
>Priority: Major
>
> RocksDBListState currently keeps the whole list of data in a single RocksDB 
> key-value pair, which implies that the list actually must fit into memory. 
> Larger lists are not supported and end up with an OOME or other error. The 
> RocksDBListState could be modified so that individual items in the list are 
> stored under separate keys in RocksDB and can then be iterated over. A simple 
> implementation could reuse the existing RocksDBMapState, with the key as an 
> index into the list and a single RocksDBValueState keeping track of how many 
> items have already been added to the list. Because this implementation might 
> be less efficient in some cases, it would be good to make it opt-in via a construct 
> like
> {{new RocksDBStateBackend().enableLargeListsPerKey()}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5356: [FLINK-8364][state backend] Add iterator() to ListState w...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5356
  
I am wondering whether this discussion is a bit confused.

All state facing the user in the APIs already has the behavior that there 
is no `null`, but only empty iterators. That's because all state is wrapped 
into a `UserFacingListState` in the `DefaultKeyedStateStore`.

So, is this a non-issue, actually? Something that may only affect test 
implementations of `ListState`?



---


[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] Optionally store elements of...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5185
  
Is the general thought to always have list state as a map, or only fall back 
to that layout if the lists cross a certain threshold?

Since list state backs many common operations (non aggregating windows) we 
have to be super careful with the performance implications of this.


---


[jira] [Commented] (FLINK-8822) RotateLogFile may not work well when sed version is below 4.2

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392953#comment-16392953
 ] 

ASF GitHub Bot commented on FLINK-8822:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5609
  
ehh...probably. The PR has a point as `sed --help` (v4.2.2) does not list 
the `-E` option.

If we want to be super safe we could check that  `sed -r` is supported and 
use `-E` as a backup.


> RotateLogFile may not work well when sed version is below 4.2
> -
>
> Key: FLINK-8822
> URL: https://issues.apache.org/jira/browse/FLINK-8822
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Xin Liu
>Priority: Major
> Fix For: 1.5.0
>
>
> In bin/config.sh, rotateLogFilesWithPrefix() uses an extended regular expression to 
> process file names with "sed -E", but when the sed version is below 4.2 it fails with 
> "sed: invalid option -- 'E'"
> and log file rotation won't work well: there will be only one log file no matter 
> what $MAX_LOG_FILE_NUMBER is.
> So using sed -r may be more suitable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5609: [FLINK-8822] RotateLogFile may not work well when sed ver...

2018-03-09 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5609
  
ehh...probably. The PR has a point as `sed --help` (v4.2.2) does not list 
the `-E` option.

If we want to be super safe we could check that  `sed -r` is supported and 
use `-E` as a backup.


---


[jira] [Updated] (FLINK-8897) Rowtime materialization causes "mismatched type" AssertionError

2018-03-09 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8897:

Fix Version/s: 1.5.0

> Rowtime materialization causes "mismatched type" AssertionError
> ---
>
> Key: FLINK-8897
> URL: https://issues.apache.org/jira/browse/FLINK-8897
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Priority: Blocker
> Fix For: 1.5.0
>
>
> As raised in [this 
> thread|https://lists.apache.org/thread.html/e2ea38aa7ae224d7481145334955d84243690e9aad10d58310bdb8e7@%3Cuser.flink.apache.org%3E],
>  the query created by the following code will throw a calcite "mismatch type" 
> ({{Timestamp(3)}} and {{TimeIndicator}}) exception.
> {code:java}
> String sql1 = "select id, eventTs as t1, count(*) over (partition by id order 
> by eventTs rows between 100 preceding and current row) as cnt1 from myTable1";
> String sql2 = "select distinct id as r_id, eventTs as t2, count(*) over 
> (partition by id order by eventTs rows between 50 preceding and current row) 
> as cnt2 from myTable2";
> Table left = tableEnv.sqlQuery(sql1);
> Table right = tableEnv.sqlQuery(sql2);
> left.join(right).where("id === r_id && t1 === t2").select("id, 
> t1").writeToSink(...)
> {code}
> The logical plan is as follows.
> {code}
> LogicalProject(id=[$0], t1=[$1])
>   LogicalFilter(condition=[AND(=($0, $3), =($1, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalAggregate(group=[{0, 1, 2}])
> LogicalWindow(window#0=[window(partition {0} order by [1] rows 
> between $2 PRECEDING and CURRENT ROW aggs [COUNT()])])
>   LogicalProject(id=[$0], eventTs=[$3])
> LogicalTableScan(table=[[_DataStreamTable_0]])
>   LogicalAggregate(group=[{0, 1, 2}])
> LogicalWindow(window#0=[window(partition {0} order by [1] rows 
> between $2 PRECEDING and CURRENT ROW aggs [COUNT()])])
>   LogicalProject(id=[$0], eventTs=[$3])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> {code}
> That is because the rowtime field after an aggregation will be 
> materialized, while the {{RexInputRef}} type for the filter's operands ({{t1 
> === t2}}) is still {{TimeIndicator}}. We should unify them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8897) Rowtime materialization causes "mismatched type" AssertionError

2018-03-09 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8897:

Priority: Blocker  (was: Major)

> Rowtime materialization causes "mismatched type" AssertionError
> ---
>
> Key: FLINK-8897
> URL: https://issues.apache.org/jira/browse/FLINK-8897
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Priority: Blocker
> Fix For: 1.5.0
>
>
> As raised in [this 
> thread|https://lists.apache.org/thread.html/e2ea38aa7ae224d7481145334955d84243690e9aad10d58310bdb8e7@%3Cuser.flink.apache.org%3E],
>  the query created by the following code will throw a calcite "mismatch type" 
> ({{Timestamp(3)}} and {{TimeIndicator}}) exception.
> {code:java}
> String sql1 = "select id, eventTs as t1, count(*) over (partition by id order 
> by eventTs rows between 100 preceding and current row) as cnt1 from myTable1";
> String sql2 = "select distinct id as r_id, eventTs as t2, count(*) over 
> (partition by id order by eventTs rows between 50 preceding and current row) 
> as cnt2 from myTable2";
> Table left = tableEnv.sqlQuery(sql1);
> Table right = tableEnv.sqlQuery(sql2);
> left.join(right).where("id === r_id && t1 === t2").select("id, 
> t1").writeToSink(...)
> {code}
> The logical plan is as follows.
> {code}
> LogicalProject(id=[$0], t1=[$1])
>   LogicalFilter(condition=[AND(=($0, $3), =($1, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalAggregate(group=[{0, 1, 2}])
> LogicalWindow(window#0=[window(partition {0} order by [1] rows 
> between $2 PRECEDING and CURRENT ROW aggs [COUNT()])])
>   LogicalProject(id=[$0], eventTs=[$3])
> LogicalTableScan(table=[[_DataStreamTable_0]])
>   LogicalAggregate(group=[{0, 1, 2}])
> LogicalWindow(window#0=[window(partition {0} order by [1] rows 
> between $2 PRECEDING and CURRENT ROW aggs [COUNT()])])
>   LogicalProject(id=[$0], eventTs=[$3])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> {code}
> That is because the rowtime field after an aggregation will be 
> materialized, while the {{RexInputRef}} type for the filter's operands ({{t1 
> === t2}}) is still {{TimeIndicator}}. We should unify them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392937#comment-16392937
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5662
  
Thanks for the comments @xccui. It's never too late for feedback. Sorry, 
maybe I merged this too quickly. We still need to call 
`builder.forJsonSchema()` if the schema contains a `proctime` attribute. The 
most common use case will be to extend the format with time attributes. With your 
approach the format would contain an additional timestamp that is definitely 
not part of the format schema.


> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -
>
> Key: FLINK-8854
> URL: https://issues.apache.org/jira/browse/FLINK-8854
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not 
> correct.
> It should not only include all fields of the table schema, but also all 
> fields of the format schema (mapped to themselves). Otherwise, it is not 
> possible to use a timestamp extractor on a field that is not in table schema. 
> For example this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
> schema:
>   - name: rideId
> type: LONG
>   - name: rowTime
> type: TIMESTAMP
> rowtime:
>   timestamps:
> type: "from-field"
> from: "rideTime"
>   watermarks:
> type: "periodic-bounded"
> delay: "6"
> connector:
>   
> format:
>   property-version: 1
>   type: json
>   schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5662: [FLINK-8854] [table] Fix schema mapping with time attribu...

2018-03-09 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5662
  
Thanks for the comments @xccui. It's never too late for feedback. Sorry, 
maybe I merged this too quickly. We still need to call 
`builder.forJsonSchema()` if the schema contains a `proctime` attribute. The 
most common use case will be to extend the format with time attributes. With your 
approach the format would contain an additional timestamp that is definitely 
not part of the format schema.


---


[jira] [Updated] (FLINK-8912) Web UI does not render error messages correctly in FLIP-6 mode

2018-03-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8912:

Labels: flip6  (was: )

> Web UI does not render error messages correctly in FLIP-6 mode
> --
>
> Key: FLINK-8912
> URL: https://issues.apache.org/jira/browse/FLINK-8912
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.5.0, 1.6.0
> Environment: commit: c531486288caf5241cdf7f0f00f087f3ce82239f
>Reporter: Gary Yao
>Priority: Blocker
>  Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> The Web UI renders error messages returned by the REST API incorrectly, e.g., 
> on the job submission page. The JSON returned by the REST API is rendered as 
> a whole. However, the UI should only render the contents of the {{errors}} 
> field.
> *Steps to reproduce*
> Submit {{examples/streaming/SocketWindowWordCount.jar}} without specifying 
> program arguments. Error message will be rendered as follows:
> {noformat}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> program plan could not be fetched - the program aborted 
> pre-maturely.\n\nSystem.err: (none)\n\nSystem.out: No port specified. Please 
> run 'SocketWindowWordCount --hostname  --port ', where 
> hostname (localhost by default) and port is the address of the text 
> server\nTo start a simple text server, run 'netcat -l ' and type the 
> input text into the command line\n"]}
> {noformat}
> Note that flip6 mode must be enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8909) pyflink.sh not working with yarn

2018-03-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8909:

Affects Version/s: 1.4.2

> pyflink.sh not working with yarn
> 
>
> Key: FLINK-8909
> URL: https://issues.apache.org/jira/browse/FLINK-8909
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.2
>Reporter: Hitesh Tiwari
>Priority: Blocker
>
> Hi,
> I want to run a Python application from pyflink.sh in yarn-cluster mode. I 
> added "-m yarn-cluster -yn 1" to pyflink.sh, so my updated pyflink.sh 
> executes the command below:
> "$FLINK_BIN_DIR"/flink run -m yarn-cluster -yn 1  -v 
> "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@"
>  Running pyflink.sh:
> ./bin/pyflink.sh /opt/pnda/hitesh/flink-1.4.2/examples/python/WordCount.py
> While running it, I get the error below:
> java.lang.Exception: The user defined 'open()' method caused an exception: An 
> error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  ... 1 more
> 03/09/2018 11:20:23 Job execution switched to status FAILING.
>  java.lang.Exception: The user defined 'open()' method caused an exception: 
> An error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at 

[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392933#comment-16392933
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173465435
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -198,14 +205,20 @@ object SchemaValidator {
   val isProctime = properties
 .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
 .orElse(false)
-  val isRowtime = properties
-.containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+  val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
+  val isRowtime = properties.containsKey(tsType)
   if (!isProctime && !isRowtime) {
 // check for a aliasing
 val fieldName = 
properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
   .orElse(n)
 builder.field(fieldName, t)
   }
+  // only use the rowtime attribute if it references a field
+  else if (isRowtime &&
+  properties.getString(tsType) == 
ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
--- End diff --

You are right, we should declare `ExistingField` `final`. In the custom 
extractor case, a user has to supply the format manually. Maybe we will need 
explanation logic in the future so that a user can see what the derived format 
looks like and whether it makes sense to declare it explicitly.


> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -
>
> Key: FLINK-8854
> URL: https://issues.apache.org/jira/browse/FLINK-8854
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not 
> correct.
> It should not only include all fields of the table schema, but also all 
> fields of the format schema (mapped to themselves). Otherwise, it is not 
> possible to use a timestamp extractor on a field that is not in table schema. 
> For example this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
> schema:
>   - name: rideId
> type: LONG
>   - name: rowTime
> type: TIMESTAMP
> rowtime:
>   timestamps:
> type: "from-field"
> from: "rideTime"
>   watermarks:
> type: "periodic-bounded"
> delay: "6"
> connector:
>   
> format:
>   property-version: 1
>   type: json
>   schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

2018-03-09 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173465435
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -198,14 +205,20 @@ object SchemaValidator {
   val isProctime = properties
 .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
 .orElse(false)
-  val isRowtime = properties
-.containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+  val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
+  val isRowtime = properties.containsKey(tsType)
   if (!isProctime && !isRowtime) {
 // check for a aliasing
 val fieldName = 
properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
   .orElse(n)
 builder.field(fieldName, t)
   }
+  // only use the rowtime attribute if it references a field
+  else if (isRowtime &&
+  properties.getString(tsType) == 
ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
--- End diff --

You are right, we should declare `ExistingField` `final`. In the custom 
extractor case, a user has to supply the format manually. Maybe we will need 
explanation logic in the future so that a user can see what the derived format 
looks like and whether it makes sense to declare it explicitly.


---


[jira] [Created] (FLINK-8912) Web UI does not render error messages correctly in FLIP-6 mode

2018-03-09 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8912:
---

 Summary: Web UI does not render error messages correctly in FLIP-6 
mode
 Key: FLINK-8912
 URL: https://issues.apache.org/jira/browse/FLINK-8912
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.5.0, 1.6.0
 Environment: commit: c531486288caf5241cdf7f0f00f087f3ce82239f
Reporter: Gary Yao
 Fix For: 1.5.0


*Description*

The Web UI renders error messages returned by the REST API incorrectly, e.g., 
on the job submission page. The JSON returned by the REST API is rendered as a 
whole. However, the UI should only render the contents of the {{errors}} field.

*Steps to reproduce*

Submit {{examples/streaming/SocketWindowWordCount.jar}} without specifying 
program arguments. Error message will be rendered as follows:
{noformat}
{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
program plan could not be fetched - the program aborted 
pre-maturely.\n\nSystem.err: (none)\n\nSystem.out: No port specified. Please 
run 'SocketWindowWordCount --hostname  --port ', where hostname 
(localhost by default) and port is the address of the text server\nTo start a 
simple text server, run 'netcat -l ' and type the input text into the 
command line\n"]}
{noformat}
Note that flip6 mode must be enabled.
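
As a rough illustration of the intended handling (the actual web UI code is 
JavaScript, so the Jackson-based Java sketch below is only a stand-in), the 
idea is to parse the REST response and show only the entries of the {{errors}} 
array instead of the raw JSON body:

{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Minimal sketch: keep only the "errors" entries of the REST response rather
// than rendering the whole JSON envelope.
public class ErrorsFieldSketch {
    public static void main(String[] args) throws Exception {
        String responseBody =
            "{\"errors\":[\"The program plan could not be fetched - the program aborted pre-maturely.\"]}";

        JsonNode root = new ObjectMapper().readTree(responseBody);
        StringBuilder display = new StringBuilder();
        for (JsonNode error : root.path("errors")) {
            display.append(error.asText()).append(System.lineSeparator());
        }
        System.out.print(display); // only the error text, no JSON wrapper
    }
}
{code}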





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8822) RotateLogFile may not work well when sed version is below 4.2

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392919#comment-16392919
 ] 

ASF GitHub Bot commented on FLINK-8822:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5609
  
@zentol What do you think? Is this safe to merge?


> RotateLogFile may not work well when sed version is below 4.2
> -
>
> Key: FLINK-8822
> URL: https://issues.apache.org/jira/browse/FLINK-8822
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Xin Liu
>Priority: Major
> Fix For: 1.5.0
>
>
> In bin/config.sh, rotateLogFilesWithPrefix() uses an extended regular expression 
> to process the file name with "sed -E", but when the sed version is below 4.2 
> this fails with
> "sed: invalid option -- 'E'"
> and log rotation no longer works: there will be only one log file no matter 
> what $MAX_LOG_FILE_NUMBER is.
> Using "sed -r" may therefore be more suitable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5609: [FLINK-8822] RotateLogFile may not work well when sed ver...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5609
  
@zentol What do you think? Is this safe to merge?


---


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392915#comment-16392915
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5538
  
I see the need for the feature, but I am skeptical concerning the 
implementation.

This PR reflectively modifies the contents of a String. This is prone to 
cause problems, for multiple reasons. Strings cache hash codes, and strings 
themselves are interned and shared.

Is there no other way to pass a default namespace to the mapping?


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
>  This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names.  I propose adding a 
> `defaultKeyspace(String keyspace)` to the ClusterBuilder.  POJOs lacking a 
> definitive keyspace would attempt to be loaded to the provided default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5538: [FLINK-8655] [DataSink] Added default keyspace to Cassand...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5538
  
I see the need for the feature, but I am skeptical concerning the 
implementation.

This PR reflectively modifies the contents of a String. This is prone to 
cause problems, for multiple reasons. Strings cache hash codes, and strings 
themselves are interned and shared.

Is there no other way to pass a default namespace to the mapping?
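
To make the concern concrete, here is a small stand-alone sketch (not the PR's 
code) of why reflectively rewriting a String's backing array is risky. It 
assumes the Java 8 layout, where `java.lang.String` stores a `char[] value` and 
caches its hash code; on newer JDKs the reflective access would be rejected.

{code:java}
import java.lang.reflect.Field;

public class StringMutationHazard {
    public static void main(String[] args) throws Exception {
        String keyspace = "demo";              // string literal, interned and shared
        int cachedHash = keyspace.hashCode();  // hash code is computed once and cached

        // Reflectively overwrite the backing char[] (Java 8 layout assumed).
        Field value = String.class.getDeclaredField("value");
        value.setAccessible(true);
        value.set(keyspace, "prod".toCharArray());

        System.out.println("demo");  // prints "prod": every use of the interned literal changed
        System.out.println(cachedHash == keyspace.hashCode()); // true: the cached hash is now stale
    }
}
{code}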


---


[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392913#comment-16392913
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173455747
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -198,14 +205,20 @@ object SchemaValidator {
   val isProctime = properties
 .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
 .orElse(false)
-  val isRowtime = properties
-.containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+  val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
+  val isRowtime = properties.containsKey(tsType)
   if (!isProctime && !isRowtime) {
 // check for a aliasing
 val fieldName = 
properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
   .orElse(n)
 builder.field(fieldName, t)
   }
+  // only use the rowtime attribute if it references a field
+  else if (isRowtime &&
+  properties.getString(tsType) == 
ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
--- End diff --

What if the user uses the custom extractor to define his/her own 
`ExistingField` extractor that references a field?


> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -
>
> Key: FLINK-8854
> URL: https://issues.apache.org/jira/browse/FLINK-8854
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not 
> correct.
> It should not only include all fields of the table schema, but also all 
> fields of the format schema (mapped to themselves). Otherwise, it is not 
> possible to use a timestamp extractor on a field that is not in table schema. 
> For example this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
> schema:
>   - name: rideId
> type: LONG
>   - name: rowTime
> type: TIMESTAMP
> rowtime:
>   timestamps:
> type: "from-field"
> from: "rideTime"
>   watermarks:
> type: "periodic-bounded"
> delay: "6"
> connector:
>   
> format:
>   property-version: 1
>   type: json
>   schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8714) Suggest new users to use env.readTextFile method with 2 arguments (using the charset), not to rely on system charset (which varies across environments)

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392912#comment-16392912
 ] 

ASF GitHub Bot commented on FLINK-8714:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5536
  
Sorry, I think the JavaDoc comment that triggered this change was actually 
incorrect in the first place.

By default, the read methods always use "UTF-8" rather than the system 
default charset, so it is actually not non-deterministic.

I would personally vote to fix the Javadoc and other docs that incorrectly 
claim this uses the system-dependent charset, and leave the other docs as 
they are (not explicitly passing a charset name that is used anyway keeps 
them simpler).


> Suggest new users to use env.readTextFile method with 2 arguments (using the 
> charset), not to rely on system charset (which varies across environments)
> ---
>
> Key: FLINK-8714
> URL: https://issues.apache.org/jira/browse/FLINK-8714
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Michal Klempa
>Priority: Trivial
>  Labels: easyfix, newbie, patch-available
>
> When a newcomer (like me), goes through the docs, there are several places 
> where examples encourage to read the input data using the 
> {{env.readTextFile()}} method.
>  
> This method variant does not take a second argument - character set (see 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-).]
>  This version relies (according to Javadoc) on " The file will be read with 
> the system's default character set. "
>  
> This behavior is also the default in Java, as in the 
> {{java.util.String.getBytes()}} method, where not supplying the charset means 
> using the system locale or the one the JVM was started with (see 
> [https://stackoverflow.com/questions/64038/setting-java-locale-settings).] 
> There are two ways to set the locale prior to JVM start (-D arguments or 
> setting the LC_ALL variable).
>  
> Given this is something a new Flink user may not know about, nor want to 
> spend hours trying to find the environment-related bug (it works on 
> localhost, but in production the locale is different), I would kindly suggest 
> a change in the documentation: let's migrate the examples to use the 
> two-argument version of {{readTextFile(filePath, charsetName)}}.
>  
> I am open to criticism and suggestions. The listing of {{readTextFile}} I was 
> able to grep in docs is:
> {code:java}
> ./dev/datastream_api.md:- `readTextFile(path)` - Reads text files, i.e. files 
> that respect the `TextInputFormat` specification, line-by-line and returns 
> them as Strings.
> ./dev/datastream_api.md:- `readTextFile(path)` - Reads text files, i.e. files 
> that respect the `TextInputFormat` specification, line-by-line and returns 
> them as Strings.
> ./dev/libs/storm_compatibility.md:DataStream text = 
> env.readTextFile(localFilePath);
> ./dev/cluster_execution.md:    DataSet data = 
> env.readTextFile("hdfs://path/to/file");
> ./dev/batch/index.md:- `readTextFile(path)` / `TextInputFormat` - Reads files 
> line wise and returns them as Strings.
> ./dev/batch/index.md:- `readTextFileWithValue(path)` / `TextValueInputFormat` 
> - Reads files line wise and returns them as
> ./dev/batch/index.md:DataSet localLines = 
> env.readTextFile("file:///path/to/my/textfile");
> ./dev/batch/index.md:DataSet hdfsLines = 
> env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
> ./dev/batch/index.md:DataSet logs = 
> env.readTextFile("file:///path/with.nested/files")
> ./dev/batch/index.md:- `readTextFile(path)` / `TextInputFormat` - Reads files 
> line wise and returns them as Strings.
> ./dev/batch/index.md:- `readTextFileWithValue(path)` / `TextValueInputFormat` 
> - Reads files line wise and returns them as
> ./dev/batch/index.md:val localLines = 
> env.readTextFile("file:///path/to/my/textfile")
> ./dev/batch/index.md:val hdfsLines = 
> env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
> ./dev/batch/index.md:env.readTextFile("file:///path/with.nested/files").withParameters(parameters)
> ./dev/batch/index.md:DataSet lines = env.readTextFile(pathToTextFile);
> ./dev/batch/index.md:val lines = env.readTextFile(pathToTextFile)
> ./dev/batch/examples.md:DataSet text = 
> env.readTextFile("/path/to/file");
> ./dev/batch/examples.md:val text = env.readTextFile("/path/to/file")
> ./dev/api_concepts.md:DataStream text = 
> env.readTextFile("file:///path/to/file");
> ./dev/api_concepts.md:val text: 

[GitHub] flink issue #5536: [FLINK-8714][Documentation] Added either charsetName) or ...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5536
  
Sorry, I think the JavaDoc comment that triggered this change was actually 
incorrect in the first place.

By default, the read methods always use "UTF-8" rather than the system 
default charset, so it is actually not non-deterministic.

I would personally vote to fix the Javadoc and other docs that incorrectly 
claim this uses the system-dependent charset, and leave the other docs as 
they are (not explicitly passing a charset name that is used anyway keeps 
them simpler).
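
For reference, a minimal sketch of the two variants discussed here (the file 
path and job name are placeholders); per the comment above, both end up 
reading UTF-8 by default:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileCharsetExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One-argument variant: uses the default charset (UTF-8 per the comment above).
        DataStream<String> implicitCharset = env.readTextFile("file:///path/to/file");

        // Two-argument variant: spells the charset out explicitly.
        DataStream<String> explicitCharset = env.readTextFile("file:///path/to/file", "UTF-8");

        implicitCharset.print();
        explicitCharset.print();
        env.execute("readTextFile charset example");
    }
}
{code}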


---

