[jira] [Commented] (FLINK-9869) Send PartitionInfo in batch to improve performance
[ https://issues.apache.org/jira/browse/FLINK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561354#comment-16561354 ]

ASF GitHub Bot commented on FLINK-9869:
---
TisonKun commented on issue #6345: [FLINK-9869][runtime] Send PartitionInfo in batch to improve performance
URL: https://github.com/apache/flink/pull/6345#issuecomment-408727224

@tillrohrmann so I am here again. Please review when you are free, thanks!

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Send PartitionInfo in batch to improve performance
> --------------------------------------------------
>
>                 Key: FLINK-9869
>                 URL: https://issues.apache.org/jira/browse/FLINK-9869
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>    Affects Versions: 1.5.1
>            Reporter: 陈梓立
>            Assignee: 陈梓立
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.3
>
> ... currently we send partition info as soon as one arrives. We could
> `cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve
> performance.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
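The optimization proposed in FLINK-9869, buffering partition descriptors as they arrive and shipping them to a task in one message instead of one RPC each, can be sketched roughly as below. The class and method names here are illustrative only and do not match Flink's actual runtime classes; the real change lives in Flink's execution/gateway code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: names do not match Flink's real classes.
// Instead of one RPC per partition info, updates are cached first and
// then shipped together, cutting per-message overhead.
public class PartitionInfoBatcher {
    private final List<String> cached = new ArrayList<>();

    // Conceptually `cachePartitionInfo`: only buffer, no RPC yet.
    public void cachePartitionInfo(String partitionInfo) {
        cached.add(partitionInfo);
    }

    // Conceptually `sendPartitionInfoAsync`: hand the whole buffer over
    // at once. Returns the batch that would travel in a single message.
    public List<String> sendPartitionInfo() {
        List<String> batch = new ArrayList<>(cached);
        cached.clear();
        return batch;
    }

    public static void main(String[] args) {
        PartitionInfoBatcher batcher = new PartitionInfoBatcher();
        batcher.cachePartitionInfo("partition-1");
        batcher.cachePartitionInfo("partition-2");
        batcher.cachePartitionInfo("partition-3");
        // one message carrying three partition infos
        System.out.println(batcher.sendPartitionInfo().size()); // prints 3
    }
}
```

In the real change the flush would be asynchronous; the point is only that the send happens once per batch rather than once per descriptor.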
[jira] [Commented] (FLINK-9869) Send PartitionInfo in batch to improve performance
[ https://issues.apache.org/jira/browse/FLINK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561355#comment-16561355 ]

ASF GitHub Bot commented on FLINK-9869:
---
TisonKun edited a comment on issue #6345: [FLINK-9869][runtime] Send PartitionInfo in batch to improve performance
URL: https://github.com/apache/flink/pull/6345#issuecomment-408727224

@tillrohrmann so I am here again and did the rebase. Please review when you are free, thanks!
[jira] [Commented] (FLINK-9859) Distinguish TM akka config from JM config
[ https://issues.apache.org/jira/browse/FLINK-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561342#comment-16561342 ]

ASF GitHub Bot commented on FLINK-9859:
---
TisonKun commented on issue #6339: [FLINK-9859] [runtime] distinguish JM akka config
URL: https://github.com/apache/flink/pull/6339#issuecomment-408725883

cc @tillrohrmann @twalthr

> Distinguish TM akka config from JM config
> -----------------------------------------
>
>                 Key: FLINK-9859
>                 URL: https://issues.apache.org/jira/browse/FLINK-9859
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>    Affects Versions: 1.5.1
>            Reporter: 陈梓立
>            Assignee: 陈梓立
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.3
>
> ... increase the number of akka threads on JM, to improve its performance;
> decrease the number of akka threads on TM, to save resources.
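The idea in the issue description, larger actor-system thread pools on the coordination-heavy JobManager and smaller ones on the TaskManager, could be sketched as a role-dependent sizing rule. Everything below is hypothetical: the enum, method, and the concrete numbers are illustrative and are not Flink configuration keys.

```java
// Hypothetical sketch of the FLINK-9859 idea: size the actor-system
// thread pool by process role. A JobManager does lots of RPC
// coordination and benefits from more threads; a TaskManager does its
// heavy lifting on task threads, so fewer actor threads save resources.
public class AkkaThreadPoolSizing {
    enum Role { JOB_MANAGER, TASK_MANAGER }

    static int forkJoinParallelism(Role role, int availableCores) {
        switch (role) {
            case JOB_MANAGER:
                // coordination-heavy: allow up to 2x the cores
                return Math.max(8, availableCores * 2);
            case TASK_MANAGER:
            default:
                // actor threads are mostly idle on the data plane
                return Math.min(4, availableCores);
        }
    }

    public static void main(String[] args) {
        System.out.println(forkJoinParallelism(Role.JOB_MANAGER, 8));  // 16
        System.out.println(forkJoinParallelism(Role.TASK_MANAGER, 8)); // 4
    }
}
```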
[jira] [Commented] (FLINK-9859) Distinguish TM akka config from JM config
[ https://issues.apache.org/jira/browse/FLINK-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561341#comment-16561341 ]

ASF GitHub Bot commented on FLINK-9859:
---
TisonKun removed a comment on issue #6339: [FLINK-9859] [runtime] distinguish JM akka config
URL: https://github.com/apache/flink/pull/6339#issuecomment-405818056

cc @StephanEwen @sihuazhou
[jira] [Commented] (FLINK-9991) Add regex_replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561299#comment-16561299 ]

ASF GitHub Bot commented on FLINK-9991:
---
yanghua commented on a change in pull request #6450: [FLINK-9991] Add regex_replace supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6450#discussion_r205991262

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
## @@ -450,6 +450,39 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "")
   }
+
+  @Test
+  def testRegexReplace(): Unit = {
+    testAllApis(
+      "foobar".regexReplace("oo|ar", ""),
+      "'foobar'.regexReplace('oo|ar', '')",
+      "regex_replace('foobar', 'oo|ar', '')",
+      "fb")
+
+    testAllApis(
+      "foobar".regexReplace("oo|ar", "abc"),
+      "'foobar'.regexReplace('oo|ar', 'abc')",
+      "regex_replace('foobar', 'oo|ar', 'abc')",
+      "fabcbabc")
+
+    testAllApis(
+      'f33.regexReplace("oo|ar", ""),
+      "f33.regexReplace('oo|ar', '')",
+      "REGEX_REPLACE(f33, 'oo|ar', '')",
+      "null")
+
+    testAllApis(
+      "foobar".regexReplace('f33, ""),
+      "'foobar'.regexReplace(f33, '')",
+      "REGEX_REPLACE('foobar', f33, '')",
+      "null")
+
+    testAllApis(
+      "foobar".regexReplace("oo|ar", 'f33),
+      "'foobar'.regexReplace('oo|ar', f33)",
+      "REGEX_REPLACE('foobar', 'oo|ar', f33)",
+      "null")
+  }
+

Review comment: Accepted this test case. It should return 'for', the result based on the regular expression.

> Add regex_replace supported in TableAPI and SQL
> -----------------------------------------------
>
>                 Key: FLINK-9991
>                 URL: https://issues.apache.org/jira/browse/FLINK-9991
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API SQL
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Minor
>              Labels: pull-request-available
>
> regexp_replace is a very useful function to process String.
> For example:
> {code:java}
> regexp_replace("foobar", "oo|ar", "") // returns 'fb'
> {code}
> It is supported as a UDF in Hive; for more details please see [1].
> [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
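Since the proposed Table API/SQL function follows Java regex semantics, the expected values in the test above can be previewed with plain `String.replaceAll`. This is only a preview of the replacement semantics; the NULL-propagation rows in the test have no direct equivalent here.

```java
public class RegexReplaceDemo {
    // Same semantics as Java's String.replaceAll: every match of the
    // pattern is substituted with the replacement string.
    static String regexReplace(String str, String regex, String replacement) {
        return str.replaceAll(regex, replacement);
    }

    public static void main(String[] args) {
        // "oo" and "ar" both match the alternation and are removed
        System.out.println(regexReplace("foobar", "oo|ar", ""));    // fb
        // each match is replaced by "abc"
        System.out.println(regexReplace("foobar", "oo|ar", "abc")); // fabcbabc
    }
}
```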
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561297#comment-16561297 ]

ASF GitHub Bot commented on FLINK-9637:
---
eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature
URL: https://github.com/apache/flink/pull/6379#discussion_r205991168

## File path: docs/dev/stream/state/state.md
## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the
 we start over from `0`. Note that this would keep a different state value for each
 different input key if we had tuples with different values in the first field.
+### State time-to-live (TTL)
+
+A time-to-live (TTL) can be assigned to the keyed state value.
+In this case it will expire after the configured TTL
+and its stored value will be cleaned up based on the best effort.
+Depending on configuration, the expired state can become unavailable for read access
+even if it is not cleaned up yet. In this case it behaves as if it does not exist any more.
+
+The collection types of state support TTL on entry level:
+separate list elements and map entries expire independently.
+
+The behaviour of state with TTL firstly should be configured by building `StateTtlConfiguration`:
+
+{% highlight java %}
+StateTtlConfiguration ttlConfig = StateTtlConfiguration
+    .newBuilder(Time.seconds(1))
+    .setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired)
+    .build();
+{% endhighlight %}
+
+{% highlight scala %}
+val ttlConfig = StateTtlConfiguration
+    .newBuilder(Time.seconds(1))
+    .setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired)
+    .build()
+{% endhighlight %}
+
+It has several options to consider.
+The first parameter of `newBuilder` method is mandatory, it is a value of time-to-live itself.
+
+The update type configures when the time-to-live of state value is prolonged (default `OnCreateAndWrite`):
+
+ - `StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite` - only on creation and write access,
+ - `StateTtlConfiguration.TtlUpdateType.OnReadAndWrite` - also on read access.
+
+The state visibility configures whether the expired value is returned on read access
+if it is not cleaned up yet (default `NeverReturnExpired`):
+
+ - `StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired` - expired value is never returned,
+ - `StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp` - returned if still available.
+
+The TTL can be enabled in descriptor for any type of state:
+
+{% highlight java %}
+StateTtlConfiguration ttlConfig = StateTtlConfiguration.newBuilder(Time.seconds(1)).build();
+ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
+stateDescriptor.enableTimeToLive(ttlConfig);
+{% endhighlight %}
+
+{% highlight scala %}
+val ttlConfig = StateTtlConfiguration.newBuilder(Time.seconds(1)).build()
+val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
+stateDescriptor.enableTimeToLive(ttlConfig)
+{% endhighlight %}
+
+**Notes:**
+
+- The state backends store the timestamp of last modification along with the user value,
+which means that enabling this feature increases consumption of state storage.
+
+- As of current implementation the state storage is cleaned up of expired value
+only on its explicit read access per key, e.g. calling `ValueState.value()`.
+This might change in future releases, e.g. additional strategies might be added in background to speed up cleanup.
+
+- Only *processing time* scale is currently supported for TTL.

Review comment: Only TTLs in reference to *processing time* are currently supported.

> Add public user documentation for TTL feature
> ---------------------------------------------
>
>                 Key: FLINK-9637
>                 URL: https://issues.apache.org/jira/browse/FLINK-9637
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
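The semantics the documentation diff describes, a last-modification timestamp stored next to the value, expired entries behaving as absent, and cleanup happening on explicit read access, can be sketched independently of Flink. The class below is an illustration of those semantics under stated defaults (`OnCreateAndWrite`, `NeverReturnExpired`), not Flink's actual TTL state implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Illustration of the documented TTL semantics, not Flink's code:
// each value carries the timestamp of its last modification; an expired
// entry reads as absent and is removed on that read.
public class TtlValueStore<K, V> {
    private static final class Stamped<V> {
        final V value;
        final long lastModified;
        Stamped(V value, long lastModified) {
            this.value = value;
            this.lastModified = lastModified;
        }
    }

    private final Map<K, Stamped<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable processing-time clock

    public TtlValueStore(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // OnCreateAndWrite: writing refreshes the timestamp.
    public void put(K key, V value) {
        entries.put(key, new Stamped<>(value, clock.getAsLong()));
    }

    // NeverReturnExpired: an expired value reads as null and is cleaned up.
    public V get(K key) {
        Stamped<V> s = entries.get(key);
        if (s == null) return null;
        if (clock.getAsLong() - s.lastModified >= ttlMillis) {
            entries.remove(key); // cleanup happens on explicit read access
            return null;
        }
        return s.value;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        TtlValueStore<String, String> store = new TtlValueStore<>(1000, () -> now[0]);
        store.put("k", "v");
        System.out.println(store.get("k")); // v
        now[0] = 2000; // TTL of 1 second has elapsed
        System.out.println(store.get("k")); // null
    }
}
```

Note how the store only shrinks when an expired key is actually read, which is exactly the "state that is never read is never cleaned up" concern raised in the review.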
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561296#comment-16561296 ]

ASF GitHub Bot commented on FLINK-9637:
---
eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature
URL: https://github.com/apache/flink/pull/6379#discussion_r205991075

## File path: docs/dev/stream/state/state.md
## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the
 we start over from `0`. Note that this would keep a different state value for each
 different input key if we had tuples with different values in the first field.
+### State time-to-live (TTL)
+
+A time-to-live (TTL) can be assigned to the keyed state value.
+In this case it will expire after the configured TTL
+and its stored value will be cleaned up based on the best effort.
+Depending on configuration, the expired state can become unavailable for read access
+even if it is not cleaned up yet. In this case it behaves as if it does not exist any more.
+
+The collection types of state support TTL on entry level:
+separate list elements and map entries expire independently.
+
+The behaviour of state with TTL firstly should be configured by building `StateTtlConfiguration`:
+
+{% highlight java %}
+StateTtlConfiguration ttlConfig = StateTtlConfiguration
+    .newBuilder(Time.seconds(1))
+    .setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired)
+    .build();
+{% endhighlight %}
+
+{% highlight scala %}
+val ttlConfig = StateTtlConfiguration
+    .newBuilder(Time.seconds(1))
+    .setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired)
+    .build()
+{% endhighlight %}
+
+It has several options to consider.
+The first parameter of `newBuilder` method is mandatory, it is a value of time-to-live itself.
+
+The update type configures when the time-to-live of state value is prolonged (default `OnCreateAndWrite`):
+
+ - `StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite` - only on creation and write access,
+ - `StateTtlConfiguration.TtlUpdateType.OnReadAndWrite` - also on read access.
+
+The state visibility configures whether the expired value is returned on read access
+if it is not cleaned up yet (default `NeverReturnExpired`):
+
+ - `StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired` - expired value is never returned,
+ - `StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp` - returned if still available.
+
+The TTL can be enabled in descriptor for any type of state:
+
+{% highlight java %}
+StateTtlConfiguration ttlConfig = StateTtlConfiguration.newBuilder(Time.seconds(1)).build();
+ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
+stateDescriptor.enableTimeToLive(ttlConfig);
+{% endhighlight %}
+
+{% highlight scala %}
+val ttlConfig = StateTtlConfiguration.newBuilder(Time.seconds(1)).build()
+val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
+stateDescriptor.enableTimeToLive(ttlConfig)
+{% endhighlight %}
+
+**Notes:**
+
+- The state backends store the timestamp of last modification along with the user value,
+which means that enabling this feature increases consumption of state storage.
+
+- As of current implementation the state storage is cleaned up of expired value
+only on its explicit read access per key, e.g. calling `ValueState.value()`.
+This might change in future releases, e.g. additional strategies might be added in background to speed up cleanup.

Review comment: In the current implementation, expired values are only removed when they are read explicitly, e.g. by calling `ValueState.value()`. Note that this means that under the current implementation, if expired state is not read, it won't be removed, possibly leading to ever growing state. This might change in future releases. Additional strategies might be added that clean up expired state in the background. BTW, is this true given FLINK-9938?
[jira] [Commented] (FLINK-9991) Add regex_replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561295#comment-16561295 ]

ASF GitHub Bot commented on FLINK-9991:
---
yanghua commented on a change in pull request #6450: [FLINK-9991] Add regex_replace supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6450#discussion_r205990913

## File path: docs/dev/table/tableApi.md
## @@ -4025,6 +4038,18 @@ STRING.initCap()
+
+{% highlight scala %}
+STRING.regexReplace(regex STRING, replacement STRING)
+{% endhighlight %}
+
+Returns the string resulting from replacing all substrings that match the regex with replacement, if string or regex or replacement is NULL, returns NULL. E.g. "foobar".regexReplace("oo|ar", "") returns "fb".
+

Review comment: No, it's scala doc.
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561294#comment-16561294 ] ASF GitHub Bot commented on FLINK-9637: --- eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379#discussion_r205990823 ## File path: docs/dev/stream/state/state.md ## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. +### State time-to-live (TTL) + +A time-to-live (TTL) can be assigned to the keyed state value. +In this case it will expire after the configured TTL +and its stored value will be cleaned up based on the best effort. +Depending on configuration, the expired state can become unavailable for read access +even if it is not cleaned up yet. In this case it behaves as if it does not exist any more. + +The collection types of state support TTL on entry level: +separate list elements and map entries expire independently. + +The behaviour of state with TTL firstly should be configured by building `StateTtlConfiguration`: + + + +{% highlight java %} +StateTtlConfiguration ttlConfig = StateTtlConfiguration +.newBuilder(Time.seconds(1)) +.setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired) +.build(); +{% endhighlight %} + + + +{% highlight scala %} +val ttlConfig = StateTtlConfiguration +.newBuilder(Time.seconds(1)) +.setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired) +.build() +{% endhighlight %} + + + +It has several options to consider. 
+The first parameter of `newBuilder` method is mandatory, it is a value of time-to-live itself. + +The update type configures when the time-to-live of state value is prolonged (default `OnCreateAndWrite`): + + - `StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite` - only on creation and write access, + - `StateTtlConfiguration.TtlUpdateType.OnReadAndWrite` - also on read access. + +The state visibility configures whether the expired value is returned on read access +if it is not cleaned up yet (default `NeverReturnExpired`): + + - `StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired` - expired value is never returned, + - `StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp` - returned if still available. + +The TTL can be enabled in descriptor for any type of state: Review comment: TTL functionality can be enabled in the descriptor of any type of state: This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add public user documentation for TTL feature > - > > Key: FLINK-9637 > URL: https://issues.apache.org/jira/browse/FLINK-9637 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
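The update-type and state-visibility options described in the documentation under review can be modeled with a small self-contained sketch. Everything below (class and method names) is invented for illustration only and is not Flink code: `OnCreateAndWrite` corresponds to refreshing the timestamp only in `put()`, `OnReadAndWrite` additionally refreshes in `get()`, and `NeverReturnExpired` corresponds to hiding a value whose TTL has elapsed even before any cleanup runs:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the documented TTL semantics (invented names, not Flink code).
// refreshOnRead=false models OnCreateAndWrite, refreshOnRead=true models
// OnReadAndWrite; get() hides expired values, modeling NeverReturnExpired.
public class TtlStateModel {
    private final long ttlMillis;
    private final boolean refreshOnRead;
    private final Map<String, Long> writeTime = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();

    TtlStateModel(long ttlMillis, boolean refreshOnRead) {
        this.ttlMillis = ttlMillis;
        this.refreshOnRead = refreshOnRead;
    }

    void put(String key, String value, long now) {
        values.put(key, value);
        writeTime.put(key, now); // creation and write always refresh the TTL
    }

    String get(String key, long now) {
        Long ts = writeTime.get(key);
        if (ts == null || now - ts >= ttlMillis) {
            return null; // expired value is never returned, even if not cleaned up
        }
        if (refreshOnRead) {
            writeTime.put(key, now); // OnReadAndWrite: read access prolongs the TTL
        }
        return values.get(key);
    }

    public static void main(String[] args) {
        TtlStateModel state = new TtlStateModel(1000, false);
        state.put("k", "v", 0);
        System.out.println(state.get("k", 500));  // v    (within TTL)
        System.out.println(state.get("k", 1500)); // null (expired, hidden)
    }
}
```

Because the map keeps one timestamp per key, the same model also illustrates the per-entry expiry described for list and map state: each entry ages independently.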
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561293#comment-16561293 ] ASF GitHub Bot commented on FLINK-9637: --- eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379#discussion_r205990785 ## File path: docs/dev/stream/state/state.md ## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. +### State time-to-live (TTL) + +A time-to-live (TTL) can be assigned to the keyed state value. +In this case it will expire after the configured TTL +and its stored value will be cleaned up based on the best effort. +Depending on configuration, the expired state can become unavailable for read access +even if it is not cleaned up yet. In this case it behaves as if it does not exist any more. + +The collection types of state support TTL on entry level: +separate list elements and map entries expire independently. + +The behaviour of state with TTL firstly should be configured by building `StateTtlConfiguration`: + + + +{% highlight java %} +StateTtlConfiguration ttlConfig = StateTtlConfiguration +.newBuilder(Time.seconds(1)) +.setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired) +.build(); +{% endhighlight %} + + + +{% highlight scala %} +val ttlConfig = StateTtlConfiguration +.newBuilder(Time.seconds(1)) +.setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired) +.build() +{% endhighlight %} + + + +It has several options to consider. 
+The first parameter of `newBuilder` method is mandatory, it is a value of time-to-live itself. + +The update type configures when the time-to-live of state value is prolonged (default `OnCreateAndWrite`): Review comment: The update type configures when the state TTL is refreshed (default `OnCreateAndWrite`): This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add public user documentation for TTL feature > - > > Key: FLINK-9637 > URL: https://issues.apache.org/jira/browse/FLINK-9637 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561292#comment-16561292 ] ASF GitHub Bot commented on FLINK-9637: --- eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379#discussion_r205990730 ## File path: docs/dev/stream/state/state.md ## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. +### State time-to-live (TTL) + +A time-to-live (TTL) can be assigned to the keyed state value. +In this case it will expire after the configured TTL +and its stored value will be cleaned up based on the best effort. +Depending on configuration, the expired state can become unavailable for read access +even if it is not cleaned up yet. In this case it behaves as if it does not exist any more. + +The collection types of state support TTL on entry level: +separate list elements and map entries expire independently. + +The behaviour of state with TTL firstly should be configured by building `StateTtlConfiguration`: + + + +{% highlight java %} +StateTtlConfiguration ttlConfig = StateTtlConfiguration +.newBuilder(Time.seconds(1)) +.setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired) +.build(); +{% endhighlight %} + + + +{% highlight scala %} +val ttlConfig = StateTtlConfiguration +.newBuilder(Time.seconds(1)) +.setTtlUpdateType(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired) +.build() +{% endhighlight %} + + + +It has several options to consider. 
+The first parameter of `newBuilder` method is mandatory, it is a value of time-to-live itself. Review comment: The first parameter of `newBuilder` method is mandatory, it is the time-to-live value. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add public user documentation for TTL feature > - > > Key: FLINK-9637 > URL: https://issues.apache.org/jira/browse/FLINK-9637 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561291#comment-16561291 ] ASF GitHub Bot commented on FLINK-9637: --- eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379#discussion_r205990700 ## File path: docs/dev/stream/state/state.md ## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. +### State time-to-live (TTL) + +A time-to-live (TTL) can be assigned to the keyed state value. +In this case it will expire after the configured TTL +and its stored value will be cleaned up based on the best effort. +Depending on configuration, the expired state can become unavailable for read access +even if it is not cleaned up yet. In this case it behaves as if it does not exist any more. + +The collection types of state support TTL on entry level: +separate list elements and map entries expire independently. + +The behaviour of state with TTL firstly should be configured by building `StateTtlConfiguration`: Review comment: To use state TTL you must first build a `StateTtlConfiguration` object: This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add public user documentation for TTL feature > - > > Key: FLINK-9637 > URL: https://issues.apache.org/jira/browse/FLINK-9637 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561290#comment-16561290 ] ASF GitHub Bot commented on FLINK-9637: --- eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379#discussion_r205990622 ## File path: docs/dev/stream/state/state.md ## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. +### State time-to-live (TTL) + +A time-to-live (TTL) can be assigned to the keyed state value. +In this case it will expire after the configured TTL +and its stored value will be cleaned up based on the best effort. +Depending on configuration, the expired state can become unavailable for read access +even if it is not cleaned up yet. In this case it behaves as if it does not exist any more. + +The collection types of state support TTL on entry level: +separate list elements and map entries expire independently. Review comment: The state collection types support per-entry TTLs: list elements and map entries expire independently. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add public user documentation for TTL feature > - > > Key: FLINK-9637 > URL: https://issues.apache.org/jira/browse/FLINK-9637 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561288#comment-16561288 ] ASF GitHub Bot commented on FLINK-9637: --- eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379#discussion_r205990412 ## File path: docs/dev/stream/state/state.md ## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. +### State time-to-live (TTL) + +A time-to-live (TTL) can be assigned to the keyed state value. +In this case it will expire after the configured TTL +and its stored value will be cleaned up based on the best effort. Review comment: s/based on the best effort/on a best effort basis/ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add public user documentation for TTL feature > - > > Key: FLINK-9637 > URL: https://issues.apache.org/jira/browse/FLINK-9637 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9637) Add public user documentation for TTL feature
[ https://issues.apache.org/jira/browse/FLINK-9637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561289#comment-16561289 ] ASF GitHub Bot commented on FLINK-9637: --- eliaslevy commented on a change in pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379#discussion_r205990514 ## File path: docs/dev/stream/state/state.md ## @@ -266,6 +266,92 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. +### State time-to-live (TTL) + +A time-to-live (TTL) can be assigned to the keyed state value. +In this case it will expire after the configured TTL +and its stored value will be cleaned up based on the best effort. +Depending on configuration, the expired state can become unavailable for read access +even if it is not cleaned up yet. In this case it behaves as if it does not exist any more. Review comment: s/even if it is not cleaned up yet/even if it has yet to be removed/ s/In this case it behaves as if it does not exist any more/In that case, it behaves as if it no longer exists/ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add public user documentation for TTL feature > - > > Key: FLINK-9637 > URL: https://issues.apache.org/jira/browse/FLINK-9637 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9991) Add regex_replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561287#comment-16561287 ] ASF GitHub Bot commented on FLINK-9991: --- TisonKun commented on a change in pull request #6450: [FLINK-9991] Add regex_replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6450#discussion_r205989801 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -450,6 +450,39 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "") } + @Test + def testRegexReplace(): Unit = { +testAllApis( + "foobar".regexReplace("oo|ar", ""), + "'foobar'.regexReplace('oo|ar', '')", + "regex_replace('foobar', 'oo|ar', '')", + "fb") + +testAllApis( + "foobar".regexReplace("oo|ar", "abc"), + "'foobar'.regexReplace('oo|ar', 'abc')", + "regex_replace('foobar', 'oo|ar', 'abc')", + "fabcbabc") + +testAllApis( + 'f33.regexReplace("oo|ar", ""), + "f33.regexReplace('oo|ar', '')", + "REGEX_REPLACE(f33, 'oo|ar', '')", + "null") + +testAllApis( + "foobar".regexReplace('f33, ""), + "'foobar'.regexReplace(f33, '')", + "REGEX_REPLACE('foobar', f33, '')", + "null") + +testAllApis( + "foobar".regexReplace("oo|ar", 'f33), + "'foobar'.regexReplace('oo|ar', f33)", + "REGEX_REPLACE('foobar', 'oo|ar', f33)", + "null") + } + Review comment: What about `"fooor".regexReplace("oo", "")`? Is it `"for"` or `"fr"`? The question is how such complicated corner cases behave, which may be worth a test to cover. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
> Add regex_replace supported in TableAPI and SQL > --- > > Key: FLINK-9991 > URL: https://issues.apache.org/jira/browse/FLINK-9991 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regexp_replace is a very useful function for processing strings. > For example : > {code:java} > regexp_replace("foobar", "oo|ar", "") // returns 'fb' > {code} > It is supported as a UDF in Hive; for more details please see [1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
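The reviewer's question has a concrete answer if the Table API function delegates to Java's `java.util.regex` semantics (as the Hive-style `regexp_replace` UDF does — an assumption here, not confirmed by the PR): matching is non-overlapping and left-to-right, so `"fooor".regexReplace("oo", "")` yields `"for"`, not `"fr"`. A minimal sketch (the method name `regexReplace` mirrors the proposed API but this is not the Flink implementation):

```java
import java.util.regex.Pattern;

public class RegexReplaceDemo {
    // Sketch of the proposed semantics: delegate to Java's non-overlapping,
    // left-to-right replaceAll, with the NULL handling described in the docs.
    public static String regexReplace(String s, String regex, String replacement) {
        if (s == null || regex == null || replacement == null) {
            return null;
        }
        return Pattern.compile(regex).matcher(s).replaceAll(replacement);
    }

    public static void main(String[] args) {
        // "oo" matches at index 1 and consumes both 'o's; scanning resumes at index 3.
        System.out.println(regexReplace("fooor", "oo", ""));   // for
        System.out.println(regexReplace("foobar", "oo|ar", "")); // fb
    }
}
```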
[jira] [Commented] (FLINK-9991) Add regex_replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561286#comment-16561286 ] ASF GitHub Bot commented on FLINK-9991: --- TisonKun commented on a change in pull request #6450: [FLINK-9991] Add regex_replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6450#discussion_r205989736 ## File path: docs/dev/table/tableApi.md ## @@ -4025,6 +4038,18 @@ STRING.initCap() + + +{% highlight scala %} +STRING.regexReplace(regex STRING, replacement STRING) +{% endhighlight %} + + + +Returns the string resulting from replacing all substrings that match the regex with the replacement. If string, regex, or replacement is NULL, returns NULL. E.g. "foobar".regexReplace("oo|ar", "") returns "fb". + + + Review comment: duplicated? > Add regex_replace supported in TableAPI and SQL > --- > > Key: FLINK-9991 > URL: https://issues.apache.org/jira/browse/FLINK-9991 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regexp_replace is a very useful function for processing strings. > For example : > {code:java} > regexp_replace("foobar", "oo|ar", "") // returns 'fb' > {code} > It is supported as a UDF in Hive; for more details please see [1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
[jira] [Commented] (FLINK-9240) Avoid deprecated akka methods
[ https://issues.apache.org/jira/browse/FLINK-9240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561282#comment-16561282 ] ASF GitHub Bot commented on FLINK-9240: --- TisonKun commented on issue #6446: [FLINK-9240] Avoid deprecated Akka methods URL: https://github.com/apache/flink/pull/6446#issuecomment-408710531 cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid deprecated akka methods > - > > Key: FLINK-9240 > URL: https://issues.apache.org/jira/browse/FLINK-9240 > Project: Flink > Issue Type: Improvement > Components: Client, Local Runtime, Mesos, Tests, Web Client, YARN >Affects Versions: 1.6.0 >Reporter: Arnout Engelen >Priority: Minor > Labels: pull-request-available > > Several Akka functions that are widely-used in Flink have been deprecated for > a long time, such as ActorSystem#shutdown() and > ActorSystem.awaitTermination(). It would be nice to update those to use > non-deprecated functions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9981) Tune performance of RocksDB implementation
[ https://issues.apache.org/jira/browse/FLINK-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561260#comment-16561260 ] ASF GitHub Bot commented on FLINK-9981: --- StefanRRichter commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation URL: https://github.com/apache/flink/pull/6438#discussion_r205985966 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ## @@ -116,7 +127,14 @@ public RocksDBOrderedSetStore( @Override public void add(@Nonnull T element) { + byte[] elementBytes = serializeElement(element); + + if (LEXICOGRAPIC_BYTE_COMPARATOR.compare(elementBytes, lowerBoundSeekKey) < 0) { + // a smaller element means a new lower bound. + lowerBoundSeekKey = elementBytes; Review comment: Yes, please notice that the lower bound is also updated when requesting an iterator, because that is the point at which we can get to know the element without additional cost. Then it is set to the actual current low key. Afterwards, smaller keys can be inserted again, which is why this check is required. Keys may also get deleted again; that is why it can only be a lower bound without getting too expensive. But in my experiments this small change gave a huge improvement, because it helps ignore tombstones before compaction, and there are often many of them ahead of the first existing element because of the typical access pattern of this state. We might choose another initial value for the lower bound than the prefix, but I would not introduce `null` and additional branching. Thinking about it, the prefix of the next key-group might actually be a sensible initial value, but in the end it doesn't matter too much, because the bound is properly "calibrated" when an iterator is requested for the first time. > Tune performance of RocksDB implementation > -- > > Key: FLINK-9981 > URL: https://issues.apache.org/jira/browse/FLINK-9981 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > General performance tuning/polishing for the RocksDB implementation. We can > figure out how caching/seeking can be improved.
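The technique under discussion — tracking a conservative lower bound so seeks can skip tombstones — can be sketched standalone. The field name mirrors the diff, but this is not the actual Flink code, and the comparator is a plain unsigned lexicographic byte[] compare (the semantics of RocksDB's default bytewise comparator):

```java
// Standalone sketch of lower-bound seek-key tracking (illustrative, not Flink's code).
public class LowerBoundTracker {

    // Unsigned lexicographic byte[] comparison; Java bytes are signed, hence & 0xFF.
    static int compareLex(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    private byte[] lowerBoundSeekKey;

    public LowerBoundTracker(byte[] initialPrefix) {
        this.lowerBoundSeekKey = initialPrefix;
    }

    // On add: a smaller element becomes the new lower bound, so a later seek
    // can start there and skip tombstones below it instead of iterating over them.
    public void add(byte[] elementBytes) {
        if (compareLex(elementBytes, lowerBoundSeekKey) < 0) {
            lowerBoundSeekKey = elementBytes;
        }
    }

    public byte[] lowerBound() {
        return lowerBoundSeekKey;
    }
}
```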
[jira] [Closed] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.
[ https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-9994. - Resolution: Fixed Merged on master with dc780303e1e4420033949049e4d9368a6d230d88 and on release-1.6 with 850983ec3a869f158cc900b9de6435e47779e3f9 > IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp. > > > Key: FLINK-9994 > URL: https://issues.apache.org/jira/browse/FLINK-9994 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-4407) Add DSL for specifying Window Triggers
[ https://issues.apache.org/jira/browse/FLINK-4407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-4407: -- Labels: pull-request-available (was: ) > Add DSL for specifying Window Triggers > -- > > Key: FLINK-4407 > URL: https://issues.apache.org/jira/browse/FLINK-4407 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > This issue refers to the implementation of the trigger DSL. > The specification of the DSL has an open FLIP here: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-9%3A+Trigger+DSL > And is currently under discussion in the dev@ mailing list here: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-9-Trigger-DSL-td13065.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.
[ https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561226#comment-16561226 ] ASF GitHub Bot commented on FLINK-9994: --- asfgit closed pull request #6449: [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp. URL: https://github.com/apache/flink/pull/6449 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java index 0c449e64f41..43085cb42c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java @@ -152,6 +152,7 @@ public IntervalJoinOperator( @Override public void open() throws Exception { super.open(); + collector = new TimestampedCollector<>(output); context = new ContextImpl(userFunction); internalTimerService = @@ -204,15 +205,15 @@ public void processElement2(StreamRecord record) throws Exception { } @SuppressWarnings("unchecked") - private void processElement( - StreamRecord record, - MapState>> ourBuffer, - MapState>> otherBuffer, - long relativeLowerBound, - long relativeUpperBound, - boolean isLeft) throws Exception { - - final OUR ourValue = record.getValue(); + private void processElement( + final StreamRecord record, + final MapState>> ourBuffer, + final MapState>> otherBuffer, + final long relativeLowerBound, + final long relativeUpperBound, + final boolean isLeft) throws Exception { + + final THIS ourValue = record.getValue(); final long 
ourTimestamp = record.getTimestamp(); if (ourTimestamp == Long.MIN_VALUE) { @@ -257,14 +258,18 @@ private boolean isLate(long timestamp) { } private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception { - long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); + final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); + collector.setAbsoluteTimestamp(resultTimestamp); - context.leftTimestamp = leftTimestamp; - context.rightTimestamp = rightTimestamp; + context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp); + userFunction.processElement(left, right, context, collector); } - private void addToBuffer(MapState>> buffer, T value, long timestamp) throws Exception { + private static void addToBuffer( + final MapState>> buffer, + final T value, + final long timestamp) throws Exception { List> elemsInBucket = buffer.get(timestamp); if (elemsInBucket == null) { elemsInBucket = new ArrayList<>(); @@ -313,6 +318,8 @@ public void onProcessingTime(InternalTimer timer) throws Exception { */ private final class ContextImpl extends ProcessJoinFunction.Context { + private long resultTimestamp = Long.MIN_VALUE; + private long leftTimestamp = Long.MIN_VALUE; private long rightTimestamp = Long.MIN_VALUE; @@ -321,6 +328,12 @@ private ContextImpl(ProcessJoinFunction func) { func.super(); } + private void updateTimestamps(long left, long right, long result) { + this.leftTimestamp = left; + this.rightTimestamp = right; + this.resultTimestamp = result; + } + @Override public long getLeftTimestamp() { return leftTimestamp; @@ -333,7 +346,7 @@ public long getRightTimestamp() { @Override public long getTimestamp() { - return leftTimestamp; + return resultTimestamp; } @Override diff --git
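The core of the merged fix above is that the join context must report the max of the two input timestamps, because that is the timestamp the joined record is emitted with (previously `getTimestamp()` wrongly returned only the left timestamp). A minimal standalone model of that context (names echo the diff; this is not the operator itself):

```java
// Minimal model of the IntervalJoin context fix: getTimestamp() == max(left, right).
public class JoinContextSketch {
    private long leftTimestamp = Long.MIN_VALUE;
    private long rightTimestamp = Long.MIN_VALUE;
    private long resultTimestamp = Long.MIN_VALUE;

    public void updateTimestamps(long left, long right) {
        this.leftTimestamp = left;
        this.rightTimestamp = right;
        // The fix: the result carries the max timestamp, not the left one.
        this.resultTimestamp = Math.max(left, right);
    }

    public long getLeftTimestamp()  { return leftTimestamp; }
    public long getRightTimestamp() { return rightTimestamp; }
    public long getTimestamp()      { return resultTimestamp; }
}
```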
[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins
[ https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561216#comment-16561216 ] Elias Levy commented on FLINK-6243: --- Rereading my initial description of the issue, I see that I made no mention of our specific upsert requirements, so I think you are right that FLINK-8478 does satisfy this issue as described, and that it may be best if I open a new issue for the upsert and (a)/(a,b) join semantics I'd like. > Continuous Joins: True Sliding Window Joins > > > Key: FLINK-6243 > URL: https://issues.apache.org/jira/browse/FLINK-6243 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.1.4 >Reporter: Elias Levy >Priority: Major > > Flink defines sliding window joins as the join of elements of two streams > that share a window of time, where the windows are defined by advancing them > forward some amount of time that is less than the window time span. More > generally, such windows are just overlapping hopping windows. > Other systems, such as Kafka Streams, support a different notion of sliding > window joins. In these systems, two elements of a stream are joined if the > absolute time difference between them is less than or equal to the time > window length. > This alternate notion of sliding window joins has some advantages in some > applications over the current implementation. > Elements to be joined may both fall within multiple overlapping sliding > windows, leading them to be joined multiple times, when we only wish them to > be joined once. > The implementation need not instantiate window objects to keep track of > stream elements, which becomes problematic in the current implementation if > the window size is very large and the slide is very small. > It allows for asymmetric time joins. E.g. join if elements from stream A are > no more than X time behind and Y time ahead of an element from stream B. > It is currently possible to implement a join with these semantics using > {{CoProcessFunction}}, but the capability should be a first class feature, > such as it is in Kafka Streams. > To perform the join, elements of each stream must be buffered for at least > the window time length. To allow for large window sizes and a high volume of > elements, the state, possibly optionally, should be buffered such that it can > spill to disk (e.g. by using RocksDB). > The same stream may be joined multiple times in a complex topology. As an > optimization, it may be wise to reuse any element buffer among colocated join > operators. Otherwise, there may be write amplification and increased state > that must be snapshotted.
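The join condition described in the issue — including the asymmetric "X time behind and Y time ahead" variant — reduces to a single predicate on the two timestamps. A sketch (parameter names are invented for illustration; the symmetric case is `behind == ahead`):

```java
// Sketch of the "true sliding window" join condition:
// left (at tLeft) joins right (at tRight) iff tRight is in [tLeft - behind, tLeft + ahead].
public class IntervalJoinPredicate {
    public static boolean joins(long tLeft, long tRight, long behind, long ahead) {
        return tRight >= tLeft - behind && tRight <= tLeft + ahead;
    }
}
```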
[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins
[ https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561215#comment-16561215 ] Elias Levy commented on FLINK-6243: --- Stephan, thanks for bringing FLINK-8478 to my attention. Alas, while it gets closer to meeting our join requirements, it does not quite fulfill them. Our joins require the semantics of joining two upsert tables, i.e. only joining the latest value by key. The DataStream Interval Join being implemented does not support those semantics, as it will buffer and join all elements for a key that fall within the interval. It seems the upsert semantics could be implemented by changing the state from a {{MapState}} buffering multiple events per key to a {{ValueState}} keeping the latest event per event time. We also need these joins to be outer joins, but I see that there is already a subtask to implement those (FLINK-8483). Finally, we also need to implement a join between two streams where one stream is keyed by a subset of the other stream's composite key (e.g. the left stream is keyed by {{col1}} and the right stream by ({{col1}}, {{col2}})), also with upsert semantics. This could be implemented by keying both streams by {{col1}}, keeping a ValueState for the left stream buffering the latest event, and using a MapState on the right stream keyed by {{col2}} buffering the latest event per ({{col1}}, {{col2}}) tuple. Maybe something like:
{code:scala}
leftStream
  .keyBy(_.col1)
  .upsertJoin(rightStream.keyBy(_.col1).subKey(_.col2))
  .between(...)
  .process(...)
{code}
Looking at the implementation, I also worry that the clean-up timers are not being coalesced, which may result in high overhead from processing clean-up timers for high-throughput streams.
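The (a)/(a,b) upsert join sketched in the comment — left stream keyed by `col1`, right stream by (`col1`, `col2`), joining only the latest value per key — can be modeled with plain maps. This is an illustrative sketch only; `upsertJoin`/`subKey` are the commenter's hypothetical API, and the names below are invented:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an upsert join: left keyed by col1, right keyed by (col1, col2),
// each side retaining only its latest value per key.
public class UpsertJoinSketch {
    private final Map<String, String> latestLeft = new HashMap<>();                  // col1 -> latest left
    private final Map<String, Map<String, String>> latestRight = new HashMap<>();    // col1 -> col2 -> latest right

    public void leftUpsert(String col1, String value) {
        latestLeft.put(col1, value); // overwrite: upsert semantics
    }

    public void rightUpsert(String col1, String col2, String value) {
        latestRight.computeIfAbsent(col1, k -> new HashMap<>()).put(col2, value);
    }

    // Emits the latest left value joined with the latest right value for (col1, col2).
    public String join(String col1, String col2) {
        String l = latestLeft.get(col1);
        Map<String, String> rights = latestRight.get(col1);
        String r = rights == null ? null : rights.get(col2);
        return (l == null || r == null) ? null : l + "|" + r;
    }
}
```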
[jira] [Issue Comment Deleted] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-8523: Comment: was deleted (was: Hey [~pnowojski], [~NicoK] Glad to see we have come back to this issue again. I think I understand your concerns completely; there are actually two separate issues to be confirmed: 1. Whether to spill intermediate buffers before barrier alignment? If we spill the following buffers for a blocked channel that has already received the barrier, as before, we free more floating buffers that can be used for other, unblocked channels. From this point of view, spilling seems to benefit barrier alignment. The concern is that it brings additional IO cost for spilling/replaying the intermediate buffers. If the alignment is very fast, only a few intermediate buffers need to be spilled and they may still sit in the OS cache, so the cost can be ignored. But if the spilled data is very large in an IO-sensitive environment, it will greatly hurt throughput. If we do not spill, as in the current code, the concern is that we cannot make full use of floating buffers before alignment, which may delay the barrier alignment in some scenarios. So based on the above analysis, either way has both good and bad points, and the behavior may differ across scenarios. In non-credit-based mode we had to spill the data to avoid deadlock, but now we have the chance to avoid the spill and do better. It also seems preferable not to involve any disk IO in the streaming runtime stack. From this point of view, I prefer not spilling. Maybe we need more tests, feedback or thinking for the final decision. 2. Avoid requesting floating buffers for blocked channels. I think we can reach an agreement on this issue, no matter what the conclusion of the first issue is: it is reasonable and brings a definite benefit. And this JIRA is focusing on this issue. BTW, we once made another improvement for speeding up barrier alignment: reading unblocked channels with first priority instead of the current random mode (FIFO based on network receiving order). It indeed improved barrier alignment a lot, because the task no longer selects unused intermediate buffers before alignment. But this selection may also change the original back-pressure behavior and affect performance in some scenarios, so it may also be a trade-off.) > Stop assigning floating buffers for blocked input channels in exactly-once > mode > --- > > Key: FLINK-8523 > URL: https://issues.apache.org/jira/browse/FLINK-8523 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > > In exactly-once mode, an input channel is set to the blocked state when a > barrier is read from it, and the blocked state is released after barrier > alignment or when the checkpoint is cancelled. > > In credit-based network flow control, we should avoid assigning floating > buffers to blocked input channels, because the buffers after the barrier > will not be processed by the operator until alignment. > > By doing so, we can make full use of floating buffers and speed up barrier > alignment to some extent.
[jira] [Commented] (FLINK-8523) Stop assigning floating buffers for blocked input channels in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-8523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561200#comment-16561200 ] zhijiang commented on FLINK-8523: - Hey [~pnowojski], [~NicoK] Glad to see we have come back to this issue. I think I understand your concerns completely, and there are actually two separate questions to settle: 1. Whether to spill intermediate buffers before barrier alignment? If we spill the buffers that follow the barrier on an already-blocked channel, as before, we free more floating buffers for the other, unblocked channels. From this point of view, spilling seems to benefit barrier alignment. The concern is the additional IO cost of spilling and replaying the intermediate buffers. If the alignment is very fast, only a few intermediate buffers need to be spilled, and they may still sit in the OS cache, so the cost can be ignored. But if the spilled data is very large in an IO-sensitive environment, it will greatly hurt TPS. If we do not spill, as in the current code, the concern is that we cannot make full use of the floating buffers before alignment, which may delay the barrier alignment in some scenarios. So based on the above analysis, each way has good and bad points, and the behavior may differ across scenarios. In non-credit-based mode we had to spill the data to avoid deadlock, but now we have the chance to avoid the spill and do better. It also seems better not to involve any disk IO for a streaming job in the runtime stack, so I prefer not spilling. Maybe we need more tests, feedback, or thought before the final decision. 2. Avoid requesting floating buffers for blocked channels. I think we can reach agreement on this point regardless of the conclusion of the first question; it is reasonable and brings a definite benefit. 
And this JIRA focuses on that point. BTW, we once made another improvement to speed up barrier alignment: reading unblocked channels with first priority instead of the current random mode (FIFO based on network receiving). It indeed improves barrier alignment a lot, because the task no longer selects unneeded intermediate buffers before alignment. But this selection may also change the original back-pressure behavior and affect the performance in some scenarios, so it is also a trade-off. > Stop assigning floating buffers for blocked input channels in exactly-once > mode > --- > > Key: FLINK-8523 > URL: https://issues.apache.org/jira/browse/FLINK-8523 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > > In exactly-once mode, the input channel is set to the blocked state when reading > a barrier from it, and the blocked state is released after barrier > alignment or cancellation. > > In credit-based network flow control, we should avoid assigning floating > buffers to blocked input channels, because the buffers after the barrier will not > be processed by the operator until alignment. > By doing so, we can make full use of the floating buffers and speed up barrier > alignment to some extent. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
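The core idea of this JIRA can be sketched in a few lines: a channel that has already received the checkpoint barrier simply declines floating buffers until alignment completes, leaving them available for the still-unblocked channels. The class and method names below are hypothetical illustrations, not Flink's actual `RemoteInputChannel` code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of the FLINK-8523 idea: a blocked channel declines floating
// buffers, since buffers arriving after the barrier cannot be consumed by the
// operator until alignment finishes anyway.
public class BlockedChannelSketch {
    private boolean blocked;                          // set when the barrier arrives
    private final Deque<byte[]> floatingBuffers = new ArrayDeque<>();

    public void onBarrier() {
        blocked = true;                               // data after the barrier must wait for alignment
    }

    public void onAlignmentDone() {
        blocked = false;                              // channel may take floating buffers again
    }

    /** Returns true if the buffer was taken; false means it stays in the pool for other channels. */
    public boolean offerFloatingBuffer(byte[] buffer) {
        if (blocked) {
            return false;                             // do not assign floating buffers while blocked
        }
        floatingBuffers.add(buffer);
        return true;
    }

    public static void main(String[] args) {
        BlockedChannelSketch ch = new BlockedChannelSketch();
        System.out.println(ch.offerFloatingBuffer(new byte[4])); // channel is open: taken
        ch.onBarrier();
        System.out.println(ch.offerFloatingBuffer(new byte[4])); // blocked: declined
    }
}
```

The spill-vs-no-spill debate above is orthogonal to this: declining the buffer costs nothing, which is why the second question in the comment is uncontroversial.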
[jira] [Commented] (FLINK-9981) Tune performance of RocksDB implementation
[ https://issues.apache.org/jira/browse/FLINK-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561195#comment-16561195 ] ASF GitHub Bot commented on FLINK-9981: --- sihuazhou commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation URL: https://github.com/apache/flink/pull/6438#discussion_r205981566 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ## @@ -116,7 +127,14 @@ public RocksDBOrderedSetStore( @Override public void add(@Nonnull T element) { + byte[] elementBytes = serializeElement(element); + + if (LEXICOGRAPIC_BYTE_COMPARATOR.compare(elementBytes, lowerBoundSeekKey) < 0) { + // a smaller element means a new lower bound. + lowerBoundSeekKey = elementBytes; Review comment: I wonder how this could happen. The `groupPrefixBytes` should be the smallest element, since all valid elements start with `groupPrefixBytes`. Or did I misunderstand something? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Tune performance of RocksDB implementation > -- > > Key: FLINK-9981 > URL: https://issues.apache.org/jira/browse/FLINK-9981 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > General performance tuning/polishing for the RocksDB implementation. We can > figure out how caching/seeking can be improved. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sihuazhou commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation
sihuazhou commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation URL: https://github.com/apache/flink/pull/6438#discussion_r205981566 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ## @@ -116,7 +127,14 @@ public RocksDBOrderedSetStore( @Override public void add(@Nonnull T element) { + byte[] elementBytes = serializeElement(element); + + if (LEXICOGRAPIC_BYTE_COMPARATOR.compare(elementBytes, lowerBoundSeekKey) < 0) { + // a smaller element means a new lower bound. + lowerBoundSeekKey = elementBytes; Review comment: I wonder how this could happen. The `groupPrefixBytes` should be the smallest element, since all valid elements start with `groupPrefixBytes`. Or did I misunderstand something? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
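For context on the review comment above: the comparison in the diff relies on a lexicographic ordering over unsigned bytes, the same ordering RocksDB's default comparator applies to keys. A hand-rolled sketch of such a comparator (an illustration of the ordering, not the comparator the Flink code actually uses) looks like this:

```java
import java.util.Comparator;

// Sketch of lexicographic unsigned-byte comparison: a serialized element sorts
// before the lower-bound seek key exactly when this comparator returns < 0.
public class LexicographicBytes {
    public static final Comparator<byte[]> COMPARATOR = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            // mask to 0..255 so bytes compare as unsigned values, not signed Java bytes
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;                   // a proper prefix sorts first
    };

    public static void main(String[] args) {
        byte[] prefix = {1, 2};
        byte[] element = {1, 2, 7};
        // an element that extends the group prefix can never sort before the prefix itself,
        // which is the point the reviewer is making about `groupPrefixBytes`
        System.out.println(COMPARATOR.compare(element, prefix) > 0);
    }
}
```

Under this ordering, any key beginning with the group prefix compares greater than the bare prefix, which is why the reviewer questions how a new element could ever become a smaller lower bound than it.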
[GitHub] Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#issuecomment-408690962 I was a little busy these days and delayed this PR. I have now split `SharedBuffer` into: - `SharedBuffer`, which handles the NFA query - `SharedBufferAccessor`, which deals with the underlying `State` directly. Please take a look again @dawidwys, thx. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561174#comment-16561174 ] ASF GitHub Bot commented on FLINK-9642: --- Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#issuecomment-408690962 I was a little busy these days and delayed this PR. I have now split `SharedBuffer` into: - `SharedBuffer`, which handles the NFA query - `SharedBufferAccessor`, which deals with the underlying `State` directly. Please take a look again @dawidwys, thx. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reduce the count to deal with state during a CEP process > - > > Key: FLINK-9642 > URL: https://issues.apache.org/jira/browse/FLINK-9642 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > With the rework of SharedBuffer in FLINK-9418, the lock & release operations > work against RocksDB state, unlike the previous version, which read the whole > SharedBuffer state into memory. I think we can add a cache or variable in > SharedBuffer to cache the Lockable objects and mark the reference changes within > one NFA processing pass; this will reduce the state accesses when events point > to the same NodeId, and the result can be flushed to the MapState at the end of > processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9943) Support TaskManagerMetricQueryServicePaths msg in JobManager Actor
[ https://issues.apache.org/jira/browse/FLINK-9943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561128#comment-16561128 ] ASF GitHub Bot commented on FLINK-9943: --- chuanlei commented on issue #6429: [FLINK-9943] Support TaskManagerMetricQueryServicePaths msg in JobManager Actor URL: https://github.com/apache/flink/pull/6429#issuecomment-408682620 cc @zentol could you please have a look at this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support TaskManagerMetricQueryServicePaths msg in JobManager Actor > -- > > Key: FLINK-9943 > URL: https://issues.apache.org/jira/browse/FLINK-9943 > Project: Flink > Issue Type: New Feature > Components: Core >Affects Versions: 1.5.0, 1.5.1 >Reporter: Chuanlei Ni >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The reasons are as follows > # AkkaJobManagerGateway wraps the jm actor ref to support this functionality by > requesting RegisteredTaskManagers first and then asking each task manager actor > for its metric query service path one by one. The procedure above is wasteful; > it would be more efficient to support this functionality in the jm actor. > # We can expose the Flink metric system directly to external systems (such as the > Flink client and the like) to support more features in the future. For now, the > metric system is only partially exposed, because Instance cannot (and should > not) be transferred remotely. This feature will make metrics exposure > consistent. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] chuanlei commented on issue #6429: [FLINK-9943] Support TaskManagerMetricQueryServicePaths msg in JobManager Actor
chuanlei commented on issue #6429: [FLINK-9943] Support TaskManagerMetricQueryServicePaths msg in JobManager Actor URL: https://github.com/apache/flink/pull/6429#issuecomment-408682620 cc @zentol could you please have a look at this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9991) Add regex_replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561118#comment-16561118 ] ASF GitHub Bot commented on FLINK-9991: --- yanghua opened a new pull request #6450: [FLINK-9991] Add regex_replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6450 ## What is the purpose of the change *This pull request adds regex_replace support in the Table API and SQL* ## Brief change log - *Add regex_replace support in the Table API and SQL* ## Verifying this change This change is already covered by existing tests, such as *ScalarFunctionsTest#testRegexReplace*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add regex_replace supported in TableAPI and SQL > --- > > Key: FLINK-9991 > URL: https://issues.apache.org/jira/browse/FLINK-9991 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regexp_replace is a very useful function for processing Strings. > For example: > {code:java} > regexp_replace("foobar", "oo|ar", "") // returns 'fb' > {code} > It is supported as a UDF in Hive; for more details please see [1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9991) Add regex_replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9991: -- Labels: pull-request-available (was: ) > Add regex_replace supported in TableAPI and SQL > --- > > Key: FLINK-9991 > URL: https://issues.apache.org/jira/browse/FLINK-9991 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regexp_replace is a very useful function for processing Strings. > For example: > {code:java} > regexp_replace("foobar", "oo|ar", "") // returns 'fb' > {code} > It is supported as a UDF in Hive; for more details please see [1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua opened a new pull request #6450: [FLINK-9991] Add regex_replace supported in TableAPI and SQL
yanghua opened a new pull request #6450: [FLINK-9991] Add regex_replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6450 ## What is the purpose of the change *This pull request adds regex_replace support in the Table API and SQL* ## Brief change log - *Add regex_replace support in the Table API and SQL* ## Verifying this change This change is already covered by existing tests, such as *ScalarFunctionsTest#testRegexReplace*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
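The semantics targeted by this PR match Java's own regex replacement, so the expected result of the quoted example can be sketched with plain `String.replaceAll` (an illustration of the behavior, not the Flink implementation; the helper name is hypothetical):

```java
// Demonstrates the regexp_replace semantics from the issue description:
// every substring matching the pattern is replaced.
public class RegexpReplaceDemo {
    public static String regexpReplace(String input, String regex, String replacement) {
        return input.replaceAll(regex, replacement);
    }

    public static void main(String[] args) {
        // both "oo" and "ar" match the alternation and are removed, leaving "fb"
        System.out.println(regexpReplace("foobar", "oo|ar", "")); // prints fb
    }
}
```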
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561101#comment-16561101 ] Renjie Liu commented on FLINK-9936: --- [~gjy] I'm working on this, and I've already tested an internal version in our own deployment, which works well. I'm going to publish our patch and add some tests to it. > Mesos resource manager unable to connect to master after failover > - > > Key: FLINK-9936 > URL: https://issues.apache.org/jira/browse/FLINK-9936 > Project: Flink > Issue Type: Bug > Components: Mesos, Scheduler >Affects Versions: 1.5.0, 1.5.1, 1.6.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > When deployed in Mesos session cluster mode, the connection monitor keeps > reporting that it is unable to connect to Mesos after a restart. In fact, the > scheduler driver has already connected to the Mesos master, but the connected > message is lost: because leadership has not been granted yet and the fencing id > is not set, the rpc service ignores the connected message. So we should connect > to the Mesos master after leadership is granted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561093#comment-16561093 ] Gary Yao edited comment on FLINK-9936 at 7/29/18 12:13 PM: --- [~liurenjie1024] What's the state of this ticket? Do you have a solution in mind? I think we need to add a callback to the {{Runnable}} scheduled in {{ResourceManager#grantLeadership}}. What do you think? was (Author: gjy): [~liurenjie1024] What's the state of this ticket? Do you have a solution in mind? I think we need to add a callback to the runnable scheduled in {{ResourceManager#grantLeadership}}. What do you think? > Mesos resource manager unable to connect to master after failover > - > > Key: FLINK-9936 > URL: https://issues.apache.org/jira/browse/FLINK-9936 > Project: Flink > Issue Type: Bug > Components: Mesos, Scheduler >Affects Versions: 1.5.0, 1.5.1, 1.6.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > When deployed in Mesos session cluster mode, the connection monitor keeps > reporting that it is unable to connect to Mesos after a restart. In fact, the > scheduler driver has already connected to the Mesos master, but the connected > message is lost: because leadership has not been granted yet and the fencing id > is not set, the rpc service ignores the connected message. So we should connect > to the Mesos master after leadership is granted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561093#comment-16561093 ] Gary Yao commented on FLINK-9936: - [~liurenjie1024] What's the state of this ticket? Do you have a solution in mind? I think we need to add a callback to the runnable scheduled in {{ResourceManager#grantLeadership}}. What do you think? > Mesos resource manager unable to connect to master after failover > - > > Key: FLINK-9936 > URL: https://issues.apache.org/jira/browse/FLINK-9936 > Project: Flink > Issue Type: Bug > Components: Mesos, Scheduler >Affects Versions: 1.5.0, 1.5.1, 1.6.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > When deployed in Mesos session cluster mode, the connection monitor keeps > reporting that it is unable to connect to Mesos after a restart. In fact, the > scheduler driver has already connected to the Mesos master, but the connected > message is lost: because leadership has not been granted yet and the fencing id > is not set, the rpc service ignores the connected message. So we should connect > to the Mesos master after leadership is granted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.
[ https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9994: -- Labels: pull-request-available (was: ) > IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp. > > > Key: FLINK-9994 > URL: https://issues.apache.org/jira/browse/FLINK-9994 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.
[ https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561091#comment-16561091 ] ASF GitHub Bot commented on FLINK-9994: --- kl0u opened a new pull request #6449: [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp. URL: https://github.com/apache/flink/pull/6449 ## What is the purpose of the change Although `Context.getTimestamp()` in the `IntervalJoinOperator` should return the max timestamp between the elements of the joined pair, it currently returns that of the "left" element. This is a remnant of past versions of the code, as the timestamp of the collector is correctly updated to the max timestamp between the ones in the matched pair. ## Brief change log The main change is in the `collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp)` of the `InternalJoinOperator`. ## Verifying this change The tests that were testing this behavior were wrong and have now been fixed. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp. > > > Key: FLINK-9994 > URL: https://issues.apache.org/jira/browse/FLINK-9994 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] kl0u opened a new pull request #6449: [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp.
kl0u opened a new pull request #6449: [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp. URL: https://github.com/apache/flink/pull/6449 ## What is the purpose of the change Although `Context.getTimestamp()` in the `IntervalJoinOperator` should return the max timestamp between the elements of the joined pair, it currently returns that of the "left" element. This is a remnant of past versions of the code, as the timestamp of the collector is correctly updated to the max timestamp between the ones in the matched pair. ## Brief change log The main change is in the `collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp)` of the `InternalJoinOperator`. ## Verifying this change The tests that were testing this behavior were wrong and have now been fixed. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
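The timestamp rule this PR establishes boils down to taking the max of the two element timestamps. A minimal sketch of that rule (a hypothetical helper, not the actual `IntervalJoinOperator` code):

```java
// Sketch of FLINK-9994: the timestamp exposed to the user function for a
// joined pair is the max of the two sides, matching what the collector
// already emits downstream.
public class JoinTimestampSketch {
    public static long contextTimestamp(long leftTimestamp, long rightTimestamp) {
        return Math.max(leftTimestamp, rightTimestamp);
    }

    public static void main(String[] args) {
        // regardless of which side arrives later, the pair carries the later timestamp
        System.out.println(contextTimestamp(1000L, 1500L)); // prints 1500
        System.out.println(contextTimestamp(1500L, 1000L)); // prints 1500
    }
}
```

The bug described above was, in effect, returning `leftTimestamp` unconditionally from the context while the collector already used the max.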
[jira] [Updated] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.
[ https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-9994: -- Fix Version/s: 1.6.0 > IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp. > > > Key: FLINK-9994 > URL: https://issues.apache.org/jira/browse/FLINK-9994 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.
Kostas Kloudas created FLINK-9994: - Summary: IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp. Key: FLINK-9994 URL: https://issues.apache.org/jira/browse/FLINK-9994 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.6.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9990) Add regex_extract supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561049#comment-16561049 ] ASF GitHub Bot commented on FLINK-9990: --- yanghua opened a new pull request #6448: [FLINK-9990] Add regex_extract supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6448 ## What is the purpose of the change *This pull request adds regex_extract support to the Table API and SQL* ## Brief change log - *Add regex_extract support to the Table API and SQL* ## Verifying this change This change is already covered by existing tests, such as *ScalarFunctionsTest#testRegexExtract*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add regex_extract supported in TableAPI and SQL > --- > > Key: FLINK-9990 > URL: https://issues.apache.org/jira/browse/FLINK-9990 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regex_extract is a very useful function; it returns a string based on a regex > pattern and an index. > For example: > {code:java} > regexp_extract('foothebar', 'foo(.*?)(bar)', 2) // returns 'bar' > {code} > It is provided as a UDF in Hive; for more details, see [1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF -- This message was sent by Atlassian JIRA (v7.6.3#76005)
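The semantics quoted in the issue can be reproduced with plain `java.util.regex` (a sketch of the expected Hive-style behavior, not the Flink implementation; the helper name `regexpExtract` is made up for illustration):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class RegexpExtractDemo {
    // Mimics Hive-style regexp_extract(str, pattern, index): returns the
    // given capture group of the first match, or "" if there is no match.
    static String regexpExtract(String str, String regex, int groupIndex) {
        Matcher m = Pattern.compile(regex).matcher(str);
        return m.find() ? m.group(groupIndex) : "";
    }

    public static void main(String[] args) {
        // Group 2 of 'foo(.*?)(bar)' matched against 'foothebar' is 'bar'.
        System.out.println(regexpExtract("foothebar", "foo(.*?)(bar)", 2)); // prints bar
        // Group 1 is the lazily matched middle part: 'the'.
        System.out.println(regexpExtract("foothebar", "foo(.*?)(bar)", 1)); // prints the
    }
}
```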
[jira] [Updated] (FLINK-9990) Add regex_extract supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9990: -- Labels: pull-request-available (was: ) > Add regex_extract supported in TableAPI and SQL > --- > > Key: FLINK-9990 > URL: https://issues.apache.org/jira/browse/FLINK-9990 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regex_extract is a very useful function, it returns a string based on a regex > pattern and a index. > For example : > {code:java} > regexp_extract('foothebar', 'foo(.*?)(bar)', 2) // returns 'bar.' > {code} > It is provided as a UDF in Hive, more details please see[1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua opened a new pull request #6448: [FLINK-9990] Add regex_extract supported in TableAPI and SQL
yanghua opened a new pull request #6448: [FLINK-9990] Add regex_extract supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6448 ## What is the purpose of the change *This pull request adds regex_extract support to the Table API and SQL* ## Brief change log - *Add regex_extract support to the Table API and SQL* ## Verifying this change This change is already covered by existing tests, such as *ScalarFunctionsTest#testRegexExtract*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-8163) NonHAQueryableStateFsBackendITCase test getting stuck on Travis
[ https://issues.apache.org/jira/browse/FLINK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561045#comment-16561045 ] Congxian Qiu commented on FLINK-8163: - When the test got stuck, I captured the jstack output below {code:java} "Flink Netty Client (0) Thread 0" #11277 daemon prio=5 os_prio=31 tid=0x7fe7237ed800 nid=0x323a3 runnable [0x70002c2d] java.lang.Thread.State: RUNNABLE at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x000742003840> (a org.apache.flink.shaded.netty4.io.netty.channel.nio.SelectedSelectionKeySet) - locked <0x000742003830> (a java.util.Collections$UnmodifiableSet) - locked <0x0007420037e0> (a sun.nio.ch.KQueueSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.flink.shaded.netty4.io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:62) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:753) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:409) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) at java.lang.Thread.run(Thread.java:748) ... 
"Flink Netty Client (0) Thread 0" #11275 daemon prio=5 os_prio=31 tid=0x7fe70900b000 nid=0x3fe37 runnable [0x70002b9b5000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x000741e60f58> (a org.apache.flink.shaded.netty4.io.netty.channel.nio.SelectedSelectionKeySet) - locked <0x000741e60ea0> (a java.util.Collections$UnmodifiableSet) - locked <0x000741e60dc0> (a sun.nio.ch.KQueueSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.flink.shaded.netty4.io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:62) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:753) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:409) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) at java.lang.Thread.run(Thread.java:748) {code} > NonHAQueryableStateFsBackendITCase test getting stuck on Travis > --- > > Key: FLINK-8163 > URL: https://issues.apache.org/jira/browse/FLINK-8163 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available, test-stability > > The {{NonHAQueryableStateFsBackendITCase}} tests seem to get stuck on Travis > producing no output for 300s. > https://travis-ci.org/tillrohrmann/flink/jobs/307988209 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zhangminglei closed pull request #6380: [FLINK-9614] [table] Improve the error message for Compiler#compile
zhangminglei closed pull request #6380: [FLINK-9614] [table] Improve the error message for Compiler#compile URL: https://github.com/apache/flink/pull/6380 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala index 4fcfab0e507..a3ac48f2ca7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala @@ -34,7 +34,7 @@ trait Compiler[T] { } catch { case t: Throwable => throw new InvalidProgramException("Table program cannot be compiled. " + - "This is a bug. Please file an issue.", t) + "This might be a bug. Please file an issue.", t) } compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9614) Improve the error message for Compiler#compile
[ https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561042#comment-16561042 ] ASF GitHub Bot commented on FLINK-9614: --- zhangminglei closed pull request #6380: [FLINK-9614] [table] Improve the error message for Compiler#compile URL: https://github.com/apache/flink/pull/6380 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala index 4fcfab0e507..a3ac48f2ca7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala @@ -34,7 +34,7 @@ trait Compiler[T] { } catch { case t: Throwable => throw new InvalidProgramException("Table program cannot be compiled. " + - "This is a bug. Please file an issue.", t) + "This might be a bug. Please file an issue.", t) } compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve the error message for Compiler#compile > -- > > Key: FLINK-9614 > URL: https://issues.apache.org/jira/browse/FLINK-9614 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > > When the SQL below is too long, like > case when case when . 
> when host in > ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247') > then 'condition' > it causes a {{StackOverflowError}}. The current code is below; we could > solve this by setting -Xss 20m, instead of reporting {{This is a bug.}} > {code:java} > trait Compiler[T] { > @throws(classOf[CompileException]) > def compile(cl: ClassLoader, name: String, code: String): Class[T] = { > require(cl != null, "Classloader must not be null.") > val compiler = new SimpleCompiler() > compiler.setParentClassLoader(cl) > try { > compiler.cook(code) > } catch { > case t: Throwable => > throw new InvalidProgramException("Table program cannot be compiled. > " + > "This is a bug. Please file an issue.", t) > } > compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9614) Improve the error message for Compiler#compile
[ https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei resolved FLINK-9614. - Resolution: Won't Fix > Improve the error message for Compiler#compile > -- > > Key: FLINK-9614 > URL: https://issues.apache.org/jira/browse/FLINK-9614 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > > When the SQL below is too long, like > case when case when . > when host in > ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247') > then 'condition' > it causes a {{StackOverflowError}}. The current code is below; we could > solve this by setting -Xss 20m, instead of reporting {{This is a bug.}} > {code:java} > trait Compiler[T] { > @throws(classOf[CompileException]) > def compile(cl: ClassLoader, name: String, code: String): Class[T] = { > require(cl != null, "Classloader must not be null.") > val compiler = new SimpleCompiler() > compiler.setParentClassLoader(cl) > try { > compiler.cook(code) > } catch { > case t: Throwable => > throw new InvalidProgramException("Table program cannot be compiled. > " + > "This is a bug. Please file an issue.", t) > } > compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
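The `-Xss 20m` workaround discussed in the issue raises the default stack size for every thread; Java also lets you request a larger stack for a single `Thread`, which can scope the fix to the deeply recursive work alone (a general-purpose sketch, not Flink code; the class name and recursion depth are made up for illustration):

```java
public final class BigStackDemo {
    // A deliberately deep recursion that could overflow a small default stack,
    // standing in for the deep recursion triggered by a huge generated expression.
    static long depth(long n) {
        return n == 0 ? 0 : 1 + depth(n - 1);
    }

    public static void main(String[] args) throws InterruptedException {
        long[] result = new long[1];
        // Request a 20 MB stack for this one thread, analogous to -Xss20m
        // but limited to the recursive task instead of the whole JVM.
        Thread t = new Thread(null, () -> { result[0] = depth(100_000); },
                "big-stack", 20L * 1024 * 1024);
        t.start();
        t.join();
        System.out.println(result[0]); // prints 100000
    }
}
```

Note that the requested `stackSize` is only a hint to the JVM, which is one reason raising `-Xss` globally remains the more common workaround.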
[jira] [Commented] (FLINK-9925) ClientTest.testSimpleRequests fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561036#comment-16561036 ] Congxian Qiu commented on FLINK-9925: - I ran the test locally ~100 times, but could not reproduce this error. If I can reproduce this error, maybe I could help to fix it. > ClientTest.testSimpleRequests fails on Travis > - > > Key: FLINK-9925 > URL: https://issues.apache.org/jira/browse/FLINK-9925 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > {{ClientTest.testSimpleRequests}} fails on Travis with an {{AssertionError}}: > https://api.travis-ci.org/v3/job/405690023/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)