[jira] [Created] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window

2017-08-02 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7357:


 Summary: HOP_START() HOP_END() does not work when using HAVING 
clause with GROUP BY HOP window
 Key: FLINK-7357
 URL: https://issues.apache.org/jira/browse/FLINK-7357
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.1
Reporter: Rong Rong


The following SQL does not compile:
{code:title=invalid_having_hop_start_sql}
SELECT 
  c AS k, 
  COUNT(a) AS v, 
  HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart, 
  HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd 
FROM 
  T1 
GROUP BY 
  HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), 
  c 
HAVING 
  SUM(b) > 1
{code}
However, keeping either the HAVING clause or the HOP_START/HOP_END fields individually compiles and runs without issue.

More details: 
https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7177) [table] Using Table API to perform aggregation on another Table API / SQL result table causes runVolcanoPlanner failed on physicalPlan generation

2017-07-13 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7177:


 Summary: [table] Using Table API to perform aggregation on another 
Table API / SQL result table causes runVolcanoPlanner failed on physicalPlan 
generation
 Key: FLINK-7177
 URL: https://issues.apache.org/jira/browse/FLINK-7177
 Project: Flink
  Issue Type: Bug
Reporter: Rong Rong


For example:
{code:scala}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val inputTable =
  CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b)
val resultTable = inputTable.select('a, 'b).where('a.get("_1") > 0)
val failingTable = resultTable.select('a.get("_1").avg, 'a.get("_2").sum, 'b.count)
{code}

Details can be found in: 
https://github.com/apache/flink/compare/master...walterddr:bug_report_sql_query_result_consume_by_table_api



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7137) Flink table API defaults top level fields as nullable and all nested fields within CompositeType as non-nullable

2017-07-07 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7137:


 Summary: Flink table API defaults top level fields as nullable and 
all nested fields within CompositeType as non-nullable
 Key: FLINK-7137
 URL: https://issues.apache.org/jira/browse/FLINK-7137
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.3.1
Reporter: Rong Rong


Right now FlinkTypeFactory does the conversion between Flink TypeInformation and 
Calcite RelDataType with the following assumption: 

All top-level fields are set to nullable, while all nested fields within 
CompositeRelDataType and GenericRelDataType are left at the Calcite default 
(which is non-nullable). 

This causes the Calcite SQL optimizer to drop all `IS NOT NULL` clauses on 
nested fields, and it cannot optimize when top-level fields are actually 
non-nullable.





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7373) Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException

2017-08-05 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7373:


 Summary: Using URLClassLoader to load UDF triggers HepPlanner 
unexpected ClassNotFoundException
 Key: FLINK-7373
 URL: https://issues.apache.org/jira/browse/FLINK-7373
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.1
Reporter: Rong Rong


Using a URLClassLoader to load (say, from Artifactory) and instantiate UDF 
instances causes some rules to fail during runHepPlanner or runVolcanoPlanner.

One example is the following ITCase added in:
{code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala}
  @Test
  def testUserDefinedFunctionDynamicClassloader() {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env, config)

    val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar"
    val udfClassLoader: ClassLoader = new URLClassLoader(List(new URI(jarFileURI).toURL).toArray)
    val clazz = udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld")
    val helloWorldUDF: ScalarFunction = clazz.newInstance().asInstanceOf[ScalarFunction]
    tableEnv.registerFunction("helloWorld", helloWorldUDF)
    val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text)
    val result = table.select("text.helloWorld()")
    val results = result.toDataSet[Row].collect()
    val expected = "Hello World!"
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}

where
{code:title=HelloWorld.java}
package org.apache.flink.table.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class HelloWorld extends ScalarFunction {
  public String eval(String o) {
    if (o == null) {
      return "Hello World";
    } else {
      return "Hello World " + o;
    }
  }
}
{code}

This triggers the following Exception:
{panel:title=Exception}
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.

at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31)
at 
org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62)
..
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22: 
Cannot determine simple type name "org"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
..
{panel}
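The "Cannot determine simple type name" failure boils down to class-loader visibility: the generated code is compiled against a class loader that cannot see the dynamically loaded UDF class. The following standalone Java sketch (class names and the isolated-loader setup are illustrative, not Flink's internals) shows how a class that resolves fine through the normal parent chain becomes invisible to an isolated URLClassLoader:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderVisibility {

    // Loads a class through a URLClassLoader that delegates to the
    // application class loader: resolution succeeds.
    public static Class<?> loadWithParent(String name) {
        ClassLoader withParent =
                new URLClassLoader(new URL[0], ClassLoaderVisibility.class.getClassLoader());
        try {
            return withParent.loadClass(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Loads through an isolated URLClassLoader (no URLs, bootstrap parent only):
    // application classes are invisible, mirroring how a code generator that
    // compiles against the wrong class loader cannot resolve a UDF class that
    // was loaded through a child URLClassLoader.
    public static boolean isVisibleIsolated(String name) {
        ClassLoader isolated = new URLClassLoader(new URL[0], null);
        try {
            isolated.loadClass(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

A fix along these lines would make the code generator compile against the user class loader rather than the default one.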



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly

2017-09-23 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7678:


 Summary: SQL UserDefineTableFunction does not take CompositeType 
input correctly
 Key: FLINK-7678
 URL: https://issues.apache.org/jira/browse/FLINK-7678
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.2
Reporter: Rong Rong


UDFs use FlinkTypeFactory to infer operand types, while UDTFs do not go 
through the same code path. This results in:

{code:console}
org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 1, column 38 to line 1, column 44: No match found for function signature 
func() 
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 38 to line 1, column 44: No match found for function signature 
func()
{code}

Please see the GitHub code for more info:
https://github.com/walterddr/flink/blob/bug_report/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/UDTFCompositeTypeTestFailure.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8151) [Table] removing map value equality check

2017-11-25 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8151:


 Summary: [Table] removing map value equality check
 Key: FLINK-8151
 URL: https://issues.apache.org/jira/browse/FLINK-8151
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Rong Rong


Following up on FLINK-8038: the equality check is not working, because Map does 
not support element-wise equality there. We suggest removing it.
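As a standalone Java illustration of why map equality checks are fragile (this is plain `java.util.Map`, not Flink's generated comparison code): element-wise equality only works when the value type itself has value semantics, and silently breaks down otherwise, e.g. for array-valued maps:

```java
import java.util.HashMap;
import java.util.Map;

public class MapEqualityPitfall {

    // Scalar-valued maps: Map.equals is element-wise and behaves as expected.
    public static boolean scalarMapsEqual() {
        Map<String, Integer> a = new HashMap<>();
        a.put("k", 1);
        Map<String, Integer> b = new HashMap<>();
        b.put("k", 1);
        return a.equals(b); // true
    }

    // Array-valued maps: int[] compares by identity, not contents, so two
    // maps with identical contents are not equal.
    public static boolean arrayValuedMapsEqual() {
        Map<String, int[]> a = new HashMap<>();
        a.put("k", new int[]{1, 2});
        Map<String, int[]> b = new HashMap<>();
        b.put("k", new int[]{1, 2});
        return a.equals(b); // false
    }
}
```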



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8104) Fix Row value constructor

2017-11-18 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8104:


 Summary: Fix Row value constructor
 Key: FLINK-8104
 URL: https://issues.apache.org/jira/browse/FLINK-8104
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Rong Rong


Support the Row value constructor, which is currently broken. 
See 

{code:scala}
// flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
  @Test
  def testValueConstructorFunctions(): Unit = {
    // TODO we need a special code path that flattens ROW types
    // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
    // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
    // ...
  }
{code}






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8038) Support MAP literal

2017-11-08 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8038:


 Summary: Support MAP literal
 Key: FLINK-8038
 URL: https://issues.apache.org/jira/browse/FLINK-8038
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Rong Rong
Assignee: Rong Rong


Similar to https://issues.apache.org/jira/browse/FLINK-4554, we want to support 
MAP literals, which are supported by Calcite:
https://calcite.apache.org/docs/reference.html#value-constructors


{code:sql}
SELECT
  MAP['key1', f0, 'key2', f1] AS stringKeyedMap,
  MAP['key', 'value'] AS literalMap,
  MAP[f0, f1] AS fieldMap
FROM
  table
{code}

This should enable users to construct MapTypeInfo, one of the CompositeTypes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly

2017-10-25 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7922:


 Summary: leastRestrictive in FlinkTypeFactory does not resolve 
composite type correctly
 Key: FLINK-7922
 URL: https://issues.apache.org/jira/browse/FLINK-7922
 Project: Flink
  Issue Type: Bug
Reporter: Rong Rong


FlinkTypeFactory does not override the following function correctly:
`leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... }` 
to deal with situations like:
{code:sql}
CASE 
  WHEN <condition> THEN 
    <value of composite type> 
  ELSE 
    NULL 
END
{code}
which will trigger a runtime exception.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2017-10-25 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7923:


 Summary: SQL parser exception when accessing subfields of a 
Composite element in an Object Array type column
 Key: FLINK-7923
 URL: https://issues.apache.org/jira/browse/FLINK-7923
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.0
Reporter: Rong Rong


Accessing a subfield such as:
{code:SQL}
SELECT 
  a[1].f0 
FROM 
  MyTable
{code}
causes a parser exception. 

See the following test sample for more details: 
https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state

2018-05-17 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9398:


 Summary: Flink CLI list running job returns all jobs except in 
CREATE state
 Key: FLINK-9398
 URL: https://issues.apache.org/jira/browse/FLINK-9398
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.5.0
Reporter: Rong Rong


See: 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445

It seems the CLI command *flink list -r* returns all jobs except jobs in 
*CREATE* state, which conflicts with the CLI description: *--Running/Restarting 
Jobs--*.
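The mismatch can be sketched as two predicates over a job-status enum (status names modeled loosely on Flink's JobStatus; the "actual" predicate is my reading of the linked code, not a verified quote):

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Predicate;

public class RunningJobFilter {

    // Status names modeled loosely on Flink's JobStatus; illustrative only.
    public enum Status { CREATED, RUNNING, FAILING, RESTARTING, FINISHED, CANCELED, FAILED }

    // Assumed current behavior of `flink list -r`: keep everything except CREATED.
    static final Predicate<Status> ACTUAL = s -> s != Status.CREATED;

    // What the "--Running/Restarting Jobs--" description promises.
    static final Predicate<Status> DESCRIBED =
            s -> s == Status.RUNNING || s == Status.RESTARTING;

    // Statuses where the two predicates disagree, i.e. jobs that get listed
    // despite not being running or restarting.
    public static Set<Status> mismatches() {
        Set<Status> diff = EnumSet.noneOf(Status.class);
        for (Status s : Status.values()) {
            if (ACTUAL.test(s) != DESCRIBED.test(s)) {
                diff.add(s);
            }
        }
        return diff;
    }
}
```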



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9484) Improve generic type inference for User-Defined Functions

2018-05-31 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9484:


 Summary: Improve generic type inference for User-Defined Functions
 Key: FLINK-9484
 URL: https://issues.apache.org/jira/browse/FLINK-9484
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Rong Rong
Assignee: Rong Rong


User-defined functions have been a great extension of the Flink SQL API to 
support much more complex logic. 
We experienced many inconveniences when dealing with UDFs with generic types; 
they are summarized in the following 
[doc|https://docs.google.com/document/d/1zKSY1z0lvtQdfOgwcLnCMSRHew3weeJ6QfQjSD0zWas/edit?usp=sharing].

We are planning to implement the generic type inference / function catalog 
lookup in multiple phases. Detailed tickets will be created.
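One common mechanism for inferring a generic result type (not necessarily the exact approach the linked doc proposes) is reflecting over the UDF's generic superclass signature. A minimal sketch, with hypothetical class names:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class GenericInference {

    // A stand-in for a UDF base class with a generic result type.
    public abstract static class ScalarFn<T> {}

    // A concrete UDF binding T = Integer.
    public static class IntFn extends ScalarFn<Integer> {}

    // Resolve T from the generic superclass signature: the same reflection
    // mechanism a function catalog can use to infer result types without
    // the user overriding a getResultType-style method.
    public static Class<?> resultType(Class<?> fnClass) {
        Type sup = fnClass.getGenericSuperclass();
        return (Class<?>) ((ParameterizedType) sup).getActualTypeArguments()[0];
    }
}
```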



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9501) Allow Object.class type in user-defined functions as parameter types but not result types

2018-06-01 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9501:


 Summary: Allow Object.class type in user-defined functions as 
parameter types but not result types
 Key: FLINK-9501
 URL: https://issues.apache.org/jira/browse/FLINK-9501
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Rong Rong
Assignee: Rong Rong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9502) Use generic parameter search for user-defined functions when argument contains Object type

2018-06-01 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9502:


 Summary: Use generic parameter search for user-defined functions 
when argument contains Object type
 Key: FLINK-9502
 URL: https://issues.apache.org/jira/browse/FLINK-9502
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Rong Rong
Assignee: Rong Rong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9294:


 Summary: Improve type inference for UDFs with composite parameter 
or result type 
 Key: FLINK-9294
 URL: https://issues.apache.org/jira/browse/FLINK-9294
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Rong Rong


For now, most UDF function signatures that include composite types such 
as `MAP`, `ARRAY`, etc. require the user to override the `getParameterType` or 
`getResultType` method explicitly. 

It should be possible to resolve the composite type based on the function signature.
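The key/value classes of a composite parameter are in fact recoverable from the method signature itself. A minimal reflection sketch (hypothetical UDF class, not Flink's actual inference code):

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class CompositeParamInference {

    // Hypothetical UDF with a composite (MAP) parameter.
    public static class MapFn {
        public int eval(Map<String, Integer> m) {
            return m.size();
        }
    }

    // Recover the key/value type arguments from the eval signature. If this
    // information is available reflectively, explicit getParameterTypes-style
    // overrides should often be unnecessary.
    public static Type[] mapTypeArguments() {
        try {
            Method eval = MapFn.class.getMethod("eval", Map.class);
            ParameterizedType p = (ParameterizedType) eval.getGenericParameterTypes()[0];
            return p.getActualTypeArguments();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```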



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-7934) Upgrade Calcite dependency to 1.15

2017-10-26 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7934:


 Summary: Upgrade Calcite dependency to 1.15
 Key: FLINK-7934
 URL: https://issues.apache.org/jira/browse/FLINK-7934
 Project: Flink
  Issue Type: Bug
Reporter: Rong Rong


Umbrella issue for all issues related to the Apache Calcite 1.15 release.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8739) Optimize runtime support for distinct filter to reuse same distinct accumulator for filtering

2018-02-21 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8739:


 Summary: Optimize runtime support for distinct filter to reuse 
same distinct accumulator for filtering
 Key: FLINK-8739
 URL: https://issues.apache.org/jira/browse/FLINK-8739
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8688) Enable distinct aggregation for data stream on Table/SQL API

2018-02-18 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8688:


 Summary: Enable distinct aggregation for data stream on Table/SQL 
API
 Key: FLINK-8688
 URL: https://issues.apache.org/jira/browse/FLINK-8688
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Rong Rong
Assignee: Rong Rong


Distinct aggregation is not currently supported on data stream with Table/SQL 
API. This is an umbrella task for enabling distinct aggregation in various use 
cases.

Discussion doc can be found 
[here|https://docs.google.com/document/d/1zj6OA-K2hi7ah8Fo-xTQB-mVmYfm6LsN2_NHgTCVmJI/edit?usp=sharing]

 

Distinct aggregation is a very important feature in SQL processing, and there 
are many JIRAs currently open with various use cases. The goal is to create one 
solution for both unbounded and bounded distinct aggregation on data streams, so 
that it can easily be extended to support these use cases.
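The core of stream-friendly distinct aggregation is a retractable distinct accumulator. A minimal standalone sketch of the idea (the same shape as keeping distinct keys in a MapView, but this is plain Java, not Flink's generated aggregation code): a per-value counter is bumped on accumulate, decremented on retract, and the distinct count is the number of values with a positive counter.

```java
import java.util.HashMap;
import java.util.Map;

public class DistinctCountAccumulator {

    // value -> number of times it has been accumulated (and not yet retracted)
    private final Map<String, Integer> counts = new HashMap<>();

    public void accumulate(String value) {
        counts.merge(value, 1, Integer::sum);
    }

    public void retract(String value) {
        // drop the entry entirely once its counter would reach zero
        counts.computeIfPresent(value, (k, c) -> c > 1 ? c - 1 : null);
    }

    public int distinctCount() {
        return counts.size();
    }
}
```

The same counter map can be shared by several distinct aggregates over the same column, which is the optimization FLINK-8739 below asks for.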



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct operator

2018-02-18 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8690:


 Summary: Update logical rule set to generate FlinkLogicalAggregate 
explicitly allow distinct operator 
 Key: FLINK-8690
 URL: https://issues.apache.org/jira/browse/FLINK-8690
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8689) Add runtime support of distinct filter using MapView for GenerateAggregation

2018-02-18 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8689:


 Summary: Add runtime support of distinct filter using MapView for 
GenerateAggregation
 Key: FLINK-8689
 URL: https://issues.apache.org/jira/browse/FLINK-8689
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8691) Update table API to support distinct operator on data stream

2018-02-18 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8691:


 Summary: Update table API to support distinct operator on data 
stream
 Key: FLINK-8691
 URL: https://issues.apache.org/jira/browse/FLINK-8691
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10010) Deprecate unused BaseAlignedWindowAssigner related components

2018-07-31 Thread Rong Rong (JIRA)
Rong Rong created FLINK-10010:
-

 Summary: Deprecate unused BaseAlignedWindowAssigner related 
components
 Key: FLINK-10010
 URL: https://issues.apache.org/jira/browse/FLINK-10010
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Reporter: Rong Rong
Assignee: Rong Rong


{{BaseAlignedWindowAssigner}} should be marked as deprecated and 
{{SlidingAlignedProcessingTimeWindows}} should be removed from the Flink Repo.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10172) Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc

2018-08-19 Thread Rong Rong (JIRA)
Rong Rong created FLINK-10172:
-

 Summary: Inconsistency in ExpressionParser and ExpressionDsl for 
order by asc/desc
 Key: FLINK-10172
 URL: https://issues.apache.org/jira/browse/FLINK-10172
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0
Reporter: Rong Rong
Assignee: Rong Rong


The following expression throws an exception when parsing the {{"id.asc"}} term.

{code:java}
Table allOrders = orderTable
.select("id,order_date,amount,customer_id")
.orderBy("id.asc");
{code}

while the Scala equivalent is parsed correctly:
{code:scala}
val allOrders:Table = orderTable
.select('id, 'order_date, 'amount, 'customer_id)
.orderBy('id.asc)
{code}

This suggests an inconsistency between ExpressionParser and ExpressionDsl.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9157) Create support for commonly used external catalog

2018-04-11 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9157:


 Summary: Create support for commonly used external catalog
 Key: FLINK-9157
 URL: https://issues.apache.org/jira/browse/FLINK-9157
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Rong Rong


It would be great to have the SQL Client support some external catalogs 
out-of-the-box, such as Apache HCatalog, for SQL users to configure and utilize 
easily. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9172) Support external catalog factory that comes default with SQL-Client

2018-04-13 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9172:


 Summary: Support external catalog factory that comes default with 
SQL-Client
 Key: FLINK-9172
 URL: https://issues.apache.org/jira/browse/FLINK-9172
 Project: Flink
  Issue Type: New Feature
Reporter: Rong Rong


It would be great to have the SQL Client support some external catalogs 
out-of-the-box for SQL users to configure and utilize easily. I am currently 
thinking of an external catalog factory that spins up both streaming and 
batch external catalog table sources and sinks. This could greatly unify and 
provide easy access for SQL users. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9199) SubtaskExecutionAttemptAccumulatorsHeaders & SubtaskExecutionAttemptDetailsHeaders has malfunctioning URL

2018-04-17 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9199:


 Summary: SubtaskExecutionAttemptAccumulatorsHeaders & 
SubtaskExecutionAttemptDetailsHeaders has malfunctioning URL
 Key: FLINK-9199
 URL: https://issues.apache.org/jira/browse/FLINK-9199
 Project: Flink
  Issue Type: Bug
  Components: REST
Reporter: Rong Rong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9232) Add harness test for AggregationCodeGenerator

2018-04-21 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9232:


 Summary: Add harness test for AggregationCodeGenerator 
 Key: FLINK-9232
 URL: https://issues.apache.org/jira/browse/FLINK-9232
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong


Instead of relying on ITCases to cover the codegen result, we should test it 
directly, for example using the harness test framework.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode

2018-12-06 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11088:
-

 Summary: Improve Kerberos Authentication using Keytab in YARN 
proxy user mode
 Key: FLINK-11088
 URL: https://issues.apache.org/jira/browse/FLINK-11088
 Project: Flink
  Issue Type: Improvement
  Components: YARN
Reporter: Rong Rong


Currently flink-yarn assumes the keytab is shipped as an application master 
local resource on the client side and will be distributed to all the 
TMs. This does not work in YARN proxy user mode, since the proxy user or super 
user does not have access to the actual user's keytab but only to delegation 
tokens. 

We propose to make the keytab file path discovery configurable depending on the 
launch mode of the YARN client. 

Reference: 
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11006) Update Calcite dependency to 1.18

2018-11-26 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11006:
-

 Summary: Update Calcite dependency to 1.18
 Key: FLINK-11006
 URL: https://issues.apache.org/jira/browse/FLINK-11006
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Rong Rong


Umbrella task to track all dependencies and tasks that need to be done for 
upgrading to Calcite 1.18.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11276) Slicing Window Optimization

2019-01-07 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11276:
-

 Summary: Slicing Window Optimization
 Key: FLINK-11276
 URL: https://issues.apache.org/jira/browse/FLINK-11276
 Project: Flink
  Issue Type: New Feature
Reporter: Rong Rong


This is the umbrella ticket for the discussion in:
https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing

We would like to support syntax for more flexible window aggregations:
1. slicing and merging:

{code:scala}
val resultStream: DataStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))   // new “slice window” concept: to combine
                                   // tumble results based on discrete
                                   // non-overlapping windows.
  .sum("value")
  .slideOver(Count.of(5))          // new “slide over” concept to merge
                                   // tumble results.
{code}

and 
2. sliding aggregation with multi intervals, such as:

{code:scala}
val slicedStream: SlicedStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))
  .sum("value")

val resultStream1: DataStream = slicedStream
  .slideOver(Time.seconds(10L))    // slide over 5-sec slices
                                   // for the last 10 seconds

val resultStream2: DataStream = slicedStream
  .slideOver(Count.of(3))          // slide over 5-sec slices
                                   // for the last 3 slices (15 seconds)

val resultStream3: DataStream = slicedStream
  .slideOver(Count.of(180))        // slide over 5-sec slices
                                   // for the last 180 slices (15 minutes)
{code}

and
3. replace the current sliding window logic with a more efficient slice-and-merge 
approach.
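The efficiency argument behind points 1-3 can be sketched with plain arrays (names illustrative, not a proposed API): each slice is aggregated once, and any sliding result is derived by merging the last N per-slice partials instead of re-aggregating every overlapping window from raw events.

```java
public class SliceMerge {

    // Merge the last `n` per-slice partial sums into one sliding result.
    // With 5-second slices, slideOver(partials, 2) is a 10-second sliding sum,
    // slideOver(partials, 180) a 15-minute one, all from the same partials.
    public static long slideOver(long[] slicePartials, int n) {
        long sum = 0;
        for (int i = Math.max(0, slicePartials.length - n); i < slicePartials.length; i++) {
            sum += slicePartials[i];
        }
        return sum;
    }
}
```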



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11271) Improve Kerberos Credential Distribution

2019-01-04 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11271:
-

 Summary: Improve Kerberos Credential Distribution
 Key: FLINK-11271
 URL: https://issues.apache.org/jira/browse/FLINK-11271
 Project: Flink
  Issue Type: Improvement
  Components: Security, YARN
Reporter: Rong Rong
Assignee: Rong Rong


This is the master JIRA for the improvement listed in:

https://docs.google.com/document/d/1rBLCpyQKg6Ld2P0DEgv4VIOMTwv4sitd7h7P5r202IE/edit#heading=h.y34f96ctioqk





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11935) Remove DateTimeUtils pull-in and fix datetime casting problem

2019-03-15 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11935:
-

 Summary: Remove DateTimeUtils pull-in and fix datetime casting 
problem
 Key: FLINK-11935
 URL: https://issues.apache.org/jira/browse/FLINK-11935
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong
Assignee: Rong Rong


This {{DateTimeUtils}} was pulled in in FLINK-7235.

Originally, time operations were not done correctly via the {{ymdToJulian}} 
function for dates before {{1970-01-01}}, hence the fix, which addresses 
problems like:
{code:java}
 Optimized :1017-12-05 22:58:58.998 
 Expected  :1017-11-29 22:58:58.998
 Actual    :1017-12-05 22:58:58.998
{code}
 

However, after pulling in Avatica 1.13, I found that the optimized plans of 
the time operations are actually correct; it is in fact the casting part that 
creates the problem:


For example, the following:
*{{plus(-12000.months, cast('2017-11-29 22:58:58.998', TIMESTAMP))}}*

results in a StringTestExpression of:
*{{CAST(1017-11-29 22:58:58.998):VARCHAR(65536) CHARACTER SET "UTF-16LE" 
COLLATE "ISO-8859-1$en_US$primary" NOT NULL}}*

but the testing results are:
{code:java}
 Optimized :1017-11-29 22:58:58.998
 Expected :1017-11-29 22:58:58.998
 Actual :1017-11-23 22:58:58.998
{code}
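For reference, the expected semantics of subtracting 12000 months from that timestamp can be checked with `java.time`, which uses the proleptic ISO chronology and keeps the day-of-month even for years well before 1970 (a sanity check on the expected value, not Flink/Calcite code):

```java
import java.time.LocalDateTime;

public class MonthArithmetic {

    // Subtract `months` from an ISO-8601 local timestamp string.
    public static LocalDateTime minusMonths(String isoTimestamp, long months) {
        return LocalDateTime.parse(isoTimestamp).minusMonths(months);
    }
}
```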
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11909) Provide default failure/timeout handling strategy for AsyncIO functions

2019-03-13 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11909:
-

 Summary: Provide default failure/timeout handling strategy for 
AsyncIO functions
 Key: FLINK-11909
 URL: https://issues.apache.org/jira/browse/FLINK-11909
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Rong Rong


Currently, Flink AsyncIO by default fails the entire job when an async function 
invocation fails [1]. It would be nice to have some default AsyncIO 
failure/timeout handling strategies, or to open up some APIs for the AsyncFunction 
timeout method to interact with the AsyncWaitOperator. For example (quoting 
[~suez1224]):

* FAIL_OPERATOR (default & current behavior)
* FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
* EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
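A minimal standalone sketch of the EXP_BACKOFF_RETRY strategy listed above (plain Java, not the AsyncWaitOperator API; names and signature are my own): retry a failing call up to N times, doubling the wait between attempts, and give up (the FAIL_OPERATOR equivalent) only after all attempts are exhausted.

```java
import java.util.concurrent.Callable;

public class BackoffRetry {

    public static <T> T callWithBackoff(Callable<T> call, int maxAttempts, long baseMillis) {
        long wait = baseMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(wait); // back off before the next attempt
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                    wait *= 2; // exponential backoff
                }
            }
        }
        // FAIL_OPERATOR equivalent: give up after exhausting all attempts.
        throw new RuntimeException(last);
    }
}
```

FIX_INTERVAL_RETRY is the same loop with `wait` held constant.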



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11936) Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter issue.

2019-03-15 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11936:
-

 Summary: Remove AuxiliaryConverter pull-in from calcite and fix 
auxiliary converter issue.
 Key: FLINK-11936
 URL: https://issues.apache.org/jira/browse/FLINK-11936
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong


The AuxiliaryConverter was pulled in in FLINK-6409.

Since CALCITE-1761 has been fixed, we should sync back with the Calcite version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11569) Row type does not serialize in to readable format when invoke "toString" method

2019-02-08 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11569:
-

 Summary: Row type does not serialize in to readable format when 
invoke "toString" method
 Key: FLINK-11569
 URL: https://issues.apache.org/jira/browse/FLINK-11569
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Rong Rong
Assignee: Rong Rong


It seems the "toString" method of the Row type only concatenates all fields 
with a COMMA (","). It does not wrap the Row in any kind of enclosure, for 
example "()". This results in nested Rows being serialized as if they were all 
on one level.

For example: {{Row.of("a", 1, Row.of("b", 2))}} is printed as 
{{"a",1,"b",2}}. 
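The ambiguity, and the fix, can be shown with a minimal standalone sketch (rows modeled as `Object[]`, not Flink's Row class): flat joining loses the nesting structure, while wrapping each level in parentheses preserves it.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NestedRowToString {

    // Current behavior: join all fields flat, recursing into nested rows,
    // so ("a", 1, ("b", 2)) and ("a", 1, "b", 2) print identically.
    public static String flat(Object[] row) {
        return Stream.of(row)
                .map(f -> f instanceof Object[] ? flat((Object[]) f) : String.valueOf(f))
                .collect(Collectors.joining(","));
    }

    // Proposed behavior: wrap each nesting level so structure is readable.
    public static String wrapped(Object[] row) {
        return Stream.of(row)
                .map(f -> f instanceof Object[] ? wrapped((Object[]) f) : String.valueOf(f))
                .collect(Collectors.joining(",", "(", ")"));
    }
}
```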



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11589) Introduce service provider pattern for user to dynamically load SecurityFactory classes

2019-02-12 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11589:
-

 Summary: Introduce service provider pattern for user to 
dynamically load SecurityFactory classes
 Key: FLINK-11589
 URL: https://issues.apache.org/jira/browse/FLINK-11589
 Project: Flink
  Issue Type: Sub-task
  Components: Security
Reporter: Rong Rong
Assignee: Rong Rong


Currently there are only 3 security modules in Flink - Hadoop, ZooKeeper and 
JAAS - all of which are pre-loaded into the Flink security runtime with one 
hard-coded path for instantiating the SecurityContext, which is used to invoke 
user code with a PrivilegedExceptionAction.

We propose to introduce a [service provider 
pattern|https://docs.oracle.com/javase/tutorial/ext/basics/spi.html] to allow 
users to dynamically load {{SecurityModuleFactory}} implementations, or even 
introduce a new {{SecurityContextFactory}}, so that the whole security runtime 
context can be set up by dynamically loading any 3rd-party JAR and discovering 
the factories through property configurations.

This is especially useful in a corporate environment where proprietary security 
technologies are involved.
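A minimal sketch of the discovery side of the service provider pattern (the interface name is hypothetical, and in a real deployment providers would be registered via `META-INF/services` entries inside the 3rd-party JAR on the classpath):

```java
import java.util.ServiceLoader;

public class SecurityFactoryDiscovery {

    // Hypothetical SPI interface a 3rd-party JAR would implement.
    public interface SecurityContextFactory {
        String name();
    }

    // Discovery is just ServiceLoader iteration over registered providers;
    // with nothing registered on this classpath, the count is 0.
    public static int discoveredProviders() {
        int count = 0;
        for (SecurityContextFactory f : ServiceLoader.load(SecurityContextFactory.class)) {
            count++;
        }
        return count;
    }
}
```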



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11453) Support SliceWindow with forwardable pane info

2019-01-29 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11453:
-

 Summary: Support SliceWindow with forwardable pane info
 Key: FLINK-11453
 URL: https://issues.apache.org/jira/browse/FLINK-11453
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Rong Rong


Support a slicing operation that produces sliced streams:
{code:scala}
val slicedStream: SlicedStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))   // new “slice window” concept: to combine
                                   // tumble results based on discrete
                                   // non-overlapping windows.
  .aggregate(aggFunc)
{code}
{{SlicedStream}} will produce results that expose the current {{WindowOperator}} 
internal state {{InternalAppendingState}}, which can later be processed with a 
{{WindowFunction}} separately in another operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11454) Support MergedStream operation

2019-01-29 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11454:
-

 Summary: Support MergedStream operation
 Key: FLINK-11454
 URL: https://issues.apache.org/jira/browse/FLINK-11454
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Rong Rong


Following up on SlicedStream, the MergedStream operation merges results from a 
sliced stream and produces windowed results.
{code:java}
val slicedStream: SlicedStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))   // new “slice window” concept: combine
                                   // tumble results based on discrete
                                   // non-overlapping windows.
  .aggregate(aggFunc)

val mergedStream1: MergedStream = slicedStream
  .slideOver(Time.seconds(10L))    // combine slice results with the same
                                   // windowing function, equivalent to a
                                   // WindowOperator with an aggregate state
                                   // and derived aggregate function.

val mergedStream2: MergedStream = slicedStream
  .slideOver(Count.of(5))
  .apply(windowFunction)           // apply a different window function over
                                   // the sliced results.
{code}
A {{MergedStream}} is produced by a {{MergeOperator}}. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11455) Support evictor operations on slicing and merging operators

2019-01-29 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11455:
-

 Summary: Support evictor operations on slicing and merging 
operators
 Key: FLINK-11455
 URL: https://issues.apache.org/jira/browse/FLINK-11455
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong


The original POC implementation of SliceStream and MergeStream does not 
consider evicting window operations. This support can be further expanded to 
cover session windows with multiple timeout durations. See 
[https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit#heading=h.ihxm3alf3tk0].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11456) Improve window operator with sliding window assigners

2019-01-29 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11456:
-

 Summary: Improve window operator with sliding window assigners
 Key: FLINK-11456
 URL: https://issues.apache.org/jira/browse/FLINK-11456
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Rong Rong


With slicing and merging operators that expose the internals of the window 
operator, the current sliding window can be improved by eliminating duplicate 
aggregations and duplicate element inserts into multiple panes (i.e. namespaces). 

The following sliding window operation
{code:java}
val resultStream: DataStream = inputStream
  .keyBy("key")
  .window(SlidingEventTimeWindows.of(Time.seconds(15L), Time.seconds(5L)))
  .sum("value")
{code}
can produce job graph equivalent to
{code:java}
val resultStream: DataStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))
  .sum("value")
  .slideOver(Count.of(3))
{code}
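The intuition behind the equivalence is that each element is inserted into exactly one slice, and every sliding result is derived by combining a few slice aggregates. A plain-Java sketch of that technique (illustrative only, not Flink's operator code; events are (timestamp, value) pairs):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of slice-then-slide aggregation: sums over sliding
// windows are computed by first aggregating non-overlapping slices, then
// combining a fixed number of consecutive slices per window.
class SliceSketch {

    // Phase 1: each event is added to exactly ONE slice (no duplicate inserts).
    static Map<Long, Long> sliceSums(List<long[]> events, long sliceSize) {
        Map<Long, Long> slices = new TreeMap<>();
        for (long[] e : events) {
            long sliceStart = (e[0] / sliceSize) * sliceSize;
            slices.merge(sliceStart, e[1], Long::sum);
        }
        return slices;
    }

    // Phase 2: combine `slicesPerWindow` consecutive slice aggregates into one
    // sliding-window result keyed by the window's start timestamp.
    static Map<Long, Long> slideOver(Map<Long, Long> slices, long sliceSize, int slicesPerWindow) {
        Map<Long, Long> windows = new TreeMap<>();
        for (long start : slices.keySet()) {
            long sum = 0;
            for (int i = 0; i < slicesPerWindow; i++) {
                sum += slices.getOrDefault(start + i * sliceSize, 0L);
            }
            windows.put(start, sum);
        }
        return windows;
    }
}
```

With a slice size of 5 and 3 slices per window this mirrors a size-15, slide-5 sliding window, while each element is aggregated once instead of three times.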
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13389) Setting DataStream return type breaks some type conversion between Table and DataStream

2019-07-23 Thread Rong Rong (JIRA)
Rong Rong created FLINK-13389:
-

 Summary: Setting DataStream return type breaks some type 
conversion between Table and DataStream
 Key: FLINK-13389
 URL: https://issues.apache.org/jira/browse/FLINK-13389
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Table SQL / API
Reporter: Rong Rong


When converting between a DataStream and a Table, there are situations where 
only GenericTypeInfo can be applied successfully, while directly setting the 
specific RowTypeInfo fails.
For example, the following code doesn't work:

{code:java}
TypeInformation[] types = {
    BasicTypeInfo.INT_TYPE_INFO,
    TimeIndicatorTypeInfo.ROWTIME_INDICATOR(),
    BasicTypeInfo.STRING_TYPE_INFO};
String[] names = {"a", "b", "c"};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);
DataStream ds = env.fromCollection(data).returns(typeInfo);
Table sourceTable = tableEnv.fromDataStream(ds, "a,b,c");
tableEnv.registerTable("MyTableRow", sourceTable);

DataStream stream = tableEnv.toAppendStream(sourceTable, Row.class)
    .map(a -> a)
    // this line breaks the conversion: it sets the type info to RowTypeInfo;
    // without it the output type is GenericTypeInfo(Row)
    .returns(sourceTable.getSchema().toRowType());
stream.addSink(new StreamITCase.StringSink());
env.execute();
{code}




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-16224) Refine Hadoop Delegation Token based testing framework

2020-02-21 Thread Rong Rong (Jira)
Rong Rong created FLINK-16224:
-

 Summary: Refine Hadoop Delegation Token based testing framework
 Key: FLINK-16224
 URL: https://issues.apache.org/jira/browse/FLINK-16224
 Project: Flink
  Issue Type: Sub-task
  Components: Deployment / YARN
Reporter: Rong Rong
Assignee: Rong Rong


Currently the SecureTestEnvironment doesn't support Hadoop delegation tokens, 
which makes E2E testing of delegation-token-based YARN applications 
impossible.

Propose to enhance the testing framework to support delegation-token-based 
launches.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16236) Fix YARNSessionFIFOSecuredITCase not loading the correct security context factory

2020-02-22 Thread Rong Rong (Jira)
Rong Rong created FLINK-16236:
-

 Summary: Fix YARNSessionFIFOSecuredITCase not loading the correct 
security context factory
 Key: FLINK-16236
 URL: https://issues.apache.org/jira/browse/FLINK-16236
 Project: Flink
  Issue Type: Sub-task
  Components: Deployment / YARN
Reporter: Rong Rong
Assignee: Rong Rong


Follow-up to FLINK-11589. Currently, because TestHadoopModuleFactory is 
overridden, the HadoopContextFactory is not loaded, since it fails the 
compatibility check.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15561) Improve Kerberos delegation token login

2020-01-12 Thread Rong Rong (Jira)
Rong Rong created FLINK-15561:
-

 Summary: Improve Kerberos delegation token login 
 Key: FLINK-15561
 URL: https://issues.apache.org/jira/browse/FLINK-15561
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Reporter: Rong Rong
Assignee: Rong Rong


Currently the security HadoopModule handles delegation token login without 
spawning a delegation token renewal thread. We may need to add this to fully 
support delegation tokens.

See:
[1] 
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L84
and 
[2] 
https://github.com/hanborq/hadoop/blob/master/src/core/org/apache/hadoop/security/UserGroupInformation.java#L538
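A renewal thread of the kind referenced in [2] typically re-logs in on a schedule derived from the token lifetime. A minimal sketch of that scheduling logic (an illustrative assumption, not Flink's or Hadoop's actual implementation):

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: renew after a fixed fraction of the token lifetime has
// passed, mirroring the common "renew at ~80% of TTL" heuristic.
class TokenRenewer {

    // Delay before the first (and each subsequent) renewal attempt.
    static long renewalDelayMs(long issuedAtMs, long expiresAtMs, double ratio) {
        return (long) ((expiresAtMs - issuedAtMs) * ratio);
    }

    // Schedule the re-login action to run repeatedly, well before each expiry.
    static ScheduledFuture<?> schedule(ScheduledExecutorService executor,
                                       Runnable reLogin,
                                       long issuedAtMs, long expiresAtMs) {
        long delay = renewalDelayMs(issuedAtMs, expiresAtMs, 0.8);
        return executor.scheduleAtFixedRate(reLogin, delay, delay, TimeUnit.MILLISECONDS);
    }
}
```

In the real module the {{reLogin}} runnable would wrap the UGI re-login call; error handling and jitter are omitted here for brevity.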



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15475) Add isOutputTypeUsed() API to Transformation

2020-01-03 Thread Rong Rong (Jira)
Rong Rong created FLINK-15475:
-

 Summary: Add isOutputTypeUsed() API to Transformation
 Key: FLINK-15475
 URL: https://issues.apache.org/jira/browse/FLINK-15475
 Project: Flink
  Issue Type: Improvement
  Components: API / Core, API / DataSet, API / DataStream
Reporter: Rong Rong
Assignee: Rong Rong


Currently there's no way to "peek" into a Transformation to see whether its 
output type has been used. The only way is to invoke the {{setOutputType}} API and 
wrap it in a try-catch block. 

It would be nice to have an `isTypeUsed()` or `isOutputTypeUsed()` API to 
check whether a particular transformation already has a definitive output type set or 
used.
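To illustrate the difference, here is a self-contained toy model (not Flink's actual Transformation class; the method names only mimic the proposal) contrasting the try-catch workaround with the proposed peek API:

```java
// Toy model only -- this is not Flink code; types are simplified to strings.
class ToyTransformation {
    private boolean outputTypeUsed = false;
    private String outputType = "GenericType";

    String getOutputType() {
        outputTypeUsed = true; // reading the type "uses" it
        return outputType;
    }

    void setOutputType(String type) {
        if (outputTypeUsed) {
            // Today, probing this failure inside a try-catch is the only way
            // to learn that the type was already used.
            throw new IllegalStateException("output type already used");
        }
        outputType = type;
    }

    // The proposed API: peek at the flag without side effects or exceptions.
    boolean isOutputTypeUsed() {
        return outputTypeUsed;
    }
}
```

With {{isOutputTypeUsed()}} a caller can branch on the state directly instead of triggering and catching an exception.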





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16789) Support JMX RMI via JMXConnectorServer

2020-03-25 Thread Rong Rong (Jira)
Rong Rong created FLINK-16789:
-

 Summary: Support JMX RMI via JMXConnectorServer
 Key: FLINK-16789
 URL: https://issues.apache.org/jira/browse/FLINK-16789
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination, Runtime / Task
Affects Versions: 1.11.0
Reporter: Rong Rong
Assignee: Rong Rong


Currently there is no easy way to assign a JMX RMI port to a running Flink job. 

The typical tutorial is to add the following to both the TM and JM launch env:
{code}
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=<port>
-Dcom.sun.management.jmxremote.local.only=false
{code}
However, setting the JMX remote port to a fixed value is usually not viable 
when the Flink job is running in a shared environment (YARN / K8s / etc.).

Setting {{-Dcom.sun.management.jmxremote.port=0}} is the best option; however, 
there's no easy way to retrieve the resulting port assignment. We propose to use 
JMXConnectorServerFactory to explicitly establish a JMXServer inside 
ClusterEntrypoint and TaskManagerRunner.
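A minimal sketch of that approach using only plain JDK APIs (this is a hedged illustration, not the actual Flink change): pick a free port, start an explicit {{JMXConnectorServer}} on it, and the bound port is then known and reportable:

```java
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.rmi.registry.LocateRegistry;
import javax.management.MBeanServer;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;

// Illustrative sketch: an explicitly constructed JMX server whose dynamically
// chosen port can be logged or exposed, unlike the jmxremote.port=0 route.
public class JmxRmiSketch {

    // Let the OS pick a free port (small race window; fine for a sketch).
    static int pickFreePort() throws Exception {
        try (ServerSocket s = new ServerSocket(0)) {
            return s.getLocalPort();
        }
    }

    static JMXConnectorServer startJmxServer(int port) throws Exception {
        LocateRegistry.createRegistry(port); // RMI registry on the chosen port
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxrmi");
        JMXConnectorServer server =
            JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        int port = pickFreePort();
        JMXConnectorServer server = startJmxServer(port);
        System.out.println("JMX server listening on port " + port);
        server.stop();
    }
}
```

Embedding something like this inside ClusterEntrypoint / TaskManagerRunner would let the runtime report the chosen port, which the system-property approach cannot do.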





--
This message was sent by Atlassian Jira
(v8.3.4#803005)