[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521828#comment-16521828
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user xueyumusic commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197674277
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -349,6 +349,17 @@ case class TimestampAdd(
 if (!TypeCheckUtils.isString(unit.resultType)) {
   return ValidationFailure(s"TimestampAdd operator requires unit to be 
of type " +
 s"String Literal, but get ${unit.resultType}.")
+} else {
+  val unitStr = unit.toString()
+  if (!sqlTsiArray.contains(unitStr) &&
+!sqlTsiArray.map(item => item.split("_").last).contains(unitStr)) {
+  return ValidationFailure(s"TimestampAdd operator requires unit 
to be one of (YEAR, " +
--- End diff --

Fixed this and added ")", thank you!


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.





[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521801#comment-16521801
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
Let me elaborate on the TTL checking condition in detail. Overall, the 
condition has two parts and looks like `(current_ts - update_ts) 
- time_shift_offset >= TTL`.

The `time_shift_offset` is the shift offset that we should apply when 
checking the TTL.

- For records where `update_ts` > `checkpoint_ts`, we know they were 
created (or updated) after the last restore, so we don't need to 
apply any shift to them. That shift offset is `0`.

- For records where `update_ts` <= `checkpoint_ts`, we know they were 
created (or updated) before the last restore, so we need to apply the 
shift to them; the shift offset is `recovery_ts - checkpoint_ts`.

In our current code, we don't do this time alignment; it is the 
special case of the above condition where `time_shift_offset` is always `0`.
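
For illustration, a minimal Java sketch of the check described above; the 
class, method, and parameter names are hypothetical, not Flink API:

{code:java}
public final class TtlShiftCheckSketch {

    /**
     * Sketch of the condition (current_ts - update_ts) - time_shift_offset >= TTL.
     * Records updated after the last restore get no shift; older records are
     * shifted by the downtime between the checkpoint and the recovery.
     */
    static boolean isExpired(long currentTs, long updateTs, long checkpointTs,
                             long recoveryTs, long ttl) {
        long timeShiftOffset = updateTs > checkpointTs ? 0L : recoveryTs - checkpointTs;
        return (currentTs - updateTs) - timeShiftOffset >= ttl;
    }
}
{code}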


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The TTL state decorator uses the original state with a packed TTL and adds TTL logic 
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder in its TTL 
> wrapper from FLINK-9513.





[jira] [Assigned] (FLINK-9653) Add operator name to latency metrics

2018-06-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9653:
---

Assignee: vinoyang

> Add operator name to latency metrics
> 
>
> Key: FLINK-9653
> URL: https://issues.apache.org/jira/browse/FLINK-9653
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
> Environment: All
>Reporter: Julian Stephen
>Assignee: vinoyang
>Priority: Minor
>
> Currently the latency metrics report latency between subtasks using this 
> format: 
> flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency{
>  host="",instance="",job="", 
> job_id="",job_name="",operator_id="",operator_subtask_index="0", 
> quantile="0.99",source_id="",source_subtask_index="0",tm_id=""}
> The request is to add {{operator_name}} along with {{operator_id}} to the 
> metric labels.
> For a simple job (e.g., {{source->map->sink}}) you can see two sets of 
> latency metrics. Each set shows all quantiles like (.5, .95, ...). The only 
> thing different between the two sets is the {{operator_id}}. This makes 
> sense, assuming one {{operator_id}} belongs to the {{map}} operator and the 
> other belongs to the {{sink}}.
> Now the problem is that there is no intuitive way to distinguish between the 
> two (find out which operator_id is the {{map}} vs the {{sink}}) just by 
> looking at the metrics.
> Assigning names to the {{map}} and {{sink}} operators does not help. Even 
> though these names show up in other metrics like {{numRecordsIn}}, they do 
> not show up in the latency metric.
> The feature request is to add {{operator_name}} along with {{operator_id}} 
> to the metric labels so that it can be easily used in dashboards and tracking.





[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521762#comment-16521762
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197662959
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -349,6 +349,17 @@ case class TimestampAdd(
 if (!TypeCheckUtils.isString(unit.resultType)) {
   return ValidationFailure(s"TimestampAdd operator requires unit to be 
of type " +
 s"String Literal, but get ${unit.resultType}.")
+} else {
+  val unitStr = unit.toString()
+  if (!sqlTsiArray.contains(unitStr) &&
+!sqlTsiArray.map(item => item.split("_").last).contains(unitStr)) {
+  return ValidationFailure(s"TimestampAdd operator requires unit 
to be one of (YEAR, " +
--- End diff --

Remove the "("  from "(YEAR", or add a ")" behind 


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.





[jira] [Updated] (FLINK-6557) RocksDBKeyedStateBackend broken on Windows

2018-06-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-6557:
--
Labels: pull-request-available  (was: )

> RocksDBKeyedStateBackend broken on Windows
> --
>
> Key: FLINK-6557
> URL: https://issues.apache.org/jira/browse/FLINK-6557
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.3.0, 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.0
>
>
> The {{RocksDBKeyedStateBackend}} cannot be used on Windows. We pass the 
> result of {{org.apache.flink.core.fs.Path#getPath()}} to RocksDB, which will 
> fail when trying to create the checkpoint file because the path begins with a 
> slash, which isn't a valid Windows path:
> {code}
> /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp
> {code}
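
For illustration only, and not necessarily the fix from the linked PR, one 
way to normalize such a path on Windows is to strip the leading slash from 
drive-letter paths:

{code:java}
final class WindowsPathSketch {

    /** Turns "/C:/Users/..." into "C:/Users/..." so it is a valid local Windows path. */
    static String toLocalPath(String path) {
        boolean windowsDrivePath = path.length() >= 3
            && path.charAt(0) == '/'
            && Character.isLetter(path.charAt(1))
            && path.charAt(2) == ':';
        return windowsDrivePath ? path.substring(1) : path;
    }
}
{code}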





[jira] [Commented] (FLINK-6557) RocksDBKeyedStateBackend broken on Windows

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521748#comment-16521748
 ] 

ASF GitHub Bot commented on FLINK-6557:
---

Github user vschafer closed the pull request at:

https://github.com/apache/flink/pull/4269


> RocksDBKeyedStateBackend broken on Windows
> --
>
> Key: FLINK-6557
> URL: https://issues.apache.org/jira/browse/FLINK-6557
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.3.0, 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.0
>
>
> The {{RocksDBKeyedStateBackend}} cannot be used on Windows. We pass the 
> result of {{org.apache.flink.core.fs.Path#getPath()}} to RocksDB, which will 
> fail when trying to create the checkpoint file because the path begins with a 
> slash, which isn't a valid Windows path:
> {code}
> /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp
> {code}





[jira] [Created] (FLINK-9653) Add operator name to latency metrics

2018-06-24 Thread Julian Stephen (JIRA)
Julian Stephen created FLINK-9653:
-

 Summary: Add operator name to latency metrics
 Key: FLINK-9653
 URL: https://issues.apache.org/jira/browse/FLINK-9653
 Project: Flink
  Issue Type: New Feature
  Components: Metrics
 Environment: All
Reporter: Julian Stephen


Currently the latency metrics report latency between subtasks using this 
format: 
flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency{
 host="",instance="",job="", 
job_id="",job_name="",operator_id="",operator_subtask_index="0", 
quantile="0.99",source_id="",source_subtask_index="0",tm_id=""}

The request is to add {{operator_name}} along with {{operator_id}} to the 
metric labels.

For a simple job (e.g., {{source->map->sink}}) you can see two sets of 
latency metrics. Each set shows all quantiles like (.5, .95, ...). The only 
thing different between the two sets is the {{operator_id}}. This makes 
sense, assuming one {{operator_id}} belongs to the {{map}} operator and the 
other belongs to the {{sink}}.

Now the problem is that there is no intuitive way to distinguish between the 
two (find out which operator_id is the {{map}} vs the {{sink}}) just by 
looking at the metrics.

Assigning names to the {{map}} and {{sink}} operators does not help. Even 
though these names show up in other metrics like {{numRecordsIn}}, they do 
not show up in the latency metric.

The feature request is to add {{operator_name}} along with {{operator_id}} 
to the metric labels so that it can be easily used in dashboards and tracking.





[jira] [Commented] (FLINK-9380) Failing end-to-end tests should not clean up logs

2018-06-24 Thread Deepak Sharma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521570#comment-16521570
 ] 

Deepak Sharma commented on FLINK-9380:
--

Hi [~till.rohrmann], would you still like me to work on this? If so, please let 
me know what you think about the question above.

> Failing end-to-end tests should not clean up logs
> -
>
> Key: FLINK-9380
> URL: https://issues.apache.org/jira/browse/FLINK-9380
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Deepak Sharma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.0, 1.5.1
>
>
> Some of the end-to-end tests clean up their logs even in the failure case. 
> This makes debugging and understanding the problem extremely difficult. 
> Ideally, the script would say where it stored the respective logs.





[jira] [Updated] (FLINK-9563) Migrate integration tests for CEP

2018-06-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9563:
--
Labels: pull-request-available  (was: )

> Migrate integration tests for CEP
> -
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Deepak Sharma
>Assignee: Deepak Sharma
>Priority: Minor
>  Labels: pull-request-available
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep





[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521568#comment-16521568
 ] 

ASF GitHub Bot commented on FLINK-9563:
---

Github user deepaks4077 commented on the issue:

https://github.com/apache/flink/pull/6170
  
@zentol, do you have some time this week to review this? I can proceed with 
the other tests once I know that this is the right way to handle it :)


> Migrate integration tests for CEP
> -
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Deepak Sharma
>Assignee: Deepak Sharma
>Priority: Minor
>  Labels: pull-request-available
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep





[jira] [Closed] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final

2018-06-24 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-9585.
-
Resolution: Fixed

Merged in:
master: 5fa61d8ceac8f865002dc0ef84dc1a3c65753d0b

> Logger in ZooKeeperStateHandleStore is public and non-final
> ---
>
> Key: FLINK-9585
> URL: https://issues.apache.org/jira/browse/FLINK-9585
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Trivial
>  Labels: pull-request-available
>
> The logger in {{ZooKeeperStateHandleStore}} should be private and final.





[jira] [Commented] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521532#comment-16521532
 ] 

ASF GitHub Bot commented on FLINK-9585:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6175


> Logger in ZooKeeperStateHandleStore is public and non-final
> ---
>
> Key: FLINK-9585
> URL: https://issues.apache.org/jira/browse/FLINK-9585
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Trivial
>  Labels: pull-request-available
>
> The logger in {{ZooKeeperStateHandleStore}} should be private and final.





[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521509#comment-16521509
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user xueyumusic commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197642129
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -328,6 +330,71 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", 
"SQL_TSI_QUARTER", "SQL_TSI_MONTH",
+"SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", 
"SQL_TSI_SECOND")
+
+  override private[flink] def children: Seq[Expression] = unit :: count :: 
timestamp :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isString(unit.resultType)) {
--- End diff --

Made the input validation stricter and added a validation test, thank you.


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.





[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521498#comment-16521498
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
I'm still a bit worried about the time-align problem on recovery, because 
I've met several cases that would become a disaster in production if we 
don't do the time alignment on recovery. One instance: we used a DFS to 
store the checkpoint data, and the DFS went into safe mode because of some 
problems; we took several hours to notice that and some more time to 
address the issue. After the DFS issue was fixed, users' jobs were resumed 
and began to run correctly. In such a case, if we don't do the time 
alignment on recovery, users' state may already be totally expired 
(whenever TTL <= the `system down time`).

I had a second thought on this problem, and I think we could do it 
without a full scan of the records; the approach is outlined below.

- 1. We remember the timestamp when performing the checkpoint, 
call it `checkpoint_ts`.
- 2. We also remember the timestamp when recovering from the 
checkpoint, call it `recovery_ts`.
- 3. For each record, we remember its last update timestamp, call it 
`update_ts`.
- 4. The current timestamp is `current_ts`.
- 5. Then we use the following condition to check whether a record is 
expired: `checkpoint_ts - update_ts + current_ts - recovery_ts >= TTL`. 
If it holds, the record is expired; otherwise the record is still alive.

What do you think, @azagrebin and @StefanRRichter? It would be really 
nice to hear your opinions on this problem.
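
As a rough illustration of the numbered steps above, a minimal Java sketch 
of the bookkeeping and the check; all names are hypothetical, not the PR's API:

{code:java}
public final class TimeAlignedTtlSketch {

    private final long checkpointTs; // remembered when the checkpoint was taken (step 1)
    private final long recoveryTs;   // remembered when the job was restored (step 2)
    private final long ttl;

    TimeAlignedTtlSketch(long checkpointTs, long recoveryTs, long ttl) {
        this.checkpointTs = checkpointTs;
        this.recoveryTs = recoveryTs;
        this.ttl = ttl;
    }

    /** Step 5: checkpoint_ts - update_ts + current_ts - recovery_ts >= TTL. */
    boolean isExpired(long updateTs, long currentTs) {
        return checkpointTs - updateTs + currentTs - recoveryTs >= ttl;
    }
}
{code}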


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The TTL state decorator uses the original state with a packed TTL and adds TTL logic 
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder in its TTL 
> wrapper from FLINK-9513.





[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521496#comment-16521496
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197640444
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -328,6 +330,71 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", 
"SQL_TSI_QUARTER", "SQL_TSI_MONTH",
+"SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", 
"SQL_TSI_SECOND")
+
+  override private[flink] def children: Seq[Expression] = unit :: count :: 
timestamp :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isString(unit.resultType)) {
+  return ValidationFailure(s"TimestampAdd operator requires unit to be 
of type " +
+s"String Literal, but get ${unit.resultType}.")
+}
+if (!TypeCheckUtils.isTimePoint(timestamp.resultType)) {
+  return ValidationFailure(s"TimestampAdd operator requires timestamp 
to be of type " +
+s"SqlTimeTypeInfo, but get ${timestamp.resultType}.")
+}
+ValidationSuccess
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+val timeUnit = unit match {
+  case Literal(value: String, STRING_TYPE_INFO) =>
+if (sqlTsiArray.contains(value)) {
+  Some(TimeUnit.valueOf(value.split("_").last))
+} else {
+  Some(TimeUnit.valueOf(value))
+}
+  case _ => None
+}
+
+val interval = count match {
+  case Literal(value: Int, INT_TYPE_INFO) =>
+makeInterval(value.toLong, timeUnit)
+  case Literal(value: Long, LONG_TYPE_INFO) =>
+makeInterval(value, timeUnit)
+  case _ =>
+relBuilder.call(SqlStdOperatorTable.MULTIPLY,
+  
relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier,
+new SqlIntervalQualifier(timeUnit.get, null, 
SqlParserPos.ZERO)),
+  count.toRexNode)
+}
+
+relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, 
timestamp.toRexNode, interval)
+  }
+
+  override def toString: String = s"timestampAdd(${children.mkString(", 
")})"
+
+  override private[flink] def resultType: TypeInformation[_] = 
STRING_TYPE_INFO
+
+  private[flink] def makeInterval(value: Long, timeUnit: Option[TimeUnit])
+(implicit relBuilder: RelBuilder) = {
+val countWithUnit = 
timeUnit.get.multiplier.multiply(java.math.BigDecimal.valueOf(value))
+  relBuilder.getRexBuilder.makeIntervalLiteral(countWithUnit,
--- End diff --

indent with two spaces


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.





[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521495#comment-16521495
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197640442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -328,6 +330,71 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", 
"SQL_TSI_QUARTER", "SQL_TSI_MONTH",
+"SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", 
"SQL_TSI_SECOND")
+
+  override private[flink] def children: Seq[Expression] = unit :: count :: 
timestamp :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isString(unit.resultType)) {
--- End diff --

1. Not only should it be a string, it should also be a valid unit, i.e., 
one of YEAR, MONTH, ...
2. Check that count is of type Integer or Long.
3. Add a ValidationTest; you can refer to the tests in 
`ScalarFunctionsValidationTest`.
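
For illustration, a minimal Java sketch of the extra checks being requested; 
the actual PR implements this in Scala in time.scala, and all names here are 
hypothetical:

{code:java}
import java.util.Arrays;
import java.util.List;

final class TimestampAddValidationSketch {

    private static final List<String> VALID_UNITS = Arrays.asList(
        "YEAR", "QUARTER", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND");

    /** The unit must not only be a string but a valid unit (SQL_TSI_* spellings accepted too). */
    static boolean isValidUnit(String unit) {
        int i = unit.lastIndexOf('_');
        String plain = i < 0 ? unit : unit.substring(i + 1);
        return VALID_UNITS.contains(unit) || VALID_UNITS.contains(plain);
    }

    /** The count must be of type Integer or Long. */
    static boolean isValidCount(Object count) {
        return count instanceof Integer || count instanceof Long;
    }
}
{code}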


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.





[jira] [Commented] (FLINK-6759) storm-examples cannot be built without cached dependencies

2018-06-24 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521486#comment-16521486
 ] 

Chesnay Schepler commented on FLINK-6759:
-

[~aitozi] flink-storm-examples should no longer have a dependency on 
flink-examples-batch; it is quite odd that you see that in 1.6.

In any case, you can compile the flink-examples or 
flink-examples/flink-examples-batch modules separately.

> storm-examples cannot be built without cached dependencies
> --
>
> Key: FLINK-6759
> URL: https://issues.apache.org/jira/browse/FLINK-6759
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.4.0
>
>
> The {{flink-storm-examples}} module fails to build if the 
> {{flink-examples-batch}} dependency is not present in the local cache.
> {code}
> [ERROR] Failed to execute goal on project flink-storm-examples_2.10: Could 
> not resolve dependencies for project 
> org.apache.flink:flink-storm-examples_2.10:jar:1.4-SNAPSHOT:
> Failed to collect dependenc
> ies at org.apache.flink:flink-examples-batch_2.10:jar:1.4-SNAPSHOT: Failed to 
> read artifact descriptor for 
> org.apache.flink:flink-examples-batch_2.10:jar:1.4-SNAPSHOT:
> Failure to find org.apache.flink
> :flink-examples_${scala.binary.version}:pom:1.4-SNAPSHOT in 
> https://repository.apache.org/snapshots was cached in the local repository,
> resolution will not be reattempted until the update interval of
> apache.snapshots has elapsed or updates are forced -> [Help 1]
> {code}





[jira] [Comment Edited] (FLINK-5463) RocksDB.disposeInternal does not react to interrupts, blocks task cancellation

2018-06-24 Thread steven-qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521480#comment-16521480
 ] 

steven-qin edited comment on FLINK-5463 at 6/24/18 12:28 PM:
-

Hi,

    I also encountered this issue in Flink 1.4.2, and our TaskManager 
crashed. However, I don't know how to handle it.

Stack trace:

2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
 ==
 == FATAL ===
 ==
 A fatal error occurred, forcing the TaskManager to shut down: Task '' did 
not react to cancelling signal in the last 30 seconds, but is stuck in method:
 org.rocksdb.RocksDB.disposeInternal(Native Method)
 org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
 
org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
 org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335)
 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:745)


was (Author: suizhe007):
Hi,

    I also encountered this issue in flink 1.4.2,and our taksmanager was 
crashed.However I don't know how to process it? 

Stack trace:

2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
 ==
 == FATAL ===
 ==
 A fatal error occurred, forcing the TaskManager to shut down: Task '' did 
not react to cancelling signal in the last 30 seconds, but is stuck in method:
 org.rocksdb.RocksDB.disposeInternal(Native Method)
 org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
 
org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
 org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335)
 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:745)

> RocksDB.disposeInternal does not react to interrupts, blocks task cancellation
> --
>
> Key: FLINK-5463
> URL: https://issues.apache.org/jira/browse/FLINK-5463
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Priority: Major
>
> I'm using Flink 699f4b0.
> My Flink job is slow while cancelling because RockDB seems to be busy with 
> disposing its state:
> {code}
> 2017-01-11 18:48:23,315 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Triggering cancellation of task code 
> TriggerWindow(TumblingEventTimeWindows(4), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071
> }, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) 
> (1/1) (2accc6ca2727c4f7ec963318fbd237e9).
> 2017-01-11 18:48:53,318 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'TriggerWindow(TumblingEventTimeWindows(4), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071},
>  EventTimeTrigger(), Windowed
> Stream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling 
> signal, but is stuck in method:
>  org.rocksdb.RocksDB.disposeInternal(Native Method)
> org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
> org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250)
> 

[jira] [Comment Edited] (FLINK-5463) RocksDB.disposeInternal does not react to interrupts, blocks task cancellation

2018-06-24 Thread steven-qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521480#comment-16521480
 ] 

steven-qin edited comment on FLINK-5463 at 6/24/18 12:26 PM:
-

Hi,

    I also encountered this issue in Flink 1.4.2, and our TaskManager 
crashed. However, I don't know how to handle it.

Stack trace:

2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
 ==
 == FATAL ===
 ==
 A fatal error occurred, forcing the TaskManager to shut down: Task '' did 
not react to cancelling signal in the last 30 seconds, but is stuck in method:
 org.rocksdb.RocksDB.disposeInternal(Native Method)
 org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
 
org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
 org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335)
 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:745)


was (Author: suizhe007):
Hi,

    I also encountered this issue in flink 1.4.2,but I don't know how to 
process it? 

Stack trace:

2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
==
== FATAL ===
==
A fatal error occurred, forcing the TaskManager to shut down: Task '' did 
not react to cancelling signal in the last 30 seconds, but is stuck in method:
 org.rocksdb.RocksDB.disposeInternal(Native Method)
 org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
 
org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
 org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335)
 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:745)

> RocksDB.disposeInternal does not react to interrupts, blocks task cancellation
> --
>
> Key: FLINK-5463
> URL: https://issues.apache.org/jira/browse/FLINK-5463
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Priority: Major
>
> I'm using Flink 699f4b0.
> My Flink job is slow while cancelling because RocksDB seems to be busy with 
> disposing its state:
> {code}
> 2017-01-11 18:48:23,315 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Triggering cancellation of task code 
> TriggerWindow(TumblingEventTimeWindows(4), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071
> }, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) 
> (1/1) (2accc6ca2727c4f7ec963318fbd237e9).
> 2017-01-11 18:48:53,318 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'TriggerWindow(TumblingEventTimeWindows(4), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071},
>  EventTimeTrigger(), Windowed
> Stream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling 
> signal, but is stuck in method:
>  org.rocksdb.RocksDB.disposeInternal(Native Method)
> org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
> org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250)
> 

[jira] [Commented] (FLINK-5463) RocksDB.disposeInternal does not react to interrupts, blocks task cancellation

2018-06-24 Thread steven-qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521480#comment-16521480
 ] 

steven-qin commented on FLINK-5463:
---

Hi,

    I also encountered this issue in Flink 1.4.2, but I don't know how to 
handle it.

Stack trace:

2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
==
== FATAL ===
==
A fatal error occurred, forcing the TaskManager to shut down: Task '' did 
not react to cancelling signal in the last 30 seconds, but is stuck in method:
 org.rocksdb.RocksDB.disposeInternal(Native Method)
 org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
 
org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
 org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297)
 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335)
 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 java.lang.Thread.run(Thread.java:745)

> RocksDB.disposeInternal does not react to interrupts, blocks task cancellation
> --
>
> Key: FLINK-5463
> URL: https://issues.apache.org/jira/browse/FLINK-5463
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Priority: Major
>
> I'm using Flink 699f4b0.
> My Flink job is slow while cancelling because RocksDB seems to be busy with 
> disposing its state:
> {code}
> 2017-01-11 18:48:23,315 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Triggering cancellation of task code 
> TriggerWindow(TumblingEventTimeWindows(4), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071
> }, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) 
> (1/1) (2accc6ca2727c4f7ec963318fbd237e9).
> 2017-01-11 18:48:53,318 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'TriggerWindow(TumblingEventTimeWindows(4), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071},
>  EventTimeTrigger(), Windowed
> Stream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling 
> signal, but is stuck in method:
>  org.rocksdb.RocksDB.disposeInternal(Native Method)
> org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
> org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:331)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:169)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.dispose(WindowOperator.java:273)
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:439)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:340)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
> java.lang.Thread.run(Thread.java:745)
> 2017-01-11 18:48:53,319 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'TriggerWindow(TumblingEventTimeWindows(4), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071},
>  EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) (1/1)' 
> did not react to cancelling signal, but is stuck in method:
>  org.rocksdb.RocksDB.disposeInternal(Native Method)
> org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
> org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:331)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:169)
> 

[jira] [Created] (FLINK-9652) Metrics for upstream and downstream

2018-06-24 Thread godfrey johnson (JIRA)
godfrey johnson created FLINK-9652:
--

 Summary: Metrics for upstream and downstream
 Key: FLINK-9652
 URL: https://issues.apache.org/jira/browse/FLINK-9652
 Project: Flink
  Issue Type: Task
  Components: Metrics
Reporter: godfrey johnson


Flink can get metrics from a source to the next operators, but the total 
input/output metrics are not available from the web UI, which makes it 
inconvenient for users to see the record status of a job.
 # Upstream and downstream metrics are important for troubleshooting.
 # Without source input metrics, many users conclude that there is no Kafka 
input at all and that Flink or their program has an error. The same problem 
exists for the sink output metrics.
 # For a one-operator job (just a source and a sink in the same operator 
chain), users cannot see any stream metrics unless they add a keyBy 
operation, which makes the source output metrics appear.





[jira] [Assigned] (FLINK-9603) Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)

2018-06-24 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-9603:
-

Assignee: Kostas Kloudas

> Incorrect indexing of part files, when part suffix is specified 
> (FileAlreadyExistsException)
> 
>
> Key: FLINK-9603
> URL: https://issues.apache.org/jira/browse/FLINK-9603
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.5.0
>Reporter: Rinat Sharipov
>Assignee: Kostas Kloudas
>Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
> suffix of the part file. It's very useful when it's necessary to set a specific 
> file extension.
>   
>  During usage, I've found an issue: when a new part file is created, it 
> has the same part index as the just-closed file. 
>  So, when Flink tries to move it into its final state, we get a 
> FileAlreadyExistsException.
>   
>  This problem is related to the following code:
>  *{color:#e32400}Here we are trying to find a part-file index that 
> does not exist in the bucket directory. The problem is that the partSuffix is not 
> involved in the path assembly. This means the path never 
> exists{color}*
>  *{color:#e32400}and partCounter is never incremented.{color}*
>   
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> while (fs.exists(partPath) ||
>     fs.exists(getPendingPathFor(partPath)) ||
>     fs.exists(getInProgressPathFor(partPath))) {
>   bucketState.partCounter++;
>   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}Before creating the writer, we append the partSuffix here, 
> but it should already have been appended before the index checks{color}*
> {code:java}
> if (partSuffix != null) {
>partPath = partPath.suffix(partSuffix);
> }
> {code}
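
A hedged sketch of the fix the reporter implies, appending the suffix before 
the existence checks so the counter can actually advance; the helper below is 
hypothetical and only mirrors the snippets above:

{code:java}
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;

final class PartPathSketch {

    /** Hypothetical helper: assemble the part path with the partSuffix already applied. */
    static Path assemblePartPath(Path bucketPath, String partPrefix, int subtaskIndex,
                                 long partCounter, String partSuffix) {
        Path p = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + partCounter);
        return partSuffix != null ? p.suffix(partSuffix) : p;
    }

    /** Probe the filesystem with the suffixed path so partCounter is actually incremented. */
    static long nextFreeCounter(FileSystem fs, Path bucketPath, String partPrefix,
                                int subtaskIndex, long counter, String partSuffix)
            throws IOException {
        Path partPath = assemblePartPath(bucketPath, partPrefix, subtaskIndex, counter, partSuffix);
        // The pending/in-progress variants from the original snippet would be probed here too.
        while (fs.exists(partPath)) {
            counter++;
            partPath = assemblePartPath(bucketPath, partPrefix, subtaskIndex, counter, partSuffix);
        }
        return counter;
    }
}
{code}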





[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521448#comment-16521448
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user xueyumusic commented on the issue:

https://github.com/apache/flink/pull/6188
  
Thanks very much for your review and guidance, @walterddr @hequn8128. I 
have made some modifications; please have a review when you are free, thanks!


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.





[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521405#comment-16521405
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
The travis error this time seems unrelated.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release 
> operations now go through RocksDB state, unlike the previous version, which 
> read the whole sharedBuffer state into memory. I think we can add a cache or 
> variable in the sharedBuffer to hold the Lockable objects and track the 
> reference-count changes within a single NFA processing pass. This will reduce 
> the number of state accesses when events point to the same NodeId, and the 
> result can be flushed to the MapState at the end of processing.
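>
> Below is a minimal sketch of the caching idea; the names getEvent, 
> cacheEvent, and eventsBufferCache match the PR under review, but the body is 
> an illustration under those assumptions, not the merged code:
> {code:java}
> // Sketch: read-through cache in front of the RocksDB-backed MapState.
> private final Map<EventId, Lockable<V>> eventsBufferCache = new HashMap<>();
>
> private Lockable<V> getEvent(EventId eventId) throws Exception {
>     Lockable<V> cached = eventsBufferCache.get(eventId);
>     if (cached != null) {
>         return cached;                                  // hit: no state access
>     }
>     Lockable<V> fromState = eventsBuffer.get(eventId);  // one RocksDB read
>     if (fromState != null) {
>         eventsBufferCache.put(eventId, fromState);
>     }
>     return fromState;
> }
>
> // Called once at the end of an NFA processing pass: flush every cached
> // entry back to the MapState, then clear the cache.
> private void flushCache() throws Exception {
>     for (Map.Entry<EventId, Lockable<V>> entry : eventsBufferCache.entrySet()) {
>         eventsBuffer.put(entry.getKey(), entry.getValue());
>     }
>     eventsBufferCache.clear();
> }
> {code}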



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9185) Potential null dereference in PrioritizedOperatorSubtaskState#resolvePrioritizedAlternatives

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521400#comment-16521400
 ] 

ASF GitHub Bot commented on FLINK-9185:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5894
  
Thanks for your contribution @StephenJeson! Looks good to me from my side.


> Potential null dereference in 
> PrioritizedOperatorSubtaskState#resolvePrioritizedAlternatives
> 
>
> Key: FLINK-9185
> URL: https://issues.apache.org/jira/browse/FLINK-9185
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Stephen Jason
>Priority: Minor
>  Labels: pull-request-available
>
> {code}
> if (alternative != null
>   && alternative.hasState()
>   && alternative.size() == 1
>   && approveFun.apply(reference, alternative.iterator().next())) {
> {code}
> The return value from approveFun.apply would be unboxed.
> We should check that the return value is not null.
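> A sketch of one possible fix: compare the boxed result against Boolean.TRUE 
> so a null return counts as "not approved" instead of throwing an NPE on 
> unboxing (not necessarily the patch that was merged):
> {code:java}
> if (alternative != null
>   && alternative.hasState()
>   && alternative.size() == 1
>   && Boolean.TRUE.equals(approveFun.apply(reference, alternative.iterator().next()))) {
> {code}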



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9185) Potential null dereference in PrioritizedOperatorSubtaskState#resolvePrioritizedAlternatives

2018-06-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9185:
--
Labels: pull-request-available  (was: )

> Potential null dereference in 
> PrioritizedOperatorSubtaskState#resolvePrioritizedAlternatives
> 
>
> Key: FLINK-9185
> URL: https://issues.apache.org/jira/browse/FLINK-9185
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Stephen Jason
>Priority: Minor
>  Labels: pull-request-available
>
> {code}
> if (alternative != null
>   && alternative.hasState()
>   && alternative.size() == 1
>   && approveFun.apply(reference, alternative.iterator().next())) {
> {code}
> The return value from approveFun.apply would be unboxed.
> We should check that the return value is not null.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521395#comment-16521395
 ] 

ASF GitHub Bot commented on FLINK-9585:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6175
  
Looks good from my side.


> Logger in ZooKeeperStateHandleStore is public and non-final
> ---
>
> Key: FLINK-9585
> URL: https://issues.apache.org/jira/browse/FLINK-9585
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Trivial
>  Labels: pull-request-available
>
> The logger in {{ZooKeeperStateHandleStore}} should be private and final.
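> For reference, the conventional SLF4J declaration would look like this (a 
> sketch of the expected end state, assuming the usual static logger pattern):
> {code:java}
> private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
> {code}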



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final

2018-06-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9585:
--
Labels: pull-request-available  (was: )

> Logger in ZooKeeperStateHandleStore is public and non-final
> ---
>
> Key: FLINK-9585
> URL: https://issues.apache.org/jira/browse/FLINK-9585
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Trivial
>  Labels: pull-request-available
>
> The logger in {{ZooKeeperStateHandleStore}} should be private and final.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9627) Extending 'KafkaJsonTableSource' according to comments will result in NPE

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521391#comment-16521391
 ] 

ASF GitHub Bot commented on FLINK-9627:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6198
  
Looks good from my side.


> Extending 'KafkaJsonTableSource' according to comments will result in NPE
> -
>
> Key: FLINK-9627
> URL: https://issues.apache.org/jira/browse/FLINK-9627
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> According to the comments what is needed to extend the 'KafkaJsonTableSource' 
> looks as follows:
>  
> {code:java}
> A version-agnostic Kafka JSON {@link StreamTableSource}.
> *
> * The version-specific Kafka consumers need to extend this class and
> * override {@link #createKafkaConsumer(String, Properties, 
> DeserializationSchema)}}.
> *
> * The field names are used to parse the JSON file and so are the 
> types.{code}
> This will cause an NPE, since there is no default value for startupMode in 
> the abstract class itself only in the builder of this class. 
> For the 'getKafkaConsumer' method the switch statement will be executed on 
> non-initialized 'startupMode' field:
> {code:java}
> switch (startupMode) {
> case EARLIEST:
> kafkaConsumer.setStartFromEarliest();
> break;
> case LATEST:
> kafkaConsumer.setStartFromLatest();
> break;
> case GROUP_OFFSETS:
> kafkaConsumer.setStartFromGroupOffsets();
> break;
> case SPECIFIC_OFFSETS:
> kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
> break;
> }{code}
>  
>  
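> One possible fix (a sketch, assuming the builder's default is 
> StartupMode.GROUP_OFFSETS): give the field a default in the abstract class 
> itself, so the switch never runs on a null value:
> {code:java}
> // Mirrors the builder's default, so subclasses created directly (as the
> // javadoc suggests) no longer hit an uninitialized startupMode.
> private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
> {code}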



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521386#comment-16521386
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629310
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+   Lockable<V> eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   ////////////////////////////////////////////////
+   //  Put
+   ////////////////////////////////////////////////
+
+   /**
+    * Put an event to cache.
+    * @param eventId id of the event
--- End diff --

fixed


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release 
> operations now go through RocksDB state, unlike the previous version, which 
> read the whole sharedBuffer state into memory. I think we can add a cache or 
> variable in the sharedBuffer to hold the Lockable objects and track the 
> reference-count changes within a single NFA processing pass. This will reduce 
> the number of state accesses when events point to the same NodeId, and the 
> result can be flushed to the MapState at the end of processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521384#comment-16521384
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Hi @zhangminglei, thanks for your review. I had only checked the 
SharedBufferTest locally before; the Travis error shows that the number of 
state accesses (reads and writes) is now lower than before, which is the 
purpose of this PR, and I have fixed the error. cc @dawidwys 


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release 
> operations now go through RocksDB state, unlike the previous version, which 
> read the whole sharedBuffer state into memory. I think we can add a cache or 
> variable in the sharedBuffer to hold the Lockable objects and track the 
> reference-count changes within a single NFA processing pass. This will reduce 
> the number of state accesses when events point to the same NodeId, and the 
> result can be flushed to the MapState at the end of processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521383#comment-16521383
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629304
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+   Lockable<V> eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   ////////////////////////////////////////////////
+   //  Put
+   ////////////////////////////////////////////////
+
+   /**
+    * Put an event to cache.
+    * @param eventId id of the event
+    * @param event event body
+    */
+   private void cacheEvent(EventId eventId, Lockable<V> event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+    * Put a SharedBufferNode to cache.
+    * @param nodeId id of the event
+    * @param entry SharedBufferNode
+    */
+   private void cacheEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   ////////////////////////////////////////////////
+   // Get
+   ////////////////////////////////////////////////
+
+   /**
+    * Try to get the sharedBufferNode from state iff the node has not been queried during this turn of processing.
+    * @param nodeId id of the event
--- End diff --

Yes. Thanks for letting me know.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release 
> operations now go through RocksDB state, unlike the previous version, which 
> read the whole sharedBuffer state into memory. I think we can add a cache or 
> variable in the sharedBuffer to hold the Lockable objects and track the 
> reference-count changes within a single NFA processing pass. This will reduce 
> the number of state accesses when events point to the same NodeId, and the 
> result can be flushed to the MapState at the end of processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521380#comment-16521380
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629205
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+   Lockable<V> eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   ////////////////////////////////////////////////
+   //  Put
+   ////////////////////////////////////////////////
+
+   /**
+    * Put an event to cache.
+    * @param eventId id of the event
+    * @param event event body
+    */
+   private void cacheEvent(EventId eventId, Lockable<V> event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+    * Put a SharedBufferNode to cache.
+    * @param nodeId id of the event
+    * @param entry SharedBufferNode
+    */
+   private void cacheEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   ////////////////////////////////////////////////
+   // Get
+   ////////////////////////////////////////////////
+
+   /**
+    * Try to get the sharedBufferNode from state iff the node has not been queried during this turn of processing.
+    * @param nodeId id of the event
--- End diff --

it means `if and only if`.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release 
> operations now go through RocksDB state, unlike the previous version, which 
> read the whole sharedBuffer state into memory. I think we can add a cache or 
> variable in the sharedBuffer to hold the Lockable objects and track the 
> reference-count changes within a single NFA processing pass. This will reduce 
> the number of state accesses when events point to the same NodeId, and the 
> result can be flushed to the MapState at the end of processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521381#comment-16521381
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629218
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -75,6 +76,9 @@
private MapState eventsCount;
private MapState<NodeId, Lockable<SharedBufferNode>> entries;
 
+   private HashMap<EventId, Lockable<V>> eventsBufferCache = new HashMap<>();
+   private HashMap<NodeId, Lockable<SharedBufferNode>> entryCache = new HashMap<>();
--- End diff --

agree and fixed


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release 
> operations now go through RocksDB state, unlike the previous version, which 
> read the whole sharedBuffer state into memory. I think we can add a cache or 
> variable in the sharedBuffer to hold the Lockable objects and track the 
> reference-count changes within a single NFA processing pass. This will reduce 
> the number of state accesses when events point to the same NodeId, and the 
> result can be flushed to the MapState at the end of processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)