[jira] [Created] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window
Rong Rong created FLINK-7357: Summary: HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window Key: FLINK-7357 URL: https://issues.apache.org/jira/browse/FLINK-7357 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.1 Reporter: Rong Rong The following SQL does not compile: {code:title=invalid_having_hop_start_sql} SELECT c AS k, COUNT(a) AS v, HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart, HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd FROM T1 GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c HAVING SUM(b) > 1 {code} Keeping either the HAVING clause or the HOP_START/HOP_END fields individually, the query compiles and runs without issue. More details: https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7177) [table] Using Table API to perform aggregation on another Table API / SQL result table causes runVolcanoPlanner to fail during physicalPlan generation
Rong Rong created FLINK-7177: Summary: [table] Using Table API to perform aggregation on another Table API / SQL result table causes runVolcanoPlanner to fail during physicalPlan generation Key: FLINK-7177 URL: https://issues.apache.org/jira/browse/FLINK-7177 Project: Flink Issue Type: Bug Reporter: Rong Rong For example: {code:scala} val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val inputTable = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b) val resultTable = inputTable.select('a, 'b).where('a.get("_1") > 0) val failingTable = resultTable.select('a.get("_1").avg, 'a.get("_2").sum, 'b.count) {code} Details can be found in: https://github.com/apache/flink/compare/master...walterddr:bug_report_sql_query_result_consume_by_table_api -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7137) Flink table API defaults top-level fields to nullable and all nested fields within CompositeType to non-nullable
Rong Rong created FLINK-7137: Summary: Flink table API defaults top-level fields to nullable and all nested fields within CompositeType to non-nullable Key: FLINK-7137 URL: https://issues.apache.org/jira/browse/FLINK-7137 Project: Flink Issue Type: Bug Affects Versions: 1.3.1 Reporter: Rong Rong Right now FlinkTypeFactory converts Flink TypeInformation to Calcite RelDataType with the following assumptions: all top-level fields are set to nullable, and all nested fields within CompositeRelDataType and GenericRelDataType are set to the Calcite default (which is non-nullable). This causes the Calcite SQL optimizer to drop all `IS NOT NULL` clauses on nested fields, and prevents optimization when top-level fields are actually non-nullable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7373) Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException
Rong Rong created FLINK-7373: Summary: Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException Key: FLINK-7373 URL: https://issues.apache.org/jira/browse/FLINK-7373 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.1 Reporter: Rong Rong Using a URLClassLoader to load (say, from Artifactory) and instantiate UDF instances causes some rules to fail during runHepPlanner or runVolcanoPlanner. One example is to add an ITCase in: {code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala} @Test def testUserDefinedFunctionDynamicClassloader() { val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env, config) val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar" val udfClassLoader: ClassLoader = new URLClassLoader(List(new URI(jarFileURI).toURL).toArray) val clazz = udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld") val helloWorldUDF: ScalarFunction = clazz.newInstance().asInstanceOf[ScalarFunction] tableEnv.registerFunction("helloWorld", helloWorldUDF) val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) val result = table.select("text.helloWorld()") val results = result.toDataSet[Row].collect() val expected = "Hello World!" TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} where {code:title=HelloWorld.java} package org.apache.flink.table.udf; import org.apache.flink.table.functions.ScalarFunction; public class HelloWorld extends ScalarFunction { public String eval(String o) { if (o == null) { return "Hello World"; } else { return "Hello World " + o; } } } {code} This triggers the following Exception: {panel:title=Exception} org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31) at org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) .. Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22: Cannot determine simple type name "org" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) .. {panel} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly
Rong Rong created FLINK-7678: Summary: SQL UserDefineTableFunction does not take CompositeType input correctly Key: FLINK-7678 URL: https://issues.apache.org/jira/browse/FLINK-7678 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.2 Reporter: Rong Rong UDFs use FlinkTypeFactory to infer operand types, while UDTFs do not go through the same code path. This results in: {code:console} org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 38 to line 1, column 44: No match found for function signature func() Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 38 to line 1, column 44: No match found for function signature func() {code} Please see the github code for more info: https://github.com/walterddr/flink/blob/bug_report/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/UDTFCompositeTypeTestFailure.scala -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8151) [Table] removing map value equality check
Rong Rong created FLINK-8151: Summary: [Table] removing map value equality check Key: FLINK-8151 URL: https://issues.apache.org/jira/browse/FLINK-8151 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Rong Rong Following up on FLINK-8038: the equality check does not work, as Map does not support element-wise equality. Suggest removing it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
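One way the equality check above can break, sketched in plain Java (illustrative only, not Flink's actual runtime code): when map values are arrays, java.util.Map.equals falls back to the arrays' identity-based equals(), so two element-wise identical maps still compare as unequal.

```java
import java.util.HashMap;
import java.util.Map;

public class MapEqualityDemo {
    public static void main(String[] args) {
        // Two maps with element-wise identical contents ...
        Map<String, int[]> a = new HashMap<>();
        a.put("k", new int[]{1, 2});
        Map<String, int[]> b = new HashMap<>();
        b.put("k", new int[]{1, 2});

        // ... still compare as unequal, because array values use
        // identity-based equals() rather than content comparison.
        System.out.println(a.equals(b)); // prints "false"
    }
}
```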
[jira] [Created] (FLINK-8104) Fix Row value constructor
Rong Rong created FLINK-8104: Summary: Fix Row value constructor Key: FLINK-8104 URL: https://issues.apache.org/jira/browse/FLINK-8104 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Rong Rong Support the Row value constructor, which is currently broken. See {code:scala} // flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala @Test def testValueConstructorFunctions(): Unit = { // TODO we need a special code path that flattens ROW types // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0 // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0 // ... } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8038) Support MAP literal
Rong Rong created FLINK-8038: Summary: Support MAP literal Key: FLINK-8038 URL: https://issues.apache.org/jira/browse/FLINK-8038 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Rong Rong Assignee: Rong Rong Similar to https://issues.apache.org/jira/browse/FLINK-4554 we want to support MAP literals, which are supported by Calcite: https://calcite.apache.org/docs/reference.html#value-constructors {code:sql} SELECT MAP['key1', f0, 'key2', f1] AS stringKeyedMap, MAP['key', 'value'] AS literalMap, MAP[f0, f1] AS fieldMap FROM table {code} This should enable users to construct MapTypeInfo, one of the CompositeTypes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
Rong Rong created FLINK-7922: Summary: leastRestrictive in FlinkTypeFactory does not resolve composite type correctly Key: FLINK-7922 URL: https://issues.apache.org/jira/browse/FLINK-7922 Project: Flink Issue Type: Bug Reporter: Rong Rong FlinkTypeFactory does not correctly override the following function: {code:scala} leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} which will trigger a runtime exception. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
Rong Rong created FLINK-7923: Summary: SQL parser exception when accessing subfields of a Composite element in an Object Array type column Key: FLINK-7923 URL: https://issues.apache.org/jira/browse/FLINK-7923 Project: Flink Issue Type: Bug Affects Versions: 1.4.0 Reporter: Rong Rong Access patterns such as: {code:SQL} SELECT a[1].f0 FROM MyTable {code} cause a parser exception. See the following test sample for more details: https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-9398) Flink CLI list running jobs returns all jobs except those in CREATED state
Rong Rong created FLINK-9398: Summary: Flink CLI list running jobs returns all jobs except those in CREATED state Key: FLINK-9398 URL: https://issues.apache.org/jira/browse/FLINK-9398 Project: Flink Issue Type: Bug Components: Client Affects Versions: 1.5.0 Reporter: Rong Rong See: https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445 It seems the CLI command *flink list -r* returns all jobs except jobs in the *CREATED* state, which conflicts with the CLI description: *--Running/Restarting Jobs--*. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
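A sketch of the filter the CLI description implies: include only an explicit allow-list of states, rather than everything except one state. The enum and method names here are illustrative stand-ins, not Flink's actual CliFrontend code.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

public class RunningJobsFilter {
    // Simplified stand-in for Flink's JobStatus enum (illustrative only).
    enum JobStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED, RESTARTING }

    // Per "--Running/Restarting Jobs--", "flink list -r" should include
    // only these states, instead of "everything except CREATED".
    static final EnumSet<JobStatus> RUNNING_OR_RESTARTING =
        EnumSet.of(JobStatus.RUNNING, JobStatus.RESTARTING);

    static List<JobStatus> filterRunning(List<JobStatus> jobs) {
        return jobs.stream()
            .filter(RUNNING_OR_RESTARTING::contains)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(filterRunning(List.of(
            JobStatus.CREATED, JobStatus.RUNNING,
            JobStatus.FINISHED, JobStatus.RESTARTING)));
        // prints "[RUNNING, RESTARTING]"
    }
}
```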
[jira] [Created] (FLINK-9484) Improve generic type inference for User-Defined Functions
Rong Rong created FLINK-9484: Summary: Improve generic type inference for User-Defined Functions Key: FLINK-9484 URL: https://issues.apache.org/jira/browse/FLINK-9484 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Rong Rong Assignee: Rong Rong User-defined functions have been a great extension for the Flink SQL API to support more complex logic. We experienced many inconveniences when dealing with UDFs with generic types; these are summarized in the following [doc|https://docs.google.com/document/d/1zKSY1z0lvtQdfOgwcLnCMSRHew3weeJ6QfQjSD0zWas/edit?usp=sharing]. We are planning to implement the generic type inference / function catalog lookup in multiple phases. Detailed tickets will be created. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9501) Allow Object.class type in user-defined functions as parameter types but not result types
Rong Rong created FLINK-9501: Summary: Allow Object.class type in user-defined functions as parameter types but not result types Key: FLINK-9501 URL: https://issues.apache.org/jira/browse/FLINK-9501 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Rong Rong Assignee: Rong Rong -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9502) Use generic parameter search for user-defined functions when argument contains Object type
Rong Rong created FLINK-9502: Summary: Use generic parameter search for user-defined functions when argument contains Object type Key: FLINK-9502 URL: https://issues.apache.org/jira/browse/FLINK-9502 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Rong Rong Assignee: Rong Rong -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
Rong Rong created FLINK-9294: Summary: Improve type inference for UDFs with composite parameter or result type Key: FLINK-9294 URL: https://issues.apache.org/jira/browse/FLINK-9294 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Rong Rong For now, most UDF function signatures that include composite types such as `MAP`, `ARRAY`, etc. require the user to explicitly override the `getParameterTypes` or `getResultType` method. It should be possible to resolve the composite type based on the function signature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
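The claim that the composite type is recoverable from the signature can be illustrated with plain Java reflection (a sketch only, not Flink's actual extraction logic; {{MyUdf}} is a hypothetical UDF-like class): the full generic information of a Map parameter is present on the eval method, so in principle no manual override should be needed.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class SignatureInspection {
    // Hypothetical UDF-like class whose eval signature carries
    // the full generic type information of its composite parameter.
    public static class MyUdf {
        public String eval(Map<String, Integer> input) {
            return String.valueOf(input.size());
        }
    }

    public static void main(String[] args) throws Exception {
        Method eval = MyUdf.class.getMethod("eval", Map.class);
        // The composite parameter type is fully recoverable from the
        // signature, so overriding getParameterTypes by hand is avoidable.
        Type param = eval.getGenericParameterTypes()[0];
        ParameterizedType mapType = (ParameterizedType) param;
        System.out.println(mapType.getActualTypeArguments()[0]); // class java.lang.String
        System.out.println(mapType.getActualTypeArguments()[1]); // class java.lang.Integer
    }
}
```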
[jira] [Created] (FLINK-7934) Upgrade Calcite dependency to 1.15
Rong Rong created FLINK-7934: Summary: Upgrade Calcite dependency to 1.15 Key: FLINK-7934 URL: https://issues.apache.org/jira/browse/FLINK-7934 Project: Flink Issue Type: Bug Reporter: Rong Rong Umbrella issue for all related issues for Apache Calcite 1.15 release. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8739) Optimize runtime support for distinct filter to reuse same distinct accumulator for filtering
Rong Rong created FLINK-8739: Summary: Optimize runtime support for distinct filter to reuse same distinct accumulator for filtering Key: FLINK-8739 URL: https://issues.apache.org/jira/browse/FLINK-8739 Project: Flink Issue Type: Sub-task Reporter: Rong Rong -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8688) Enable distinct aggregation for data stream on Table/SQL API
Rong Rong created FLINK-8688: Summary: Enable distinct aggregation for data stream on Table/SQL API Key: FLINK-8688 URL: https://issues.apache.org/jira/browse/FLINK-8688 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Rong Rong Assignee: Rong Rong Distinct aggregation is not currently supported on data streams with the Table/SQL API. This is an umbrella task for enabling distinct aggregation in various use cases. The discussion doc can be found [here|https://docs.google.com/document/d/1zj6OA-K2hi7ah8Fo-xTQB-mVmYfm6LsN2_NHgTCVmJI/edit?usp=sharing]. Distinct aggregation is a very important feature in SQL processing and there are many JIRAs currently open with various use cases. The goal is to create one solution for both unbounded and bounded distinct aggregation on data streams so that it can easily be extended to support these use cases. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate that explicitly allows the distinct operator
Rong Rong created FLINK-8690: Summary: Update logical rule set to generate FlinkLogicalAggregate that explicitly allows the distinct operator Key: FLINK-8690 URL: https://issues.apache.org/jira/browse/FLINK-8690 Project: Flink Issue Type: Sub-task Reporter: Rong Rong -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8689) Add runtime support of distinct filter using MapView for GenerateAggregation
Rong Rong created FLINK-8689: Summary: Add runtime support of distinct filter using MapView for GenerateAggregation Key: FLINK-8689 URL: https://issues.apache.org/jira/browse/FLINK-8689 Project: Flink Issue Type: Sub-task Reporter: Rong Rong -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8691) Update table API to support distinct operator on data stream
Rong Rong created FLINK-8691: Summary: Update table API to support distinct operator on data stream Key: FLINK-8691 URL: https://issues.apache.org/jira/browse/FLINK-8691 Project: Flink Issue Type: Sub-task Reporter: Rong Rong -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10010) Deprecate unused BaseAlignedWindowAssigner related components
Rong Rong created FLINK-10010: - Summary: Deprecate unused BaseAlignedWindowAssigner related components Key: FLINK-10010 URL: https://issues.apache.org/jira/browse/FLINK-10010 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Rong Rong Assignee: Rong Rong {{BaseAlignedWindowAssigner}} should be marked as deprecated and {{SlidingAlignedProcessingTimeWindows}} should be removed from the Flink Repo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10172) Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc
Rong Rong created FLINK-10172: - Summary: Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc Key: FLINK-10172 URL: https://issues.apache.org/jira/browse/FLINK-10172 Project: Flink Issue Type: Bug Components: Table API SQL Affects Versions: 1.6.0 Reporter: Rong Rong Assignee: Rong Rong The following expression throws an exception when parsing the {{"id.asc"}} term: {code:java} Table allOrders = orderTable .select("id,order_date,amount,customer_id") .orderBy("id.asc"); {code} while the Scala equivalent is parsed correctly: {code:scala} val allOrders:Table = orderTable .select('id, 'order_date, 'amount, 'customer_id) .orderBy('id.asc) {code} This suggests an inconsistency between ExpressionParser and ExpressionDsl. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9157) Create support for commonly used external catalog
Rong Rong created FLINK-9157: Summary: Create support for commonly used external catalog Key: FLINK-9157 URL: https://issues.apache.org/jira/browse/FLINK-9157 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Rong Rong It would be great for the SQL Client to support some external catalogs out-of-the-box, such as Apache HCatalog, so that SQL users can configure and use them easily. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9172) Support external catalog factory that comes default with SQL-Client
Rong Rong created FLINK-9172: Summary: Support external catalog factory that comes default with SQL-Client Key: FLINK-9172 URL: https://issues.apache.org/jira/browse/FLINK-9172 Project: Flink Issue Type: New Feature Reporter: Rong Rong It would be great for the SQL Client to support some external catalogs out-of-the-box, so that SQL users can configure and use them easily. I am currently thinking of an external catalog factory that spins up both streaming and batch external catalog table sources and sinks. This could greatly unify access and make it easy for SQL users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9199) SubtaskExecutionAttemptAccumulatorsHeaders & SubtaskExecutionAttemptDetailsHeaders have malfunctioning URLs
Rong Rong created FLINK-9199: Summary: SubtaskExecutionAttemptAccumulatorsHeaders & SubtaskExecutionAttemptDetailsHeaders have malfunctioning URLs Key: FLINK-9199 URL: https://issues.apache.org/jira/browse/FLINK-9199 Project: Flink Issue Type: Bug Components: REST Reporter: Rong Rong -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9232) Add harness test for AggregationCodeGenerator
Rong Rong created FLINK-9232: Summary: Add harness test for AggregationCodeGenerator Key: FLINK-9232 URL: https://issues.apache.org/jira/browse/FLINK-9232 Project: Flink Issue Type: Sub-task Reporter: Rong Rong Instead of relying on ITCases to cover the codegen result, we should have direct tests against it, for example using the harness test framework. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
Rong Rong created FLINK-11088: - Summary: Improve Kerberos Authentication using Keytab in YARN proxy user mode Key: FLINK-11088 URL: https://issues.apache.org/jira/browse/FLINK-11088 Project: Flink Issue Type: Improvement Components: YARN Reporter: Rong Rong Currently flink-yarn assumes the keytab is shipped as an application master local resource on the client side and will be distributed to all the TMs. This does not work in YARN proxy-user mode, since the proxy user or super user only has delegation tokens and no access to the actual user's keytab. We propose to make the keytab file path discovery configurable depending on the launch mode of the YARN client. Reference: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11006) Update Calcite dependency to 1.18
Rong Rong created FLINK-11006: - Summary: Update Calcite dependency to 1.18 Key: FLINK-11006 URL: https://issues.apache.org/jira/browse/FLINK-11006 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Rong Rong Umbrella task to track all dependencies and tasks that need to be done for upgrading to Calcite 1.18. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11276) Slicing Window Optimization
Rong Rong created FLINK-11276: - Summary: Slicing Window Optimization Key: FLINK-11276 URL: https://issues.apache.org/jira/browse/FLINK-11276 Project: Flink Issue Type: New Feature Reporter: Rong Rong This is the umbrella ticket for the discussion in: https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing We would like to support syntax for more flexible window aggregations: 1. slicing and merging: {code:java} val resultStream: DataStream = inputStream .keyBy("key") .sliceWindow(Time.seconds(5L)) // new “slice window” concept: to combine // tumble results based on discrete // non-overlapping windows. .sum("value") .slideOver(Count.of(5)) // new “slide over” concept to merge // tumble results. {code} and 2. sliding aggregation with multiple intervals, such as: {code:java} val slicedStream: SlicedStream = inputStream .keyBy("key") .sliceWindow(Time.seconds(5L)) .sum("value") val resultStream1: DataStream = slicedStream .slideOver(Time.seconds(10L))// slide over 5-sec slices // for the last 10 seconds val resultStream2: DataStream = slicedStream .slideOver(Count.of(3)) // slide over 5-sec slices // for the last 3 slices (15 seconds) val resultStream3: DataStream = slicedStream .slideOver(Count.of(180))// slide over 5-sec slices // for the last 180 slices (15 minutes) {code} and 3. replace the current sliding window logic with a more efficient slice-and-merge approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11271) Improve Kerberos Credential Distribution
Rong Rong created FLINK-11271: - Summary: Improve Kerberos Credential Distribution Key: FLINK-11271 URL: https://issues.apache.org/jira/browse/FLINK-11271 Project: Flink Issue Type: Improvement Components: Security, YARN Reporter: Rong Rong Assignee: Rong Rong This is the master JIRA for the improvement listed in: https://docs.google.com/document/d/1rBLCpyQKg6Ld2P0DEgv4VIOMTwv4sitd7h7P5r202IE/edit#heading=h.y34f96ctioqk -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11935) Remove DateTimeUtils pull-in and fix datetime casting problem
Rong Rong created FLINK-11935: - Summary: Remove DateTimeUtils pull-in and fix datetime casting problem Key: FLINK-11935 URL: https://issues.apache.org/jira/browse/FLINK-11935 Project: Flink Issue Type: Sub-task Reporter: Rong Rong Assignee: Rong Rong {{DateTimeUtils}} was pulled in in FLINK-7235 because time operations before the date {{1970-01-01}} were not handled correctly by the {{ymdToJulian}} function, i.e. to address problems like: {code:java} Optimized :1017-12-05 22:58:58.998 Expected :1017-11-29 22:58:58.998 Actual :1017-12-05 22:58:58.998 {code} However, after pulling in Avatica 1.13, I found out that the optimized plans of the time operations are actually correct; it is in fact the casting part that creates the problem. For example, *{{plus(-12000.months, cast('2017-11-29 22:58:58.998', TIMESTAMP))}}* results in a string test expression of *{{CAST(1017-11-29 22:58:58.998):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL}}* but the testing results are: {code:java} Optimized :1017-11-29 22:58:58.998 Expected :1017-11-29 22:58:58.998 Actual :1017-11-23 22:58:58.998 {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
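The "Expected" value above can be double-checked independently with java.time, which uses the same proleptic ISO month arithmetic: subtracting 12000 months from 2017-11-29 lands exactly on 1017-11-29, supporting the claim that the optimized plan is correct and the cast is the faulty step.

```java
import java.time.LocalDateTime;

public class MonthArithmeticCheck {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.parse("2017-11-29T22:58:58.998");
        // -12000 months = -1000 years; day-of-month 29 is valid in November,
        // so no end-of-month clamping applies.
        LocalDateTime shifted = ts.minusMonths(12000);
        System.out.println(shifted); // prints "1017-11-29T22:58:58.998"
    }
}
```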
[jira] [Created] (FLINK-11909) Provide default failure/timeout handling strategy for AsyncIO functions
Rong Rong created FLINK-11909: - Summary: Provide default failure/timeout handling strategy for AsyncIO functions Key: FLINK-11909 URL: https://issues.apache.org/jira/browse/FLINK-11909 Project: Flink Issue Type: Improvement Components: API / DataStream Reporter: Rong Rong Currently Flink AsyncIO by default fails the entire job when an async function invocation fails [1]. It would be nice to have some default AsyncIO failure/timeout handling strategy, or to open up some APIs for the AsyncFunction timeout method to interact with the AsyncWaitOperator. For example (quote [~suez1224]): * FAIL_OPERATOR (default & current behavior) * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times) * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times) [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling -- This message was sent by Atlassian JIRA (v7.6.3#76005)
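A minimal sketch of the delay schedule the proposed EXP_BACKOFF_RETRY strategy implies; the method and parameter names are hypothetical, not an existing Flink API.

```java
public class RetryBackoff {
    // Delay before retry number `attempt` (0-based): base * 2^attempt,
    // capped at maxMs so the schedule cannot grow unbounded.
    static long expBackoffDelayMs(long baseMs, int attempt, long maxMs) {
        double delay = baseMs * Math.pow(2, attempt);
        return (long) Math.min(delay, (double) maxMs);
    }

    public static void main(String[] args) {
        // With base=100ms and cap=1000ms the schedule is:
        // 100, 200, 400, 800, 1000 (capped), ...
        for (int i = 0; i < 5; i++) {
            System.out.print(expBackoffDelayMs(100, i, 1000) + " ");
        }
    }
}
```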
[jira] [Created] (FLINK-11936) Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter issue.
Rong Rong created FLINK-11936: - Summary: Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter issue. Key: FLINK-11936 URL: https://issues.apache.org/jira/browse/FLINK-11936 Project: Flink Issue Type: Sub-task Reporter: Rong Rong AuxiliaryConverter was pulled in in FLINK-6409. Since CALCITE-1761 has been fixed, we should sync back with the Calcite version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11569) Row type does not serialize into a readable format when invoking the "toString" method
Rong Rong created FLINK-11569: - Summary: Row type does not serialize into a readable format when invoking the "toString" method Key: FLINK-11569 URL: https://issues.apache.org/jira/browse/FLINK-11569 Project: Flink Issue Type: Bug Components: Type Serialization System Reporter: Rong Rong Assignee: Rong Rong Seems like the "toString" method for the Row type only concatenates all fields using COMMA ",". It does not wrap the entire Row in some kind of encapsulation, for example "()". This results in nested Rows being serialized as if they were all on one level. For example: {{Row.of("a", 1, Row.of("b", 2))}} is printed as {{"a",1,"b",2}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
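A minimal sketch of the encapsulation suggested above, using a toy stand-in for Row (not Flink's actual class): wrapping each row in parentheses keeps the nesting visible, so the example would print as (a,1,(b,2)) instead of a,1,b,2.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class NestedRowDemo {
    // Toy stand-in for org.apache.flink.types.Row (illustrative only).
    static class Row {
        final Object[] fields;
        Row(Object... fields) { this.fields = fields; }

        @Override
        public String toString() {
            // Wrapping each row in "(...)" preserves the nesting structure,
            // unlike comma-only concatenation; nested Rows recurse naturally.
            return Arrays.stream(fields)
                .map(String::valueOf)
                .collect(Collectors.joining(",", "(", ")"));
        }
    }

    public static void main(String[] args) {
        System.out.println(new Row("a", 1, new Row("b", 2))); // prints "(a,1,(b,2))"
    }
}
```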
[jira] [Created] (FLINK-11589) Introduce service provider pattern for user to dynamically load SecurityFactory classes
Rong Rong created FLINK-11589: - Summary: Introduce service provider pattern for user to dynamically load SecurityFactory classes Key: FLINK-11589 URL: https://issues.apache.org/jira/browse/FLINK-11589 Project: Flink Issue Type: Sub-task Components: Security Reporter: Rong Rong Assignee: Rong Rong Currently there are only 3 security modules in Flink - Hadoop, Zookeeper and JaaS - all of which are pre-loaded into the Flink security runtime, with one hard-coded path for instantiating the SecurityContext, which is used to invoke user code with a PrivilegedExceptionAction. We propose to introduce a [service provider pattern|https://docs.oracle.com/javase/tutorial/ext/basics/spi.html] to allow users to dynamically load {{SecurityModuleFactory}}, or even introduce a new {{SecurityContextFactory}}, so that the entire security runtime context can be set by dynamically loading any 3rd-party JAR and discovering the factories through property configurations. This is especially useful in a corporate environment where proprietary security technologies are involved. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
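The plain-JDK form of the service provider pattern referenced above looks like the following; the {{SecurityModuleFactory}} interface here is a stand-in for the proposed SPI, not an actual Flink type. ServiceLoader discovers any implementation listed in a META-INF/services entry of any JAR on the classpath, which is what lets a 3rd-party JAR contribute a factory without code changes.

```java
import java.util.ServiceLoader;

public class SecurityFactoryDiscovery {
    // Stand-in for the proposed SecurityModuleFactory SPI (illustrative only).
    public interface SecurityModuleFactory {
        String moduleName();
    }

    public static void main(String[] args) {
        // ServiceLoader scans META-INF/services/<interface-name> entries in
        // every JAR on the classpath, so a 3rd-party JAR can register a
        // factory implementation purely through packaging.
        ServiceLoader<SecurityModuleFactory> loader =
            ServiceLoader.load(SecurityModuleFactory.class);
        for (SecurityModuleFactory factory : loader) {
            System.out.println("discovered: " + factory.moduleName());
        }
        // With no provider JAR on the classpath, nothing is discovered.
    }
}
```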
[jira] [Created] (FLINK-11453) Support SliceWindow with forwardable pane info
Rong Rong created FLINK-11453: - Summary: Support SliceWindow with forwardable pane info Key: FLINK-11453 URL: https://issues.apache.org/jira/browse/FLINK-11453 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Rong Rong Support a slicing operation that produces slices: {code:java} val slicedStream: SlicedStream = inputStream .keyBy("key") .sliceWindow(Time.seconds(5L)) // new “slice window” concept: to combine // tumble results based on discrete // non-overlapping windows. .aggregate(aggFunc) {code} {{SlicedStream}} will produce results that expose the current {{WindowOperator}} internal state {{InternalAppendingState}}, which can later be applied with a {{WindowFunction}} separately in another operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11454) Support MergedStream operation
Rong Rong created FLINK-11454: - Summary: Support MergedStream operation Key: FLINK-11454 URL: https://issues.apache.org/jira/browse/FLINK-11454 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Rong Rong Following the SlicedStream, the MergedStream operator merges results from the sliced stream and produces windowing results. {code:java} val slicedStream: SlicedStream = inputStream .keyBy("key") .sliceWindow(Time.seconds(5L)) // new “slice window” concept: to combine // tumble results based on discrete // non-overlapping windows. .aggregate(aggFunc) val mergedStream1: MergedStream = slicedStream .slideOver(Time.second(10L)) // combine slice results with same // windowing function, equivalent to // WindowOperator with an aggregate state // and derived aggregate function. val mergedStream2: MergedStream = slicedStream .slideOver(Count.of(5)) .apply(windowFunction) // apply a different window function over // the sliced results.{code} MergedStreams are produced by the MergeOperator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11455) Support evictor operations on slicing and merging operators
Rong Rong created FLINK-11455: - Summary: Support evictor operations on slicing and merging operators Key: FLINK-11455 URL: https://issues.apache.org/jira/browse/FLINK-11455 Project: Flink Issue Type: Sub-task Reporter: Rong Rong The original POC implementation of SliceStream and MergeStream does not consider evicting window operations. This support can be further expanded in order to cover session windows with multiple timeout durations. See [https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit#heading=h.ihxm3alf3tk0.] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11456) Improve window operator with sliding window assigners
Rong Rong created FLINK-11456: - Summary: Improve window operator with sliding window assigners Key: FLINK-11456 URL: https://issues.apache.org/jira/browse/FLINK-11456 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Rong Rong With slicing and merging operators that expose the internals of window operators, the current sliding window can be improved by eliminating duplicate aggregations and duplicate element inserts into multiple panes (i.e. namespaces). The following sliding window operation {code:java} val resultStream: DataStream = inputStream .keyBy("key") .window(SlidingEventTimeWindows.of(Time.seconds(15L), Time.seconds(5L))) .sum("value") {code} can produce a job graph equivalent to {code:java} val resultStream: DataStream = inputStream .keyBy("key") .sliceWindow(Time.seconds(5L)) .sum("value") .slideOver(Count.of(3)) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13389) Setting DataStream return type breaks some type conversion between Table and DataStream
Rong Rong created FLINK-13389: - Summary: Setting DataStream return type breaks some type conversion between Table and DataStream Key: FLINK-13389 URL: https://issues.apache.org/jira/browse/FLINK-13389 Project: Flink Issue Type: Bug Components: API / DataStream, Table SQL / API Reporter: Rong Rong When converting between a DataStream and a Table, there are situations where only GenericTypeInfo can be applied successfully, while setting the specific RowTypeInfo directly breaks the conversion. For example, the following code doesn't work:
{code:java}
TypeInformation[] types = {
    BasicTypeInfo.INT_TYPE_INFO,
    TimeIndicatorTypeInfo.ROWTIME_INDICATOR(),
    BasicTypeInfo.STRING_TYPE_INFO};
String[] names = {"a", "b", "c"};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);
DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
Table sourceTable = tableEnv.fromDataStream(ds, "a,b,c");
tableEnv.registerTable("MyTableRow", sourceTable);
DataStream<Row> stream = tableEnv.toAppendStream(sourceTable, Row.class)
    .map(a -> a)
    // this line breaks the conversion: it sets the type info to RowTypeInfo.
    // Without this line the output type is GenericTypeInfo(Row).
    .returns(sourceTable.getSchema().toRowType());
stream.addSink(new StreamITCase.StringSink());
env.execute();
{code}
-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-16224) Refine Hadoop Delegation Token based testing framework
Rong Rong created FLINK-16224: - Summary: Refine Hadoop Delegation Token based testing framework Key: FLINK-16224 URL: https://issues.apache.org/jira/browse/FLINK-16224 Project: Flink Issue Type: Sub-task Components: Deployment / YARN Reporter: Rong Rong Assignee: Rong Rong Currently the SecureTestEnvironment doesn't support Hadoop delegation tokens, which makes E2E testing of delegation-token-based YARN applications impossible. Propose to enhance the testing framework to support delegation-token-based launches. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16236) Fix YARNSessionFIFOSecuredITCase not loading the correct security context factory
Rong Rong created FLINK-16236: - Summary: Fix YARNSessionFIFOSecuredITCase not loading the correct security context factory Key: FLINK-16236 URL: https://issues.apache.org/jira/browse/FLINK-16236 Project: Flink Issue Type: Sub-task Components: Deployment / YARN Reporter: Rong Rong Assignee: Rong Rong Follow-up to FLINK-11589. Currently, because of the override of TestHadoopModuleFactory, the compatibility checker does not load the correct HadoopContextFactory. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15561) Improve Kerberos delegation token login
Rong Rong created FLINK-15561: - Summary: Improve Kerberos delegation token login Key: FLINK-15561 URL: https://issues.apache.org/jira/browse/FLINK-15561 Project: Flink Issue Type: Bug Components: Deployment / YARN Reporter: Rong Rong Assignee: Rong Rong Currently the security HadoopModule handles delegation token login without spawning a delegation token renewal thread. We might need to add one to fully support delegation tokens. See: [1] https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L84 and [2] https://github.com/hanborq/hadoop/blob/master/src/core/org/apache/hadoop/security/UserGroupInformation.java#L538 -- This message was sent by Atlassian Jira (v8.3.4#803005)
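As a sketch of what such a renewal thread could look like (hypothetical class; this is not the actual HadoopModule or UserGroupInformation code), one might re-run the login at roughly 80% of the remaining token lifetime, similar to what Hadoop's own renewal thread does:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a delegation token renewal loop. The renew()
// callback stands in for the actual token refresh and returns the new
// expiration time in epoch millis.
public class TokenRenewalSketch {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delegation-token-renewer");
                t.setDaemon(true); // don't keep the JVM alive for renewals
                return t;
            });

    /** Next renewal at ~80% of the remaining token lifetime, never negative. */
    static long renewalDelayMs(long nowMs, long expiryMs) {
        return Math.max(0L, (long) ((expiryMs - nowMs) * 0.8));
    }

    /** renew() refreshes the token and returns its new expiration time (ms). */
    public void scheduleRenewal(LongSupplier renew) {
        long expiryMs = renew.getAsLong();
        long delay = renewalDelayMs(System.currentTimeMillis(), expiryMs);
        scheduler.schedule(() -> scheduleRenewal(renew), delay, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```

The 80% factor is an assumption for illustration; Hadoop derives its renewal point from the token's issue and expiry times.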
[jira] [Created] (FLINK-15475) Add isOutputTypeUsed() API to Transformation
Rong Rong created FLINK-15475: - Summary: Add isOutputTypeUsed() API to Transformation Key: FLINK-15475 URL: https://issues.apache.org/jira/browse/FLINK-15475 Project: Flink Issue Type: Improvement Components: API / Core, API / DataSet, API / DataStream Reporter: Rong Rong Assignee: Rong Rong Currently there's no way to "peek" into a Transformation and see whether its output type has been used. The only way is to invoke the {{setOutputType}} API and wrap it in a try-catch block. It would be nice to have an {{isTypeUsed()}} or {{isOutputTypeUsed()}} API to check whether a particular transformation has a definitive output type set or used. -- This message was sent by Atlassian Jira (v8.3.4#803005)
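A toy model of the proposed API (this stand-in class is hypothetical, not Flink's org.apache.flink.api.dag.Transformation) shows how isOutputTypeUsed() would replace the try-catch workaround:

```java
// Hypothetical stand-in that mimics the current behavior: once the output
// type has been read, setOutputType throws, and the only way to detect
// that state today is to call it and catch the exception.
public class TransformationSketch {

    private String outputType; // stand-in for TypeInformation
    private boolean typeUsed;

    public void setOutputType(String type) {
        if (typeUsed) {
            throw new IllegalStateException("output type already used");
        }
        this.outputType = type;
    }

    public String getOutputType() {
        typeUsed = true; // reading the type freezes it
        return outputType;
    }

    /** The proposed peek: check the state without the try-catch workaround. */
    public boolean isOutputTypeUsed() {
        return typeUsed;
    }
}
```

With such a method, callers can branch on `isOutputTypeUsed()` instead of probing `setOutputType` and catching the exception.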
[jira] [Created] (FLINK-16789) Support JMX RMI via JMXConnectorServer
Rong Rong created FLINK-16789: - Summary: Support JMX RMI via JMXConnectorServer Key: FLINK-16789 URL: https://issues.apache.org/jira/browse/FLINK-16789 Project: Flink Issue Type: New Feature Components: Runtime / Coordination, Runtime / Task Affects Versions: 1.11.0 Reporter: Rong Rong Assignee: Rong Rong Currently there is no easy way to assign a JMX RMI port to a running Flink job. The typical tutorial is to add the following to both the TM and JM launch env:
{code}
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=<port>
-Dcom.sun.management.jmxremote.local.only=false
{code}
However, setting the jmxremote port to a fixed value is usually not viable when the Flink job runs in a shared environment (YARN / K8s / etc.). Setting {{-Dcom.sun.management.jmxremote.port=0}} is the best option; however, there is no easy way to retrieve the assigned port. We propose to use JMXConnectorServerFactory to explicitly establish a JMXServer inside ClusterEntrypoint and TaskManagerRunner. -- This message was sent by Atlassian Jira (v8.3.4#803005)
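A minimal JDK-only sketch of the proposed approach (`JmxServerSketch` and `startOnFreePort` are illustrative names; the eventual Flink implementation may differ): probe a free port, start an RMI registry there, and bind a JMXConnectorServer to it, so the actual port is known to the process and can be published.

```java
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.rmi.registry.LocateRegistry;
import javax.management.MBeanServer;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;

// Illustrative sketch using only JDK classes: programmatically start a
// JMX-over-RMI server instead of relying on -Dcom.sun.management.jmxremote
// launch flags, so the port can be chosen and reported at runtime.
public class JmxServerSketch {

    /** Start a JMX connector server on a free port and return it. */
    public static JMXConnectorServer startOnFreePort() throws Exception {
        int port;
        try (ServerSocket probe = new ServerSocket(0)) {
            port = probe.getLocalPort(); // ask the OS for a free port
        }
        LocateRegistry.createRegistry(port);
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxrmi");
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        JMXConnectorServer server =
                JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
        server.start();
        return server; // server.getAddress() now carries the real port
    }
}
```

Note the probe-then-bind step has a small race window; a production version would likely record the port from a custom RMI server socket factory instead.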