[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521828#comment-16521828 ] ASF GitHub Bot commented on FLINK-6846: --- Github user xueyumusic commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197674277 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -349,6 +349,17 @@ case class TimestampAdd( if (!TypeCheckUtils.isString(unit.resultType)) { return ValidationFailure(s"TimestampAdd operator requires unit to be of type " + s"String Literal, but get ${unit.resultType}.") +} else { + val unitStr = unit.toString() + if (!sqlTsiArray.contains(unitStr) && +!sqlTsiArray.map(item => item.split("_").last).contains(unitStr)) { + return ValidationFailure(s"TimestampAdd operator requires unit to be one of (YEAR, " + --- End diff -- fix this, add ")", thank you! > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521801#comment-16521801 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6186 Maybe let me elaborate on the TTL checking condition in detail. Overall, the check contains two parts and looks like `(current_ts - update_ts) - time_shift_offset >= TTL`, where `time_shift_offset` is the shift we should apply when checking the TTL. - For records with `update_ts` > `checkpoint_ts`, we know they were created (or updated) after the last restore, so no shift is needed: the offset is `0`. - For records with `update_ts` <= `checkpoint_ts`, we know they were created (or updated) before the last restore, so we apply a shift of `recovery_ts - checkpoint_ts`. Our current code does no time alignment, which is equivalent to the special case of the above condition where `time_shift_offset` is always `0`. > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > TTL state decorator uses the original state with a packed TTL and adds the TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to the state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
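The two-branch shift condition described above can be sketched in Java as follows. This is an illustrative sketch only; the class and field names are hypothetical, not Flink's actual TTL implementation.

```java
// Sketch of the TTL check with a recovery time shift. All names are
// illustrative; Flink's real TTL state code is structured differently.
public final class TtlCheck {
    // Timestamp of the checkpoint we restored from, the wall-clock time
    // at which the restore happened, and the configured TTL.
    private final long checkpointTs;
    private final long recoveryTs;
    private final long ttl;

    public TtlCheck(long checkpointTs, long recoveryTs, long ttl) {
        this.checkpointTs = checkpointTs;
        this.recoveryTs = recoveryTs;
        this.ttl = ttl;
    }

    public boolean isExpired(long updateTs, long currentTs) {
        // Records updated after the restored checkpoint need no shift;
        // records that predate it are shifted by the downtime between
        // checkpoint and recovery.
        long shift = updateTs > checkpointTs ? 0L : recoveryTs - checkpointTs;
        return (currentTs - updateTs) - shift >= ttl;
    }
}
```

With a shift of zero for all records, this degenerates to the plain `(current_ts - update_ts) >= TTL` check the comment says the current code performs.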
[jira] [Assigned] (FLINK-9653) Add operator name to latency metrics
[ https://issues.apache.org/jira/browse/FLINK-9653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9653: --- Assignee: vinoyang > Add operator name to latency metrics > > > Key: FLINK-9653 > URL: https://issues.apache.org/jira/browse/FLINK-9653 > Project: Flink > Issue Type: New Feature > Components: Metrics > Environment: All >Reporter: Julian Stephen >Assignee: vinoyang >Priority: Minor > > Currently the latency metric reports latency between subtasks using this > format: > flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency{ > host="",instance="",job="", > job_id="",job_name="",operator_id="",operator_subtask_index="0", > quantile="0.99",source_id="",source_subtask_index="0",tm_id="" > The request is to add {{operator_name}} along with {{operator_id}} to the > metric labels. > For a simple job (e.g., {{source->map->sink}}) you can see two sets of > latency metrics. Each set shows all quantiles (.5, .95, ...). The only thing > different between the two sets is the {{operator_id}}. This makes sense > assuming one {{operator_id}} belongs to the {{map}} operator and the other > belongs to the {{sink}}. > The problem is that there is no intuitive way to distinguish between the two > (find out which operator_id is the {{map}} vs the {{sink}}) just by looking at the > metrics. > Assigning names to the {{map}} and {{sink}} operators does not help. Even though > these names show up in other metrics like {{numRecordsIn}}, they do > not show up in the latency metric. > The feature request is to add {{operator_name}} along with {{operator_id}} to > the metric labels so that it can be easily used in dashboards and tracking. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
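The requested change boils down to emitting one extra label alongside the existing ones. A hedged illustration follows; this is a hypothetical helper for demonstration, not Flink's actual metrics code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder showing the latency metric's label set with the
// requested operator_name label included (illustration only).
final class LatencyLabelsDemo {
    static Map<String, String> latencyLabels(String sourceId, int sourceSubtaskIndex,
                                             String operatorId, String operatorName,
                                             int operatorSubtaskIndex, String quantile) {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("source_id", sourceId);
        labels.put("source_subtask_index", String.valueOf(sourceSubtaskIndex));
        labels.put("operator_id", operatorId);
        labels.put("operator_name", operatorName); // the label this issue requests
        labels.put("operator_subtask_index", String.valueOf(operatorSubtaskIndex));
        labels.put("quantile", quantile);
        return labels;
    }
}
```

With `operator_name` present, a dashboard query can select the `map` vs `sink` series by name instead of having to resolve opaque operator ids.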
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521762#comment-16521762 ] ASF GitHub Bot commented on FLINK-6846: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197662959 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -349,6 +349,17 @@ case class TimestampAdd( if (!TypeCheckUtils.isString(unit.resultType)) { return ValidationFailure(s"TimestampAdd operator requires unit to be of type " + s"String Literal, but get ${unit.resultType}.") +} else { + val unitStr = unit.toString() + if (!sqlTsiArray.contains(unitStr) && +!sqlTsiArray.map(item => item.split("_").last).contains(unitStr)) { + return ValidationFailure(s"TimestampAdd operator requires unit to be one of (YEAR, " + --- End diff -- Remove the "(" from "(YEAR", or add a ")" behind > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-6557) RocksDBKeyedStateBackend broken on Windows
[ https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-6557: -- Labels: pull-request-available (was: ) > RocksDBKeyedStateBackend broken on Windows > -- > > Key: FLINK-6557 > URL: https://issues.apache.org/jira/browse/FLINK-6557 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.3.0, 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.5.0 > > > The {{RocksDBKeyedStateBackend}} cannot be used on Windows. We pass the > result of {{org.apache.flink.core.fs.Path#getPath()}} to RocksDB which will > fail when trying to create the checkpoint file as the path begins with a > slash which isn't a valid Windows path: > {code} > /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6557) RocksDBKeyedStateBackend broken on Windows
[ https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521748#comment-16521748 ] ASF GitHub Bot commented on FLINK-6557: --- Github user vschafer closed the pull request at: https://github.com/apache/flink/pull/4269 > RocksDBKeyedStateBackend broken on Windows > -- > > Key: FLINK-6557 > URL: https://issues.apache.org/jira/browse/FLINK-6557 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.3.0, 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.5.0 > > > The {{RocksDBKeyedStateBackend}} cannot be used on Windows. We pass the > result of {{org.apache.flink.core.fs.Path#getPath()}} to RocksDB which will > fail when trying to create the checkpoint file as the path begins with a > slash which isn't a valid Windows path: > {code} > /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9653) Add operator name to latency metrics
Julian Stephen created FLINK-9653: - Summary: Add operator name to latency metrics Key: FLINK-9653 URL: https://issues.apache.org/jira/browse/FLINK-9653 Project: Flink Issue Type: New Feature Components: Metrics Environment: All Reporter: Julian Stephen Currently the latency metric reports latency between subtasks using this format: flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency{ host="",instance="",job="", job_id="",job_name="",operator_id="",operator_subtask_index="0", quantile="0.99",source_id="",source_subtask_index="0",tm_id="" The request is to add {{operator_name}} along with {{operator_id}} to the metric labels. For a simple job (e.g., {{source->map->sink}}) you can see two sets of latency metrics. Each set shows all quantiles (.5, .95, ...). The only thing different between the two sets is the {{operator_id}}. This makes sense assuming one {{operator_id}} belongs to the {{map}} operator and the other belongs to the {{sink}}. The problem is that there is no intuitive way to distinguish between the two (find out which operator_id is the {{map}} vs the {{sink}}) just by looking at the metrics. Assigning names to the {{map}} and {{sink}} operators does not help. Even though these names show up in other metrics like {{numRecordsIn}}, they do not show up in the latency metric. The feature request is to add {{operator_name}} along with {{operator_id}} to the metric labels so that it can be easily used in dashboards and tracking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9380) Failing end-to-end tests should not clean up logs
[ https://issues.apache.org/jira/browse/FLINK-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521570#comment-16521570 ] Deepak Sharma commented on FLINK-9380: -- Hi [~till.rohrmann], would you still like me to work on this? If so, please let me know what you think about the question above. > Failing end-to-end tests should not clean up logs > - > > Key: FLINK-9380 > URL: https://issues.apache.org/jira/browse/FLINK-9380 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Deepak Sharma >Priority: Critical > Labels: test-stability > Fix For: 1.6.0, 1.5.1 > > > Some of the end-to-end tests clean up their logs also in the failure case. > This makes debugging and understanding the problem extremely difficult. > Ideally, the scripts says where it stored the respective logs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9563) Migrate integration tests for CEP
[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9563: -- Labels: pull-request-available (was: ) > Migrate integration tests for CEP > - > > Key: FLINK-9563 > URL: https://issues.apache.org/jira/browse/FLINK-9563 > Project: Flink > Issue Type: Sub-task >Reporter: Deepak Sharma >Assignee: Deepak Sharma >Priority: Minor > Labels: pull-request-available > > Covers all integration tests under > apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP
[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521568#comment-16521568 ] ASF GitHub Bot commented on FLINK-9563: --- Github user deepaks4077 commented on the issue: https://github.com/apache/flink/pull/6170 @zentol, do you have some time this week to review this? I can proceed with the other tests once I know that this is the right way to handle it :) > Migrate integration tests for CEP > - > > Key: FLINK-9563 > URL: https://issues.apache.org/jira/browse/FLINK-9563 > Project: Flink > Issue Type: Sub-task >Reporter: Deepak Sharma >Assignee: Deepak Sharma >Priority: Minor > Labels: pull-request-available > > Covers all integration tests under > apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final
[ https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou closed FLINK-9585. - Resolution: Fixed Merged in: master: 5fa61d8ceac8f865002dc0ef84dc1a3c65753d0b > Logger in ZooKeeperStateHandleStore is public and non-final > --- > > Key: FLINK-9585 > URL: https://issues.apache.org/jira/browse/FLINK-9585 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Trivial > Labels: pull-request-available > > The logger in {{ZooKeeperStateHandleStore}} should be private and final. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final
[ https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521532#comment-16521532 ] ASF GitHub Bot commented on FLINK-9585: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6175 > Logger in ZooKeeperStateHandleStore is public and non-final > --- > > Key: FLINK-9585 > URL: https://issues.apache.org/jira/browse/FLINK-9585 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Trivial > Labels: pull-request-available > > The logger in {{ZooKeeperStateHandleStore}} should be private and final. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521509#comment-16521509 ] ASF GitHub Bot commented on FLINK-6846: --- Github user xueyumusic commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197642129 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -328,6 +330,71 @@ case class TemporalOverlaps( } } + /** +* Standard conversion of the TIMESTAMPADD operator. +* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]] +*/ +case class TimestampAdd( +unit: Expression, +count: Expression, +timestamp: Expression) + extends Expression { + + private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", +"SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND") + + override private[flink] def children: Seq[Expression] = unit :: count :: timestamp :: Nil + + override private[flink] def validateInput(): ValidationResult = { +if (!TypeCheckUtils.isString(unit.resultType)) { --- End diff -- valid input more strict, and add validation test, thank you > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521498#comment-16521498 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6186 I'm still a bit worried about the time-alignment problem on recovery, because I've met several cases that would become a disaster in production if we don't do the time alignment on recovery. One instance: we used a DFS to store the checkpoint data, and the DFS went into safe mode because of some problems; we took several hours to notice that, and also some time to address the issue. After the DFS issue was addressed, users' jobs resumed and began to run correctly. In such a case, if we don't do the time alignment on recovery, users' state may already be totally expired (whenever TTL <= the system downtime). I had a second thought on this problem, and I think maybe we could do it without a full scan of the records; the approach is outlined below. - 1. Remember the timestamp at which the checkpoint was taken, say `checkpoint_ts`. - 2. Remember the timestamp at which we recovered from the checkpoint, say `recovery_ts`. - 3. For each record, remember its last update timestamp, say `update_ts`. - 4. Let the current timestamp be `current_ts`. - 5. Then we can use the following condition to check whether a record is expired: `checkpoint_ts - update_ts + current_ts - recovery_ts >= TTL`. If it holds, the record is expired; otherwise it is still alive. What do you think? @azagrebin and @StefanRRichter, it would be really nice to learn your opinions on this problem. > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > TTL state decorator uses the original state with a packed TTL and adds the TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to the state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
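The decorator pattern from the issue description can be sketched as below. This is a minimal self-contained sketch with hypothetical names; Flink's real `ValueState`, serialization, and time-service interfaces differ, and the wrapped-state field here is just an in-memory stand-in.

```java
import java.util.function.LongSupplier;

// Value plus its last-update timestamp, mirroring the "packed TTL"
// representation in the issue description (names are illustrative).
final class TtlValue<V> {
    final V value;
    final long lastUpdateTs;
    TtlValue(V value, long lastUpdateTs) {
        this.value = value;
        this.lastUpdateTs = lastUpdateTs;
    }
}

// Decorator that adds TTL logic on top of the wrapped state.
final class TtlValueState<V> {
    private TtlValue<V> underlying;   // stands in for the wrapped ValueState
    private final LongSupplier clock; // stands in for the time provider
    private final long ttl;

    TtlValueState(LongSupplier clock, long ttl) {
        this.clock = clock;
        this.ttl = ttl;
    }

    V value() {
        TtlValue<V> withTtl = underlying;
        if (withTtl == null || clock.getAsLong() - withTtl.lastUpdateTs >= ttl) {
            return null; // expired entries read as absent
        }
        return withTtl.value;
    }

    void update(V value) {
        // Pack the value with a fresh timestamp on every write.
        underlying = new TtlValue<>(value, clock.getAsLong());
    }
}
```

A real implementation would also have to handle the restore-time shift discussed in the comment above, e.g. by adjusting `lastUpdateTs` comparisons with the checkpoint/recovery timestamps.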
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521496#comment-16521496 ] ASF GitHub Bot commented on FLINK-6846: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197640444 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -328,6 +330,71 @@ case class TemporalOverlaps( } } + /** +* Standard conversion of the TIMESTAMPADD operator. +* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]] +*/ +case class TimestampAdd( +unit: Expression, +count: Expression, +timestamp: Expression) + extends Expression { + + private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", +"SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND") + + override private[flink] def children: Seq[Expression] = unit :: count :: timestamp :: Nil + + override private[flink] def validateInput(): ValidationResult = { +if (!TypeCheckUtils.isString(unit.resultType)) { + return ValidationFailure(s"TimestampAdd operator requires unit to be of type " + +s"String Literal, but get ${unit.resultType}.") +} +if (!TypeCheckUtils.isTimePoint(timestamp.resultType)) { + return ValidationFailure(s"TimestampAdd operator requires timestamp to be of type " + +s"SqlTimeTypeInfo, but get ${timestamp.resultType}.") +} +ValidationSuccess + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +val timeUnit = unit match { + case Literal(value: String, STRING_TYPE_INFO) => +if (sqlTsiArray.contains(value)) { + Some(TimeUnit.valueOf(value.split("_").last)) +} else { + Some(TimeUnit.valueOf(value)) +} + case _ => None +} + +val interval = count match { + case Literal(value: Int, INT_TYPE_INFO) => +makeInterval(value.toLong, timeUnit) + case Literal(value: Long, LONG_TYPE_INFO) => 
+makeInterval(value, timeUnit) + case _ => +relBuilder.call(SqlStdOperatorTable.MULTIPLY, + relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier, +new SqlIntervalQualifier(timeUnit.get, null, SqlParserPos.ZERO)), + count.toRexNode) +} + +relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode, interval) + } + + override def toString: String = s"timestampAdd(${children.mkString(", ")})" + + override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO + + private[flink] def makeInterval(value: Long, timeUnit: Option[TimeUnit]) +(implicit relBuilder: RelBuilder) = { +val countWithUnit = timeUnit.get.multiplier.multiply(java.math.BigDecimal.valueOf(value)) + relBuilder.getRexBuilder.makeIntervalLiteral(countWithUnit, --- End diff -- indent with two spaces > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
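The intended semantics of `TIMESTAMPADD(unit, count, ts)` for the units in the diff can be illustrated with `java.time`. This is a sketch of the semantics only; the PR's actual conversion goes through Calcite interval literals as shown above, and the acceptance of both `SQL_TSI_*` and short-form unit names follows the diff's `split("_").last` handling.

```java
import java.time.LocalDateTime;

// Illustrative model of TIMESTAMPADD's semantics (not the PR's code).
final class TimestampAddDemo {
    static LocalDateTime timestampAdd(String unit, long count, LocalDateTime ts) {
        // Both "SQL_TSI_DAY" and the short form "DAY" are accepted.
        String u = unit.startsWith("SQL_TSI_") ? unit.substring("SQL_TSI_".length()) : unit;
        switch (u) {
            case "YEAR":    return ts.plusYears(count);
            case "QUARTER": return ts.plusMonths(3 * count);
            case "MONTH":   return ts.plusMonths(count);
            case "WEEK":    return ts.plusWeeks(count);
            case "DAY":     return ts.plusDays(count);
            case "HOUR":    return ts.plusHours(count);
            case "MINUTE":  return ts.plusMinutes(count);
            case "SECOND":  return ts.plusSeconds(count);
            default: throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }
}
```

Negative counts subtract, matching the SQL function's behavior for negative intervals.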
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521495#comment-16521495 ] ASF GitHub Bot commented on FLINK-6846: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197640442 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -328,6 +330,71 @@ case class TemporalOverlaps( } } + /** +* Standard conversion of the TIMESTAMPADD operator. +* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]] +*/ +case class TimestampAdd( +unit: Expression, +count: Expression, +timestamp: Expression) + extends Expression { + + private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", +"SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND") + + override private[flink] def children: Seq[Expression] = unit :: count :: timestamp :: Nil + + override private[flink] def validateInput(): ValidationResult = { +if (!TypeCheckUtils.isString(unit.resultType)) { --- End diff -- 1. Not only a string, but also should be a valid unit, i.e., must be YEAR MONTH... 2. Check count is a type of Integer or Long 3. Add ValidationTest, you can refer to the tests in `ScalarFunctionsValidationTest` > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
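The stricter validation the review asks for (a valid unit name, not just any string, plus an integral count) could look like the following standalone sketch. The class and method are hypothetical helpers for illustration, not the Scala code in the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical validator sketching the checks requested in the review:
// unit must be one of the SQL_TSI_* names or their short forms, and
// count must be an Integer or Long. Returns null on success, else an
// error message (mirroring ValidationSuccess/ValidationFailure).
final class TimestampAddValidator {
    private static final List<String> TSI_UNITS = Arrays.asList(
        "SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", "SQL_TSI_WEEK",
        "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND");

    static String validate(Object unit, Object count) {
        List<String> shortForms = TSI_UNITS.stream()
            .map(u -> u.substring(u.lastIndexOf('_') + 1))
            .collect(Collectors.toList());
        if (!(unit instanceof String)
                || (!TSI_UNITS.contains(unit) && !shortForms.contains(unit))) {
            // Note the closed parenthesis in the message, as the review asks.
            return "TimestampAdd operator requires unit to be one of ("
                + String.join(", ", shortForms) + "), but got " + unit + ".";
        }
        if (!(count instanceof Integer) && !(count instanceof Long)) {
            return "TimestampAdd operator requires count to be Integer or Long.";
        }
        return null; // validation success
    }
}
```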
SqlParserPos.ZERO)), + count.toRexNode) +} + +relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode, interval) + } + + override def toString: String = s"timestampAdd(${children.mkString(", ")})" + + override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO + + private[flink] def makeInterval(value: Long, timeUnit: Option[TimeUnit]) +(implicit relBuilder: RelBuilder) = { +val countWithUnit = timeUnit.get.multiplier.multiply(java.math.BigDecimal.valueOf(value)) + relBuilder.getRexBuilder.makeIntervalLiteral(countWithUnit, --- End diff -- indent with two spaces ---
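The `makeInterval` helper in the diff above scales the count by the unit's multiplier. As a rough illustration, assuming Calcite's `TimeUnit.DAY` multiplier of 86,400,000 ms (an assumption about the multiplier value, not taken from the diff), a count of 3 yields a 259,200,000 ms interval:

```java
import java.math.BigDecimal;

// Hypothetical sketch of the countWithUnit arithmetic in makeInterval:
// interval = unit multiplier (in millis) * count.
public class IntervalSketch {
    static BigDecimal makeIntervalMillis(long count, BigDecimal unitMultiplier) {
        return unitMultiplier.multiply(BigDecimal.valueOf(count));
    }

    public static void main(String[] args) {
        BigDecimal dayMultiplier = BigDecimal.valueOf(86_400_000L); // assumed DAY multiplier
        System.out.println(makeIntervalMillis(3, dayMultiplier));   // 259200000
    }
}
```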
[jira] [Commented] (FLINK-6759) storm-examples cannot be built without cached dependencies
[ https://issues.apache.org/jira/browse/FLINK-6759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521486#comment-16521486 ] Chesnay Schepler commented on FLINK-6759: - [~aitozi] flink-storm-examples should no longer have a dependency on flink-examples-batch, quite odd that you see that in 1.6. In any case, you can compile the flink-examples or flink-examples/flink-examples-batch modules separately. > storm-examples cannot be built without cached dependencies > -- > > Key: FLINK-6759 > URL: https://issues.apache.org/jira/browse/FLINK-6759 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Priority: Major > Fix For: 1.4.0 > > > The {{flink-storm-examples}} module fails to build if the > {{flink-examples-batch}} dependency is not present in the local cache. > {code} > [ERROR] Failed to execute goal on project flink-storm-examples_2.10: Could > not resolve dependencies for project > org.apache.flink:flink-storm-examples_2.10:jar:1.4-SNAPSHOT: > Failed to collect dependenc > ies at org.apache.flink:flink-examples-batch_2.10:jar:1.4-SNAPSHOT: Failed to > read artifact descriptor for > org.apache.flink:flink-examples-batch_2.10:jar:1.4-SNAPSHOT: > Failure to find org.apache.flink > :flink-examples_${scala.binary.version}:pom:1.4-SNAPSHOT in > https://repository.apache.org/snapshots was cached in the local repository, > resolution will not be reattempted until the update interval of > apache.snapshots has elapsed or updates are forced -> [Help 1] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-5463) RocksDB.disposeInternal does not react to interrupts, blocks task cancellation
[ https://issues.apache.org/jira/browse/FLINK-5463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521480#comment-16521480 ] steven-qin edited comment on FLINK-5463 at 6/24/18 12:28 PM: - Hi, I also encountered this issue in flink 1.4.2,and our taskmanager was crashed.However I don't know how to process it? Stack trace: 2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager - == == FATAL === == A fatal error occurred, forcing the TaskManager to shut down: Task '' did not react to cancelling signal in the last 30 seconds, but is stuck in method: org.rocksdb.RocksDB.disposeInternal(Native Method) org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297) org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335) org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351) org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) java.lang.Thread.run(Thread.java:745) was (Author: suizhe007): Hi, I also encountered this issue in flink 1.4.2,and our taksmanager was crashed.However I don't know how to process it? 
Stack trace: 2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager - == == FATAL === == A fatal error occurred, forcing the TaskManager to shut down: Task '' did not react to cancelling signal in the last 30 seconds, but is stuck in method: org.rocksdb.RocksDB.disposeInternal(Native Method) org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297) org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335) org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351) org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) java.lang.Thread.run(Thread.java:745) > RocksDB.disposeInternal does not react to interrupts, blocks task cancellation > -- > > Key: FLINK-5463 > URL: https://issues.apache.org/jira/browse/FLINK-5463 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Priority: Major > > I'm using Flink 699f4b0. > My Flink job is slow while cancelling because RockDB seems to be busy with > disposing its state: > {code} > 2017-01-11 18:48:23,315 INFO org.apache.flink.runtime.taskmanager.Task > - Triggering cancellation of task code > TriggerWindow(TumblingEventTimeWindows(4), > ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071 > }, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) > (1/1) (2accc6ca2727c4f7ec963318fbd237e9). 
> 2017-01-11 18:48:53,318 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'TriggerWindow(TumblingEventTimeWindows(4), > ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, > EventTimeTrigger(), Windowed > Stream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling > signal, but is stuck in method: > org.rocksdb.RocksDB.disposeInternal(Native Method) > org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) > org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250) >
[jira] [Comment Edited] (FLINK-5463) RocksDB.disposeInternal does not react to interrupts, blocks task cancellation
[ https://issues.apache.org/jira/browse/FLINK-5463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521480#comment-16521480 ] steven-qin edited comment on FLINK-5463 at 6/24/18 12:26 PM: - Hi, I also encountered this issue in flink 1.4.2,and our taksmanager was crashed.However I don't know how to process it? Stack trace: 2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager - == == FATAL === == A fatal error occurred, forcing the TaskManager to shut down: Task '' did not react to cancelling signal in the last 30 seconds, but is stuck in method: org.rocksdb.RocksDB.disposeInternal(Native Method) org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297) org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335) org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351) org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) java.lang.Thread.run(Thread.java:745) was (Author: suizhe007): Hi, I also encountered this issue in flink 1.4.2,but I don't know how to process it? 
Stack trace: 2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager - == == FATAL === == A fatal error occurred, forcing the TaskManager to shut down: Task '' did not react to cancelling signal in the last 30 seconds, but is stuck in method: org.rocksdb.RocksDB.disposeInternal(Native Method) org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297) org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335) org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351) org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) java.lang.Thread.run(Thread.java:745) > RocksDB.disposeInternal does not react to interrupts, blocks task cancellation > -- > > Key: FLINK-5463 > URL: https://issues.apache.org/jira/browse/FLINK-5463 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Priority: Major > > I'm using Flink 699f4b0. > My Flink job is slow while cancelling because RockDB seems to be busy with > disposing its state: > {code} > 2017-01-11 18:48:23,315 INFO org.apache.flink.runtime.taskmanager.Task > - Triggering cancellation of task code > TriggerWindow(TumblingEventTimeWindows(4), > ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071 > }, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) > (1/1) (2accc6ca2727c4f7ec963318fbd237e9). 
> 2017-01-11 18:48:53,318 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'TriggerWindow(TumblingEventTimeWindows(4), > ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, > EventTimeTrigger(), Windowed > Stream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling > signal, but is stuck in method: > org.rocksdb.RocksDB.disposeInternal(Native Method) > org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) > org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250) >
[jira] [Commented] (FLINK-5463) RocksDB.disposeInternal does not react to interrupts, blocks task cancellation
[ https://issues.apache.org/jira/browse/FLINK-5463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521480#comment-16521480 ] steven-qin commented on FLINK-5463: --- Hi, I also encountered this issue in flink 1.4.2,but I don't know how to process it? Stack trace: 2018-06-19 00:54:10,275 ERROR org.apache.flink.runtime.taskmanager.TaskManager - == == FATAL === == A fatal error occurred, forcing the TaskManager to shut down: Task '' did not react to cancelling signal in the last 30 seconds, but is stuck in method: org.rocksdb.RocksDB.disposeInternal(Native Method) org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:297) org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:335) org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114) org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351) org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) java.lang.Thread.run(Thread.java:745) > RocksDB.disposeInternal does not react to interrupts, blocks task cancellation > -- > > Key: FLINK-5463 > URL: https://issues.apache.org/jira/browse/FLINK-5463 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Priority: Major > > I'm using Flink 699f4b0. 
> My Flink job is slow while cancelling because RockDB seems to be busy with > disposing its state: > {code} > 2017-01-11 18:48:23,315 INFO org.apache.flink.runtime.taskmanager.Task > - Triggering cancellation of task code > TriggerWindow(TumblingEventTimeWindows(4), > ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071 > }, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) > (1/1) (2accc6ca2727c4f7ec963318fbd237e9). > 2017-01-11 18:48:53,318 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'TriggerWindow(TumblingEventTimeWindows(4), > ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, > EventTimeTrigger(), Windowed > Stream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling > signal, but is stuck in method: > org.rocksdb.RocksDB.disposeInternal(Native Method) > org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) > org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250) > org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:331) > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:169) > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.dispose(WindowOperator.java:273) > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:439) > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:340) > org.apache.flink.runtime.taskmanager.Task.run(Task.java:654) > java.lang.Thread.run(Thread.java:745) > 2017-01-11 18:48:53,319 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'TriggerWindow(TumblingEventTimeWindows(4), > 
ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, > EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) (1/1)' > did not react to cancelling signal, but is stuck in method: > org.rocksdb.RocksDB.disposeInternal(Native Method) > org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37) > org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56) > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250) > org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:331) > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:169) >
[jira] [Created] (FLINK-9652) Metrics for upstream and downstream
godfrey johnson created FLINK-9652: -- Summary: Metrics for upstream and downstream Key: FLINK-9652 URL: https://issues.apache.org/jira/browse/FLINK-9652 Project: Flink Issue Type: Task Components: Metrics Reporter: godfrey johnson Flink exposes metrics from the source to the next operators, but the total input/output metrics cannot be seen in the web UI, which makes it inconvenient for users to check the record status of a job. # Upstream and downstream metrics are important for troubleshooting. # With no source input metrics, many users conclude that there is no Kafka input at all and that Flink or their program has an error. The same problem exists for the sink output metrics. # For a single-operator job (just source and sink in the same operator chain), users cannot see any stream metrics unless they add a keyBy operation, which makes the source output metrics visible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9603) Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)
[ https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-9603: - Assignee: Kostas Kloudas > Incorrect indexing of part files, when part suffix is specified > (FileAlreadyExistsException) > > > Key: FLINK-9603 > URL: https://issues.apache.org/jira/browse/FLINK-9603 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.5.0 >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > > Hi mates, since the 1.5 release, BucketingSink has the ability to configure the suffix of > the part file. It’s very useful when it’s necessary to set a specific > file extension. > > During usage, I found an issue - when a new part file is created, it > has the same part index as the index of the file that was just closed. > So, when Flink tries to move it into its final state, we get a > FileAlreadyExistsException. > > This problem is related to the following code: > *{color:#e32400}Here we are trying to find the max index of a part file that > doesn’t exist in the bucket directory; the problem is that the partSuffix is not > involved in the path assembly.
This means that the path never > exists{color}* > *{color:#e32400}and partCounter will never be incremented.{color}* > > {code:java} > Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + > bucketState.partCounter); > while (fs.exists(partPath) || > fs.exists(getPendingPathFor(partPath)) || > fs.exists(getInProgressPathFor(partPath))) { >bucketState.partCounter++; >partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + > bucketState.partCounter); > } > bucketState.creationTime = processingTimeService.getCurrentProcessingTime(); > {code} > *{color:#e32400}Before creating the writer, we append the partSuffix here, > but it should already have been appended, before the index checks{color}* > {code:java} > if (partSuffix != null) { >partPath = partPath.suffix(partSuffix); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
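The corrected probing order described in the issue, appending the part suffix before the existence checks, can be sketched as follows. This is a minimal, hypothetical simulation: `assemblePartPath` and the in-memory `existing` set stand in for the real `Path`/`FileSystem` calls of BucketingSink:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the fixed index probing: because the suffix is
// part of the probed name, the counter advances past already-written files
// instead of repeatedly colliding with them.
public class PartPathProbe {
    static String assemblePartPath(String prefix, int subtask, int counter, String suffix) {
        String p = prefix + "-" + subtask + "-" + counter;
        return suffix != null ? p + suffix : p;
    }

    static int nextFreeCounter(Set<String> existing, String prefix, int subtask,
                               int counter, String suffix) {
        // existence check on the *suffixed* name, unlike the buggy version
        while (existing.contains(assemblePartPath(prefix, subtask, counter, suffix))) {
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>();
        existing.add("part-0-0.avro");
        existing.add("part-0-1.avro");
        // Both suffixed files are detected, so the next free index is 2;
        // probing without the suffix would wrongly return 0.
        System.out.println(nextFreeCounter(existing, "part", 0, 0, ".avro"));
    }
}
```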
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521448#comment-16521448 ] ASF GitHub Bot commented on FLINK-6846: --- Github user xueyumusic commented on the issue: https://github.com/apache/flink/pull/6188 Thanks very much for your review and guidance, @walterddr @hequn8128 , I have made some modifications; please have a review when you are free, thanks! > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user xueyumusic commented on the issue: https://github.com/apache/flink/pull/6188 Thanks very much for your review and guidance, @walterddr @hequn8128 , I have made some modifications; please have a review when you are free, thanks! ---
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 The travis error this time seems unrelated. ---
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521405#comment-16521405 ] ASF GitHub Bot commented on FLINK-9642: --- Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6205 The travis error this time seems unrelated. > Reduce the count to deal with state during a CEP process > - > > Key: FLINK-9642 > URL: https://issues.apache.org/jira/browse/FLINK-9642 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > With the rework of sharedBuffer Flink-9418, the lock & release operation is > deal with rocksdb state which is different from the previous version which > will read the state of sharedBuffer all to memory, i think we can add a cache > or variable in sharedbuffer to cache the LockAble Object to mark the ref > change in once process in NFA, this will reduce the count when the events > point to the same NodeId.. And flush the result to MapState at the end of > process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9185) Potential null dereference in PrioritizedOperatorSubtaskState#resolvePrioritizedAlternatives
[ https://issues.apache.org/jira/browse/FLINK-9185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521400#comment-16521400 ] ASF GitHub Bot commented on FLINK-9185: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5894 Thanks for your contribution @StephenJeson ! Looks good to me from my side. > Potential null dereference in > PrioritizedOperatorSubtaskState#resolvePrioritizedAlternatives > > > Key: FLINK-9185 > URL: https://issues.apache.org/jira/browse/FLINK-9185 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Stephen Jason >Priority: Minor > Labels: pull-request-available > > {code} > if (alternative != null > && alternative.hasState() > && alternative.size() == 1 > && approveFun.apply(reference, alternative.iterator().next())) { > {code} > The return value from approveFun.apply would be unboxed. > We should check that the return value is not null. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9185) Potential null dereference in PrioritizedOperatorSubtaskState#resolvePrioritizedAlternatives
[ https://issues.apache.org/jira/browse/FLINK-9185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9185: -- Labels: pull-request-available (was: ) > Potential null dereference in > PrioritizedOperatorSubtaskState#resolvePrioritizedAlternatives > > > Key: FLINK-9185 > URL: https://issues.apache.org/jira/browse/FLINK-9185 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Stephen Jason >Priority: Minor > Labels: pull-request-available > > {code} > if (alternative != null > && alternative.hasState() > && alternative.size() == 1 > && approveFun.apply(reference, alternative.iterator().next())) { > {code} > The return value from approveFun.apply would be unboxed. > We should check that the return value is not null. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5894: [FLINK-9185] [runtime] Fix potential null dereference in ...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5894 Thanks for your contribution @StephenJeson ! Looks good to me from my side. ---
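The unboxing hazard described in FLINK-9185 is easy to reproduce in isolation. Below is a minimal sketch; `approveFun` here is a plain `BiFunction`, not the actual Flink type:

```java
import java.util.function.BiFunction;

// Auto-unboxing a null Boolean in a boolean context throws NPE; the
// guarded variant treats null as "not approved", as the issue suggests.
public class UnboxGuard {
    public static void main(String[] args) {
        BiFunction<Integer, Integer, Boolean> approveFun = (a, b) -> null;

        boolean npe = false;
        try {
            if (approveFun.apply(1, 2)) {
                // unreachable: unboxing null throws before entering here
            }
        } catch (NullPointerException e) {
            npe = true;
        }
        System.out.println(npe); // true

        // Guarded check: explicit null test before unboxing
        Boolean approved = approveFun.apply(1, 2);
        boolean ok = approved != null && approved;
        System.out.println(ok); // false
    }
}
```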
[jira] [Commented] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final
[ https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521395#comment-16521395 ] ASF GitHub Bot commented on FLINK-9585: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6175 Looks good from my side. > Logger in ZooKeeperStateHandleStore is public and non-final > --- > > Key: FLINK-9585 > URL: https://issues.apache.org/jira/browse/FLINK-9585 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Trivial > Labels: pull-request-available > > The logger in {{ZooKeeperStateHandleStore}} should be private and final. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9585) Logger in ZooKeeperStateHandleStore is public and non-final
[ https://issues.apache.org/jira/browse/FLINK-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9585: -- Labels: pull-request-available (was: ) > Logger in ZooKeeperStateHandleStore is public and non-final > --- > > Key: FLINK-9585 > URL: https://issues.apache.org/jira/browse/FLINK-9585 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Trivial > Labels: pull-request-available > > The logger in {{ZooKeeperStateHandleStore}} should be private and final. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6175: [FLINK-9585] Logger in ZooKeeperStateHandleStore is publi...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6175 Looks good from my side. ---
[jira] [Commented] (FLINK-9627) Extending 'KafkaJsonTableSource' according to comments will result in NPE
[ https://issues.apache.org/jira/browse/FLINK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521391#comment-16521391 ] ASF GitHub Bot commented on FLINK-9627: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6198 Looks good from my side > Extending 'KafkaJsonTableSource' according to comments will result in NPE > - > > Key: FLINK-9627 > URL: https://issues.apache.org/jira/browse/FLINK-9627 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: Dominik Wosiński >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > According to the comments what is needed to extend the 'KafkaJsonTableSource' > looks as follows: > > {code:java} > A version-agnostic Kafka JSON {@link StreamTableSource}. > * > * The version-specific Kafka consumers need to extend this class and > * override {@link #createKafkaConsumer(String, Properties, > DeserializationSchema)}}. > * > * The field names are used to parse the JSON file and so are the > types.{code} > This will cause an NPE, since there is no default value for startupMode in > the abstract class itself only in the builder of this class. > For the 'getKafkaConsumer' method the switch statement will be executed on > non-initialized 'startupMode' field: > {code:java} > switch (startupMode) { > case EARLIEST: > kafkaConsumer.setStartFromEarliest(); > break; > case LATEST: > kafkaConsumer.setStartFromLatest(); > break; > case GROUP_OFFSETS: > kafkaConsumer.setStartFromGroupOffsets(); > break; > case SPECIFIC_OFFSETS: > kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets); > break; > }{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6198: [FLINK-9627] Extending KafkaJsonTableSource according to ...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6198 Looks good from my side ---
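A minimal reproduction of the NPE described in FLINK-9627, together with the kind of default suggested by the builder's behavior, is sketched below. Class and method names are illustrative, not the real Flink API:

```java
// Switching on an uninitialized (null) enum field throws NPE before any
// case is reached; giving the field a default avoids it.
public class StartupModeDefault {
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS }

    static String configure(StartupMode startupMode) {
        switch (startupMode) { // NPE here if startupMode is null
            case EARLIEST: return "earliest";
            case LATEST: return "latest";
            case GROUP_OFFSETS: return "group-offsets";
            default: return "specific";
        }
    }

    public static void main(String[] args) {
        StartupMode mode = null; // what a subclass sees without the builder
        boolean npe = false;
        try {
            configure(mode);
        } catch (NullPointerException e) {
            npe = true;
        }
        System.out.println(npe); // true

        // With a default value, the switch is always safe:
        StartupMode withDefault = (mode != null) ? mode : StartupMode.GROUP_OFFSETS;
        System.out.println(configure(withDefault)); // group-offsets
    }
}
```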
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521386#comment-16521386 ] ASF GitHub Bot commented on FLINK-9642: --- Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r197629310 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put a event to cache. +* @param eventId id of the event --- End diff -- fixed > Reduce the count to deal with state during a CEP process > - > > Key: FLINK-9642 > URL: https://issues.apache.org/jira/browse/FLINK-9642 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > With the rework of sharedBuffer Flink-9418, the lock & release operation is > deal with rocksdb state which is different from the previous version which > will read the state of sharedBuffer all to memory, i think we can add a cache > or variable in sharedbuffer to cache the LockAble Object to mark the ref > change in once process in NFA, this will reduce the count when the events > point to the same NodeId.. And flush the result to MapState at the end of > process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r197629310 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put a event to cache. +* @param eventId id of the event --- End diff -- fixed ---
[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...
Github user Aitozi commented on the issue:

    https://github.com/apache/flink/pull/6205

    Hi @zhangminglei , thanks for your review. I only check the SharedBufferTest locally before, the error in travis means the num of state access (read and write) is less than before which is the purpose of this pr, and I fix the error. cc @dawidwys

---
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521384#comment-16521384 ]

ASF GitHub Bot commented on FLINK-9642:
---------------------------------------

Github user Aitozi commented on the issue:

    https://github.com/apache/flink/pull/6205

    Hi @zhangminglei , thanks for your review. I only check the SharedBufferTest locally before, the error in travis means the num of state access (read and write) is less than before which is the purpose of this pr, and I fix the error. cc @dawidwys
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521383#comment-16521383 ]

ASF GitHub Bot commented on FLINK-9642:
---------------------------------------

Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6205#discussion_r197629304

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
    @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
    	 * @throws Exception Thrown if the system cannot access the state.
    	 */
    	public void releaseEvent(EventId eventId) throws Exception {
    -		Lockable<V> eventWrapper = eventsBuffer.get(eventId);
    +		Lockable<V> eventWrapper = getEvent(eventId);
    		if (eventWrapper != null) {
    			if (eventWrapper.release()) {
    				eventsBuffer.remove(eventId);
    +				eventsBufferCache.remove(eventId);
    			} else {
    -				eventsBuffer.put(eventId, eventWrapper);
    +				cacheEvent(eventId, eventWrapper);
    			}
    		}
    	}
    
    +	// Cache related method
    +
    +	// ------------------------------------------------------------------
    +	// Put
    +	// ------------------------------------------------------------------
    +
    +	/**
    +	 * Put a event to cache.
    +	 * @param eventId id of the event
    +	 * @param event event body
    +	 */
    +	private void cacheEvent(EventId eventId, Lockable<V> event) {
    +		this.eventsBufferCache.put(eventId, event);
    +	}
    +
    +	/**
    +	 * Put a ShareBufferNode to cache.
    +	 * @param nodeId id of the event
    +	 * @param entry SharedBufferNode
    +	 */
    +	private void cacheEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
    +		this.entryCache.put(nodeId, entry);
    +	}
    +
    +	// ------------------------------------------------------------------
    +	// Get
    +	// ------------------------------------------------------------------
    +
    +	/**
    +	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
    +	 * @param nodeId id of the event
    --- End diff --
    
    Yes. Thanks let me know this.
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6205#discussion_r197629304

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
    @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
    	 * @throws Exception Thrown if the system cannot access the state.
    	 */
    	public void releaseEvent(EventId eventId) throws Exception {
    -		Lockable<V> eventWrapper = eventsBuffer.get(eventId);
    +		Lockable<V> eventWrapper = getEvent(eventId);
    		if (eventWrapper != null) {
    			if (eventWrapper.release()) {
    				eventsBuffer.remove(eventId);
    +				eventsBufferCache.remove(eventId);
    			} else {
    -				eventsBuffer.put(eventId, eventWrapper);
    +				cacheEvent(eventId, eventWrapper);
    			}
    		}
    	}
    
    +	// Cache related method
    +
    +	// ------------------------------------------------------------------
    +	// Put
    +	// ------------------------------------------------------------------
    +
    +	/**
    +	 * Put a event to cache.
    +	 * @param eventId id of the event
    +	 * @param event event body
    +	 */
    +	private void cacheEvent(EventId eventId, Lockable<V> event) {
    +		this.eventsBufferCache.put(eventId, event);
    +	}
    +
    +	/**
    +	 * Put a ShareBufferNode to cache.
    +	 * @param nodeId id of the event
    +	 * @param entry SharedBufferNode
    +	 */
    +	private void cacheEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
    +		this.entryCache.put(nodeId, entry);
    +	}
    +
    +	// ------------------------------------------------------------------
    +	// Get
    +	// ------------------------------------------------------------------
    +
    +	/**
    +	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
    +	 * @param nodeId id of the event
    --- End diff --
    
    Yes. Thanks let me know this.

---
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521380#comment-16521380 ]

ASF GitHub Bot commented on FLINK-9642:
---------------------------------------

Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6205#discussion_r197629205

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
    @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
    	 * @throws Exception Thrown if the system cannot access the state.
    	 */
    	public void releaseEvent(EventId eventId) throws Exception {
    -		Lockable<V> eventWrapper = eventsBuffer.get(eventId);
    +		Lockable<V> eventWrapper = getEvent(eventId);
    		if (eventWrapper != null) {
    			if (eventWrapper.release()) {
    				eventsBuffer.remove(eventId);
    +				eventsBufferCache.remove(eventId);
    			} else {
    -				eventsBuffer.put(eventId, eventWrapper);
    +				cacheEvent(eventId, eventWrapper);
    			}
    		}
    	}
    
    +	// Cache related method
    +
    +	// ------------------------------------------------------------------
    +	// Put
    +	// ------------------------------------------------------------------
    +
    +	/**
    +	 * Put a event to cache.
    +	 * @param eventId id of the event
    +	 * @param event event body
    +	 */
    +	private void cacheEvent(EventId eventId, Lockable<V> event) {
    +		this.eventsBufferCache.put(eventId, event);
    +	}
    +
    +	/**
    +	 * Put a ShareBufferNode to cache.
    +	 * @param nodeId id of the event
    +	 * @param entry SharedBufferNode
    +	 */
    +	private void cacheEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
    +		this.entryCache.put(nodeId, entry);
    +	}
    +
    +	// ------------------------------------------------------------------
    +	// Get
    +	// ------------------------------------------------------------------
    +
    +	/**
    +	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
    +	 * @param nodeId id of the event
    --- End diff --
    
    it means `if and only if`.
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6205#discussion_r197629218

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
    @@ -75,6 +76,9 @@
    	private MapState<Long, Integer> eventsCount;
    	private MapState<NodeId, Lockable<SharedBufferNode>> entries;
    
    +	private HashMap<EventId, Lockable<V>> eventsBufferCache = new HashMap<>();
    +	private HashMap<NodeId, Lockable<SharedBufferNode>> entryCache = new HashMap<>();
    --- End diff --
    
    agree and fixed

---
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521381#comment-16521381 ]

ASF GitHub Bot commented on FLINK-9642:
---------------------------------------

Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6205#discussion_r197629218

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
    @@ -75,6 +76,9 @@
    	private MapState<Long, Integer> eventsCount;
    	private MapState<NodeId, Lockable<SharedBufferNode>> entries;
    
    +	private HashMap<EventId, Lockable<V>> eventsBufferCache = new HashMap<>();
    +	private HashMap<NodeId, Lockable<SharedBufferNode>> entryCache = new HashMap<>();
    --- End diff --
    
    agree and fixed