[jira] [Created] (FLINK-15420) Cast string to timestamp will lose precision
Jingsong Lee created FLINK-15420: Summary: Cast string to timestamp will lose precision Key: FLINK-15420 URL: https://issues.apache.org/jira/browse/FLINK-15420 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Jingsong Lee Fix For: 1.10.0
{code:sql}
cast('2010-10-14 12:22:22.123456' as timestamp(9))
{code}
produces "2010-10-14 12:22:22.123" in the Blink planner, dropping the microsecond digits. This should not happen. -- This message was sent by Atlassian Jira (v8.3.4#803005)
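To illustrate the symptom, here is a minimal sketch that uses only JDK classes, not the Blink planner's code. Parsing the literal with `java.sql.Timestamp.valueOf` keeps all fractional digits, while a conversion that goes through epoch milliseconds (a hypothetical lossy path, named `viaMillis` here) truncates the value to `.123`, which matches the reported output.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampPrecisionDemo {

    // Parses 'yyyy-MM-dd HH:mm:ss[.fffffffff]' and keeps up to nanosecond precision.
    static LocalDateTime parseFull(String s) {
        return Timestamp.valueOf(s).toLocalDateTime();
    }

    // Hypothetical lossy path: round-tripping through epoch milliseconds
    // discards every fractional digit beyond the third.
    static LocalDateTime viaMillis(String s) {
        long millis = Timestamp.valueOf(s).getTime();
        return new Timestamp(millis).toLocalDateTime();
    }

    public static void main(String[] args) {
        String input = "2010-10-14 12:22:22.123456";
        System.out.println(parseFull(input)); // 2010-10-14T12:22:22.123456
        System.out.println(viaMillis(input)); // 2010-10-14T12:22:22.123
    }
}
```

A fix presumably has to keep the nanosecond field through the whole cast instead of collapsing the value to a millisecond `long`.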
[jira] [Created] (FLINK-15421) GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp
Benchao Li created FLINK-15421: -- Summary: GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp Key: FLINK-15421 URL: https://issues.apache.org/jira/browse/FLINK-15421 Project: Flink Issue Type: Bug Affects Versions: 1.9.1, 1.10.0 Reporter: Benchao Li `TimestampType` has two physical representations: `Timestamp` and `LocalDateTime`. With 'table.exec.emit.early-fire.enabled' = true, the following SQL makes the two representations conflict with each other:
{code:sql}
SELECT SUM(cnt) AS s, MAX(ts)
FROM (
  SELECT `string`, `int`, COUNT(*) AS cnt, MAX(rowtime) AS ts
  FROM T1
  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND)
)
GROUP BY `string`
{code}
The exception is:
{code}
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
    at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
    at java.lang.Thread.run(Thread.java:748)
{code}
I also created a unit test in `WindowAggregateITCase` that quickly reproduces this bug:
{code:scala}
@Test
def testEarlyFireWithTumblingWindow(): Unit = {
  val stream = failingDataSource(data)
    .assignTimestampsAndWatermarks(
      new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String, String)](10L))
  val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
  tEnv.registerTable("T1", table)
  tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
  tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")
  val sql =
    """
      |SELECT
      |  SUM(cnt) as s,
      |  MAX(ts)
      |FROM
      |  (SELECT
      |    `string`,
      |    `int`,
      |    COUNT(*) AS cnt,
      |    MAX(rowtime) as ts
      |  FROM T1
      |  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
      |GROUP BY `string`
      |""".stripMargin
  tEnv.sqlQuery(sql).toRetractStream[Row].print()
  env.execute()
}
{code}
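The conflict is that the same logical TIMESTAMP value can arrive as either a `java.sql.Timestamp` or a `java.time.LocalDateTime`, and a hard cast to one class fails on the other. The sketch below is a hypothetical helper, not the code that Flink generates for `GroupAggsHandler`. It normalizes both representations before comparing them, which is one way an aggregate such as `MAX(ts)` could tolerate mixed inputs.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampNormalizer {

    // Accepts either physical representation of a SQL TIMESTAMP and returns a LocalDateTime.
    static LocalDateTime toLocalDateTime(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof LocalDateTime) {
            return (LocalDateTime) value;
        }
        if (value instanceof Timestamp) {
            return ((Timestamp) value).toLocalDateTime();
        }
        throw new IllegalArgumentException("Unsupported timestamp class: " + value.getClass().getName());
    }

    // MAX over two values that may use different representations; nulls are ignored.
    static LocalDateTime max(Object a, Object b) {
        LocalDateTime x = toLocalDateTime(a);
        LocalDateTime y = toLocalDateTime(b);
        if (x == null) {
            return y;
        }
        if (y == null) {
            return x;
        }
        return x.isAfter(y) ? x : y;
    }

    public static void main(String[] args) {
        Object ldt = LocalDateTime.of(2019, 12, 27, 10, 0, 10);
        Object ts = Timestamp.valueOf("2019-12-27 10:00:00");
        try {
            Timestamp broken = (Timestamp) ldt; // the same kind of cast that fails in the report
            System.out.println(broken);
        } catch (ClassCastException e) {
            System.out.println("direct cast fails: " + e.getClass().getSimpleName());
        }
        System.out.println(max(ldt, ts)); // 2019-12-27T10:00:10
    }
}
```

In the planner itself, the more likely fix is to make both code paths agree on a single representation rather than converting at every use site.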
[jira] [Created] (FLINK-15419) Validate SQL syntax not need to depend on connector jar
Kaibo Zhou created FLINK-15419: -- Summary: Validate SQL syntax not need to depend on connector jar Key: FLINK-15419 URL: https://issues.apache.org/jira/browse/FLINK-15419 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Kaibo Zhou Fix For: 1.11.0 As a platform user, I want to integrate Flink SQL into my platform. Users register source/sink tables and functions in the catalog service through a UI and write SQL scripts in a web SQL editor. I want to validate the SQL syntax and check that all referenced catalog objects exist (tables, fields, UDFs). After some investigation, I decided to use the `tEnv.sqlUpdate`/`tEnv.sqlQuery` API for this. `SqlParser` and `FlinkSqlParserImpl` are not a good choice because they do not read the catalog. The users have registered *Kafka* source/sink tables in the catalog, so the validation logic is:
{code:java}
tEnv.registerCatalog(CATALOG_NAME, catalog);
tEnv.useCatalog(CATALOG_NAME);
tEnv.useDatabase(DB_NAME);

// either validate an INSERT statement ...
tEnv.sqlUpdate("INSERT INTO sinkTable SELECT f1, f2 FROM sourceTable");
// ... or validate a query
tEnv.sqlQuery("SELECT * FROM tableName");
{code}
On Flink 1.9.0 this throws an exception because `flink-connector-kafka_2.11-1.9.0.jar` is not on my classpath:
{code:java}
org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
    at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:132)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
{code}
For a platform provider, the user's SQL may depend on *ANY* connector, or even a custom connector. Dynamically loading the connector jar after parsing the connector type from the SQL is complicated, and it would require users to upload their custom connector jars before a syntax check can run. I hope Flink can provide a friendly way to validate SQL whose tables/functions are already registered in the catalog *without* depending on the connector jars. This would make it easier for external platforms to integrate Flink SQL.
[jira] [Created] (FLINK-15418) StreamExecMatchRule not set FlinkRelDistribution
Benchao Li created FLINK-15418: -- Summary: StreamExecMatchRule not set FlinkRelDistribution Key: FLINK-15418 URL: https://issues.apache.org/jira/browse/FLINK-15418 Project: Flink Issue Type: Bug Affects Versions: 1.9.1, 1.10.0 Reporter: Benchao Li StreamExecMatchRule forgets to set the FlinkRelDistribution. When a MATCH_RECOGNIZE clause uses `PARTITION BY` and the parallelism is greater than 1, this results in the following exception:
{code}
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
    at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:100)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.registerEvent(SharedBuffer.java:141)
    at org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor.registerEvent(SharedBufferAccessor.java:74)
    at org.apache.flink.cep.nfa.NFA$EventWrapper.getEventId(NFA.java:483)
    at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:605)
    at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:292)
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:228)
    at org.apache.flink.cep.operator.CepOperator.processEvent(CepOperator.java:420)
    at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:242)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
    at java.lang.Thread.run(Thread.java:748)
{code}
[jira] [Created] (FLINK-15417) Remove the docker volume or mount when starting Mesos e2e cluster
Yangze Guo created FLINK-15417: -- Summary: Remove the docker volume or mount when starting Mesos e2e cluster Key: FLINK-15417 URL: https://issues.apache.org/jira/browse/FLINK-15417 Project: Flink Issue Type: Test Reporter: Yangze Guo Fix For: 1.10.0 As discussed [here|https://github.com/apache/flink/pull/10695#discussion_r361574394], there is a potential risk of permission problems when cleaning up logs and output. We could find another way to let the containers access the input and output files.
[jira] [Created] (FLINK-15416) Add Retry Mechanism for PartitionRequestClientFactory.ConnectingChannel
Zhenqiu Huang created FLINK-15416: - Summary: Add Retry Mechanism for PartitionRequestClientFactory.ConnectingChannel Key: FLINK-15416 URL: https://issues.apache.org/jira/browse/FLINK-15416 Project: Flink Issue Type: Wish Components: Runtime / Network Affects Versions: 1.10.0 Reporter: Zhenqiu Huang We run a Flink job with 256 TaskManagers in production. The job internally uses keyBy, so it builds 256 * 256 communication channels. An outage happened when an internal chip link in one of the network switches connecting these machines broke. During the outage, Flink could not restart successfully because there was always an exception like "Connecting the channel failed: Connecting to remote task manager + '/10.14.139.6:41300' has failed. This might indicate that the remote task manager has been lost." After a deep investigation with the network infrastructure team, we found that 6 switches connect these machines. Each switch has 32 physical links, and every socket is assigned round-robin to one of the links for load balancing. Thus, on average 256 * 256 / (6 * 32 * 2) ≈ 170 channels are always assigned to the broken link. The issue lasted for 4 hours until we found the broken link and restarted the problematic switch. Given this, we found that retrying channel creation would help resolve this issue. For our network topology, we can set the retry count to 2: as 170 / (132 * 132) < 1, after two retries none of the 170 channels will be assigned to the broken link in the average case. I think this is a valuable fix for this kind of partial network partition.
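The retry idea can be sketched as a generic helper, under the assumption that each new connection attempt may be routed over a different link. `ChannelRetry` and `connectWithRetry` are hypothetical names for illustration only; they are not the API of `PartitionRequestClientFactory`, and the sketch deliberately omits backoff and distinguishing retryable from fatal errors.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ChannelRetry {

    // Runs connect once, then retries up to maxRetries more times if it throws.
    // The last failure is rethrown when every attempt fails.
    static <T> T connectWithRetry(int maxRetries, Supplier<T> connect) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return connect.get();
            } catch (RuntimeException e) {
                lastFailure = e; // e.g. the socket was routed over the broken link
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated connect: the first two attempts fail, the third succeeds.
        String channel = connectWithRetry(2, () -> {
            if (calls.incrementAndGet() <= 2) {
                throw new IllegalStateException("Connecting the channel failed");
            }
            return "channel-" + calls.get();
        });
        System.out.println(channel + " after " + calls.get() + " attempts"); // channel-3 after 3 attempts
    }
}
```

With `maxRetries = 2`, as proposed above, a channel gets three chances to land on a healthy link before the failure is propagated.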
[jira] [Created] (FLINK-15415) Flink machine node memory is consumed quickly, but the heap has not changed
hiliuxg created FLINK-15415: --- Summary: Flink machine node memory is consumed quickly, but the heap has not changed Key: FLINK-15415 URL: https://issues.apache.org/jira/browse/FLINK-15415 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.8.0 Environment: machine: 256 GB memory, SSD, 32-core CPU; config:
{code}
jobmanager.heap.size: 4096m
taskmanager.heap.size: 144gb
taskmanager.numberOfTaskSlots: 48
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: false
parallelism.default: 1
{code}
Reporter: hiliuxg The Flink node has been running for one month, and the machine's memory usage keeps growing, even beyond the configured heap size. With top -c, we can see that the memory and virtual memory usage are very high. It may be an off-heap memory leak. What is the collection mechanism for Flink's off-heap memory? Does it need a full GC to reclaim off-heap memory?
[jira] [Created] (FLINK-15414) KafkaITCase#prepare failed in travis
Dian Fu created FLINK-15414: --- Summary: KafkaITCase#prepare failed in travis Key: FLINK-15414 URL: https://issues.apache.org/jira/browse/FLINK-15414 Project: Flink Issue Type: Bug Components: Connectors / Kafka Reporter: Dian Fu The nightly Travis build for release-1.9 failed with the following error:
{code:java}
15:43:24.454 [ERROR] Errors:
15:43:24.455 [ERROR] KafkaITCase.prepare:58->KafkaTestBase.prepare:92->KafkaTestBase.prepare:100->KafkaTestBase.startClusters:134->KafkaTestBase.startClusters:145 » Kafka
{code}
Instance: [https://api.travis-ci.org/v3/job/629636116/log.txt]
[jira] [Created] (FLINK-15413) ScalarOperatorsTest failed in travis
Dian Fu created FLINK-15413: --- Summary: ScalarOperatorsTest failed in travis Key: FLINK-15413 URL: https://issues.apache.org/jira/browse/FLINK-15413 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Dian Fu The nightly Travis build for release-1.9 failed with the following error:
{code:java}
14:50:19.796 [ERROR] ScalarOperatorsTest>ExpressionTestBase.evaluateExprs:161 Wrong result for: [CASE WHEN (CASE WHEN f2 = 1 THEN CAST('' as INT) ELSE 0 END) is null THEN 'null' ELSE 'not null' END] optimized to: [_UTF-16LE'not null':VARCHAR(8) CHARACTER SET "UTF-16LE"] expected: but was:
{code}
Instance: [https://api.travis-ci.org/v3/job/629636107/log.txt]
[jira] [Created] (FLINK-15412) LocalExecutorITCase#testParameterizedTypes failed in travis
Dian Fu created FLINK-15412: --- Summary: LocalExecutorITCase#testParameterizedTypes failed in travis Key: FLINK-15412 URL: https://issues.apache.org/jira/browse/FLINK-15412 Project: Flink Issue Type: Bug Components: Table SQL / Client Reporter: Dian Fu The nightly Travis build for release-1.9 failed with the following error:
{code:java}
14:43:17.916 [INFO] Running org.apache.flink.table.client.gateway.local.LocalExecutorITCase
14:44:47.388 [ERROR] Tests run: 34, Failures: 0, Errors: 1, Skipped: 1, Time elapsed: 89.468 s <<< FAILURE! - in org.apache.flink.table.client.gateway.local.LocalExecutorITCase
14:44:47.388 [ERROR] testParameterizedTypes[Planner: blink](org.apache.flink.table.client.gateway.local.LocalExecutorITCase) Time elapsed: 7.88 s <<< ERROR!
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement
    at org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testParameterizedTypes(LocalExecutorITCase.java:557)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed
    at org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testParameterizedTypes(LocalExecutorITCase.java:557)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed
    at org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testParameterizedTypes(LocalExecutorITCase.java:557)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: No context matches.
{code}
Instance: [https://api.travis-ci.org/v3/job/629636106/log.txt]
Re: [DISCUSS] FLIP-85: Delayed Job Graph Generation
Hi Yang, I can't agree more. The effort definitely needs to align with the final goal of FLIP-73. I am thinking about whether we can achieve the goal in two phases. 1) Phase I: As CliFrontend will not be deprecated soon, we can still use the deployMode flag there, pass the program info through the Flink configuration, and use the ClassPathJobGraphRetriever to generate the job graph in the ClusterEntrypoints of YARN and Kubernetes. 2) Phase II: In AbstractJobClusterExecutor, the job graph is generated in the execute function. We can still use the deployMode there. With deployMode = cluster, the execute function only starts the cluster. When {Yarn/Kubernetes}PerJobClusterEntrypoint starts, it will start the dispatcher first; then we can use a ClusterEnvironment, similar to ContextEnvironment, to submit the job with jobName to the local dispatcher. The details need more investigation. Let's wait for @Aljoscha Krettek's and @Till Rohrmann's feedback after the holiday season. Thank you in advance. Merry Christmas and Happy New Year!!! Best Regards Peter Huang On Wed, Dec 25, 2019 at 1:08 AM Yang Wang wrote: > Hi Peter, > > I think we need to reconsider tison's suggestion seriously. After FLIP-73, > the deployJobCluster has > been moved into `JobClusterExecutor#execute`. It should not be perceived > by `CliFrontend`. That > means the user program will *ALWAYS* be executed on the client side. This is > the by-design behavior. > So, we could not just add `if(client mode) .. else if(cluster mode) ...` > code in `CliFrontend` to bypass > the executor. We need to find a clean way to decouple executing the user > program from deploying the per-job > cluster. Based on this, we could support executing the user program on the client > or master side. > > Maybe Aljoscha and Jeff could give some good suggestions. > > > > Best, > Yang > > On Wed, Dec 25, 2019 at 4:03 AM Peter Huang wrote: > >> Hi Jingjing, >> >> The proposed improvement is a deployment option for the CLI.
For SQL-based >> Flink applications, it is more convenient to use the existing model in >> SqlClient, in which >> the job graph is generated within SqlClient. After adding the delayed job >> graph generation, I think no change is needed on your side. >> >> >> Best Regards >> Peter Huang >> >> >> On Wed, Dec 18, 2019 at 6:01 AM jingjing bai >> wrote: >> >> > hi peter: >> > we extended SqlClient to support SQL job submission from the web, based on >> > Flink 1.9. We also support submitting to YARN in per-job mode. >> > In this case, the job graph is generated on the client side. I think >> this >> > discussion mainly aims to improve API programs, but in my case there is no >> > jar to upload, only a SQL string. >> > Do you have more suggestions for improving SQL mode, or is it only a >> > switch for API programs? >> > >> > >> > best >> > bai jj >> > >> > >> > On Wed, Dec 18, 2019 at 7:21 PM Yang Wang wrote: >> > >> >> I just want to revive this discussion. >> >> >> >> Recently, I have been thinking about how to natively run a Flink per-job >> cluster on >> >> Kubernetes. >> >> The per-job mode on Kubernetes is very different from that on YARN, and we >> will >> >> have >> >> the same deployment requirements for the client and entry point. >> >> >> >> 1. The Flink client does not always need a local jar to start a Flink per-job >> >> cluster. We could >> >> support multiple schemes. For example, file:///path/of/my.jar means a >> jar >> >> located >> >> on the client side, hdfs://myhdfs/user/myname/flink/my.jar means a jar >> located >> >> on >> >> remote HDFS, and local:///path/in/image/my.jar means a jar located on the >> >> JobManager side. >> >> >> >> 2. Support running the user program on the master side. This also means the >> entry >> >> point >> >> will generate the job graph on the master side. We could use the >> >> ClasspathJobGraphRetriever >> >> or start a local Flink client to achieve this. >> >> >> >> >> >> cc tison, Aljoscha & Kostas Do you think this is the right direction we >> >> need to work in?
>> >> >> >> On Thu, Dec 12, 2019 at 4:48 PM tison wrote: >> >> >> >> > A quick idea is that we separate the deployment from the user program so >> that >> >> it >> >> > has always been done >> >> > outside the program. When the user program is executed, there is always a >> >> > ClusterClient that communicates with >> >> > an existing cluster, remote or local. It will be another thread, so >> just >> >> for >> >> > your information. >> >> > >> >> > Best, >> >> > tison. >> >> > >> >> > >> >> > On Thu, Dec 12, 2019 at 4:40 PM tison wrote: >> >> > >> >> > > Hi Peter, >> >> > > >> >> > > Another concern I realized recently is that with the current Executors >> >> > > abstraction (FLIP-73) >> >> > > I'm afraid that the user program is designed to ALWAYS run on the >> client >> >> > side. >> >> > > Specifically, >> >> > > we deploy the job in the executor when env.execute is called. This >> >> abstraction >> >> > > possibly prevents >> >> > > Flink from running the user program on the cluster side. >> >> > > >> >> > > For your proposal, in this case we already compiled the program and >> >> run >> >> >
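Yang Wang's first point, choosing where the user jar lives based on its URI scheme, can be sketched as a small classifier. This uses only `java.net.URI`. The enum values, the method name `classify`, and the rule that every scheme other than `file` and `local` means remote storage are hypothetical choices for illustration, not an agreed Flink design.

```java
import java.net.URI;

public class JarLocation {

    enum Side { CLIENT, REMOTE_STORAGE, JOBMANAGER_IMAGE }

    // Classifies a user jar URI following the schemes listed in the discussion.
    static Side classify(String jarUri) {
        String scheme = URI.create(jarUri).getScheme();
        if (scheme == null || scheme.equals("file")) {
            return Side.CLIENT;            // file:///path/of/my.jar, or a plain local path
        } else if (scheme.equals("local")) {
            return Side.JOBMANAGER_IMAGE;  // local:///path/in/image/my.jar
        } else {
            return Side.REMOTE_STORAGE;    // e.g. hdfs://myhdfs/user/myname/flink/my.jar
        }
    }

    public static void main(String[] args) {
        String[] jars = {
            "file:///path/of/my.jar",
            "hdfs://myhdfs/user/myname/flink/my.jar",
            "local:///path/in/image/my.jar"
        };
        for (String jar : jars) {
            System.out.println(jar + " -> " + classify(jar));
        }
    }
}
```

Under this assumption, a client would only need to ship `CLIENT` jars itself. `REMOTE_STORAGE` jars could be fetched by the cluster, and `JOBMANAGER_IMAGE` jars used in place by the entry point.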
[jira] [Created] (FLINK-15411) HiveCatalog can't prune partition on DATE/TIMESTAMP columns
Bowen Li created FLINK-15411: Summary: HiveCatalog can't prune partition on DATE/TIMESTAMP columns Key: FLINK-15411 URL: https://issues.apache.org/jira/browse/FLINK-15411 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.10.0, 1.11.0 Reporter: Bowen Li Assignee: Rui Li Fix For: 1.10.0, 1.11.0
[jira] [Created] (FLINK-15410) Solace JCSMP connectors
Leonid Ilyevsky created FLINK-15410: --- Summary: Solace JCSMP connectors Key: FLINK-15410 URL: https://issues.apache.org/jira/browse/FLINK-15410 Project: Flink Issue Type: Improvement Components: Connectors / Common Reporter: Leonid Ilyevsky It would be very useful to have Flink connectors (source and sink) for Solace JCSMP, with proper checkpoint support.
Re: [DISCUSS] FLIP-90: Support SQL 2016-2017 JSON functions in Flink SQL
Hi Jark & ForwardXu, The design doc looks very nice! I have only some minor feedback. As Calcite has already implemented the JSON functions, I would assume the semantics and implementation are right for SQL. For the Table API, I think the most important thing is to stay aligned with SQL (which Jark also mentioned in the previous discussion): we should have an equivalent feature set for all APIs and maintain it; otherwise confusion increases, especially as more and more functions are added. The document describes how to support the Table API, which is very good! It would be even better to also include the ON ERROR and ON EMPTY clauses for the Table API. We can implement these features step by step, but maybe we should design them all at once to avoid API changes later. These features are also commonly requested by users. It would be great to hear your opinions! Best, Hequn On Mon, Dec 23, 2019 at 10:15 AM Jark Wu wrote: > Hi Forward, > > Thanks for creating the FLIP. +1 to start a vote. > > @Hequn Cheng @Kurt Young , > could you help to review the design doc too? > > Best, > Jark > > > On Mon, 23 Dec 2019 at 10:10, tison wrote: > >> modified: >> >> https://lists.apache.org/x/thread.html/b3c0265cc2b660fe11ce550b84a831a7606de12908ff7ff0959a4794@%3Cdev.flink.apache.org%3E >> >
[jira] [Created] (FLINK-15409) Add semicolon to WindowJoinUtil#generateJoinFunction '$collectorTerm.collect($joinedRow)' statement
hailong wang created FLINK-15409: Summary: Add semicolon to WindowJoinUtil#generateJoinFunction '$collectorTerm.collect($joinedRow)' statement Key: FLINK-15409 URL: https://issues.apache.org/jira/browse/FLINK-15409 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.10.0 Reporter: hailong wang Fix For: 1.11.0 In WindowJoinUtil#generateJoinFunction, when otherCondition is None, the code goes into this branch:
{code:scala}
case None =>
  s"""
     |$buildJoinedRow
     |$collectorTerm.collect($joinedRow)
     |""".stripMargin
{code}
It is missing a semicolon after collect($joinedRow), which causes a compilation failure:
{code:java}
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:65)
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:52)
    ... 26 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 28, Column 21: Expression "c.collect(joinedRow)" is not a type
{code}
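The fix is a single character in the template. The Java sketch below is a hypothetical counterpart of the Scala `case None` branch, not Flink's code generator. It shows the emitted statement ending in `;`, which the Janino compiler needs in order to parse `c.collect(joinedRow)` as a statement rather than the start of a declaration. The `buildJoinedRow` argument in `main` is made-up placeholder code.

```java
public class JoinFunctionTemplate {

    // Hypothetical Java counterpart of the `case None` branch: every generated
    // statement must end with ';' or the generated class fails to compile.
    static String noOtherConditionBody(String buildJoinedRow, String collectorTerm, String joinedRow) {
        return buildJoinedRow + "\n"
                + collectorTerm + ".collect(" + joinedRow + ");\n";
    }

    public static void main(String[] args) {
        // "c" and "joinedRow" match the names in the reported CompileException;
        // the first argument is placeholder code for the row-building statement.
        System.out.print(noOtherConditionBody("joinedRow.replace(in1, in2);", "c", "joinedRow"));
    }
}
```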
[jira] [Created] (FLINK-15408) Interval join support no equi-condition
hailong wang created FLINK-15408: Summary: Interval join support no equi-condition Key: FLINK-15408 URL: https://issues.apache.org/jira/browse/FLINK-15408 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.10.0 Reporter: hailong wang Fix For: 1.11.0 Currently, an interval join must have at least one equi-condition. Should we allow interval joins without an equi-condition, as regular joins do? For example, the following SQL:
{code:sql}
INSERT INTO A
SELECT * FROM B JOIN C
ON B.rowtime BETWEEN C.rowtime - INTERVAL '20' SECOND AND C.rowtime + INTERVAL '30' SECOND
{code}
has no matching rule to convert it:
{code:java}
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, UpdateAsRetractionTraitDef=false, AccModeTraitDef=UNKNOWN.
Missing conversion is FlinkLogicalJoin[convention: LOGICAL -> STREAM_PHYSICAL]
{code}
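The semantics of the requested join can be sketched as a nested loop that checks only the time bound. This is an illustrative model, not Flink's interval-join operator, and the class and method names are hypothetical. Because there is no equi-condition, there is no key to partition on, so every B row has to be compared with every C row. A runtime implementation would therefore likely need to gather all rows in a single parallel instance, as is done for regular joins without an equi-condition.

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalJoinWithoutKey {

    static final long LOWER_BOUND_MS = 20_000L; // INTERVAL '20' SECOND before C.rowtime
    static final long UPPER_BOUND_MS = 30_000L; // INTERVAL '30' SECOND after C.rowtime

    // B.rowtime BETWEEN C.rowtime - 20s AND C.rowtime + 30s (BETWEEN is inclusive).
    static boolean inInterval(long bRowtime, long cRowtime) {
        return bRowtime >= cRowtime - LOWER_BOUND_MS && bRowtime <= cRowtime + UPPER_BOUND_MS;
    }

    // With no equi-condition every B row is compared with every C row:
    // a nested-loop join whose only predicate is the time bound.
    static List<long[]> join(long[] bRowtimes, long[] cRowtimes) {
        List<long[]> result = new ArrayList<>();
        for (long b : bRowtimes) {
            for (long c : cRowtimes) {
                if (inInterval(b, c)) {
                    result.add(new long[] {b, c});
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] b = {0L, 45_000L, 100_000L};
        long[] c = {20_000L, 60_000L};
        for (long[] pair : join(b, c)) {
            System.out.println("B@" + pair[0] + " joins C@" + pair[1]);
        }
    }
}
```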
[jira] [Created] (FLINK-15407) Add document to explain how to write a table with PK
Jingsong Lee created FLINK-15407: Summary: Add document to explain how to write a table with PK Key: FLINK-15407 URL: https://issues.apache.org/jira/browse/FLINK-15407 Project: Flink Issue Type: Bug Components: Documentation, Table SQL / API Reporter: Jingsong Lee Fix For: 1.10.0 Several users have asked why an error is reported when writing to an upsert sink: TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. Users are confused by this, so I think we should write a document that explains it. Users need to be careful. For example, this fails:
{code:sql}
insert into result_table
select pk1, if(pk2 is null, '', pk2) as pk2, count(*), sum(f3)
from source
group by pk1, pk2;
{code}
while this works:
{code:sql}
insert into result_table
select pk1, pk2, count(*), sum(f1)
from (select pk1, if(pk2 is null, '', pk2) as pk2, f1 from source)
group by pk1, pk2;
{code}
[jira] [Created] (FLINK-15406) The savepoint written by "State Processor API" can't be restored by map or flatmap
Darcy Lin created FLINK-15406: - Summary: The savepoint written by "State Processor API" can't be restored by map or flatmap Key: FLINK-15406 URL: https://issues.apache.org/jira/browse/FLINK-15406 Project: Flink Issue Type: Bug Components: API / State Processor Affects Versions: 1.9.1 Reporter: Darcy Lin A savepoint written with the "State Processor API" can't be restored by a map or flatMap operator, but it can be restored by a KeyedProcessFunction. The error message is:
{code}
java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 19 more
{code}
[jira] [Created] (FLINK-15405) Make Yarn staging directory configurable
Yang Wang created FLINK-15405: - Summary: Make Yarn staging directory configurable Key: FLINK-15405 URL: https://issues.apache.org/jira/browse/FLINK-15405 Project: Flink Issue Type: New Feature Components: Deployment / YARN Reporter: Yang Wang Currently, when deploying a Flink cluster on YARN, we always use the home directory as the prefix of the staging directory. This is inconvenient when multiple YARN clusters share the same HDFS cluster, because the residual application staging directories are hard to clean up. It would be better if we could specify a user-defined directory as the staging prefix.
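The proposed behaviour can be sketched as a resolution rule: use the configured prefix when one is set, and otherwise fall back to the home directory as today. The class, the method, and the `.flink/<applicationId>` layout under the prefix are assumptions made for illustration. The sketch does not reproduce Flink's YARN deployment code or define any real configuration key.

```java
public class StagingDirectory {

    // Hypothetical rule: prefer a user-defined prefix, otherwise fall back to the
    // home directory. The ".flink/<applicationId>" layout is an assumption.
    static String resolve(String configuredPrefix, String homeDir, String applicationId) {
        String prefix = (configuredPrefix == null || configuredPrefix.isEmpty()) ? homeDir : configuredPrefix;
        if (prefix.endsWith("/")) {
            prefix = prefix.substring(0, prefix.length() - 1);
        }
        return prefix + "/.flink/" + applicationId;
    }

    public static void main(String[] args) {
        String appId = "application_1577000000000_0001";
        // No prefix configured: current behaviour, staging under the home directory.
        System.out.println(resolve(null, "hdfs://nn/user/alice", appId));
        // One prefix per YARN cluster makes residual directories easy to find and clean up.
        System.out.println(resolve("hdfs://nn/flink-staging/yarn-cluster-a/", "hdfs://nn/user/alice", appId));
    }
}
```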
[jira] [Created] (FLINK-15404) How to insert hive table for different catalog
hehuiyuan created FLINK-15404:
Summary: How to insert hive table for different catalog
Key: FLINK-15404
URL: https://issues.apache.org/jira/browse/FLINK-15404
Project: Flink
Issue Type: Wish
Components: Table SQL / Planner
Reporter: hehuiyuan

I have a Hive catalog:
{code:java}
catalog name : myhive
database : default
{code}
and Flink has a default catalog:
{code:java}
catalog name : default_catalog
database : default_database
{code}
For example, I have a source table 'source_table' from Kafka that is registered in default_catalog, and I want to insert into the table 'hive_table' in the myhive catalog.

SQL:
insert into hive_table select * from source_table;
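A minimal sketch, assuming Flink SQL resolves fully qualified `catalog.database.table` identifiers: the hypothetical helper below only builds the statement string. It backtick-quotes every identifier part because `default` is a reserved SQL keyword and cannot be used unquoted as the Hive database name. In a Flink 1.9/1.10 job the resulting string would be passed to `tableEnv.sqlUpdate(...)`; switching the current catalog with `tableEnv.useCatalog("myhive")` is another option, in which case the Kafka source still needs its full `default_catalog.default_database` prefix.

```java
/**
 * Hypothetical helper that builds a Flink SQL statement copying rows from a
 * table in one catalog into a table in another catalog.
 */
public class CrossCatalogInsert {

    /** Wraps an identifier part in backticks so reserved words such as `default` stay valid. */
    static String quote(String part) {
        return "`" + part + "`";
    }

    /** Builds a fully qualified `catalog`.`database`.`table` identifier. */
    public static String qualified(String catalog, String database, String table) {
        return quote(catalog) + "." + quote(database) + "." + quote(table);
    }

    public static String insertSelectAll(String sinkCatalog, String sinkDatabase, String sinkTable,
                                         String sourceCatalog, String sourceDatabase, String sourceTable) {
        return "INSERT INTO " + qualified(sinkCatalog, sinkDatabase, sinkTable)
                + " SELECT * FROM " + qualified(sourceCatalog, sourceDatabase, sourceTable);
    }

    public static void main(String[] args) {
        String sql = insertSelectAll(
                "myhive", "default", "hive_table",
                "default_catalog", "default_database", "source_table");
        // In a job, this string would be handed to tableEnv.sqlUpdate(sql).
        System.out.println(sql);
    }
}
```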
[jira] [Created] (FLINK-15403) 'State Migration end-to-end test from 1.6' is unstable on travis.
Xintong Song created FLINK-15403:
Summary: 'State Migration end-to-end test from 1.6' is unstable on travis.
Key: FLINK-15403
URL: https://issues.apache.org/jira/browse/FLINK-15403
Project: Flink
Issue Type: Bug
Components: Tests
Affects Versions: 1.10.0
Reporter: Xintong Song
Fix For: 1.10.0

https://api.travis-ci.org/v3/job/629576631/log.txt

The test case fails because the log contains the following error message:
{code}
2019-12-26 09:19:35,537 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Received CancelTaskException while we are not canceled. This is a bug and should be reported
org.apache.flink.runtime.execution.CancelTaskException: Consumed partition PipelinedSubpartitionView(index: 0) of ResultPartition 3886657fb8cc980139fac67e32d6e380@8cfcbe851fe3bb3fa00e9afc370bd963 has been released.
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:190)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:509)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:487)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:475)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:75)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:125)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
    at java.lang.Thread.run(Thread.java:748)
{code}
[jira] [Created] (FLINK-15402) Add disable quote character feature for serializing row to csv format
Peng created FLINK-15402:
Summary: Add disable quote character feature for serializing row to csv format
Key: FLINK-15402
URL: https://issues.apache.org/jira/browse/FLINK-15402
Project: Flink
Issue Type: Improvement
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.9.1
Reporter: Peng

We'd like to send rows to a Kafka topic in CSV format without any quote character. For example, for the input Row.of("T est", 12, "Hello") we expect the serialized result {color:#0747a6}T est,12,Hello{color} rather than the default {color:#de350b}"T est",12,Hello{color}.
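A simplified Java sketch of the requested switch: the writer below quotes a string field only when quoting is enabled and the value contains the delimiter, the quote character, or whitespace. That quoting rule and the `CsvLineWriter` class are assumptions for illustration; Flink's `CsvRowSerializationSchema` delegates to Jackson's CSV support and may decide differently.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified sketch of a CSV line writer with a switch that disables quoting.
 * The quoting rule is an assumption for illustration, not Flink's or Jackson's.
 */
public class CsvLineWriter {
    private final char delimiter;
    private final char quote;
    private final boolean quotingEnabled;

    public CsvLineWriter(char delimiter, char quote, boolean quotingEnabled) {
        this.delimiter = delimiter;
        this.quote = quote;
        this.quotingEnabled = quotingEnabled;
    }

    /** Serializes the given field values into one CSV line (nulls become empty fields). */
    public String serialize(Object... fields) {
        List<String> parts = new ArrayList<>();
        for (Object field : fields) {
            String text = field == null ? "" : field.toString();
            boolean quoteIt = quotingEnabled && field instanceof String && needsQuotes(text);
            parts.add(quoteIt ? quoteValue(text) : text);
        }
        return String.join(String.valueOf(delimiter), parts);
    }

    // Quote values containing the delimiter, the quote character, or any whitespace (incl. line breaks).
    private boolean needsQuotes(String text) {
        for (char c : text.toCharArray()) {
            if (c == delimiter || c == quote || Character.isWhitespace(c)) {
                return true;
            }
        }
        return false;
    }

    // Wrap in quotes and double any embedded quote characters (RFC 4180 style).
    private String quoteValue(String text) {
        String q = String.valueOf(quote);
        return q + text.replace(q, q + q) + q;
    }

    public static void main(String[] args) {
        Object[] row = {"T est", 12, "Hello"};
        System.out.println(new CsvLineWriter(',', '"', true).serialize(row));  // "T est",12,Hello
        System.out.println(new CsvLineWriter(',', '"', false).serialize(row)); // T est,12,Hello
    }
}
```

With quoting disabled, a value that itself contains the delimiter (for example `a,b`) can no longer be told apart from two fields, so the switch is only safe when the data is known not to contain the delimiter or line breaks.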
[jira] [Created] (FLINK-15401) Planner should set sink field names to setKeyFields of upsert sink
Jingsong Lee created FLINK-15401:
Summary: Planner should set sink field names to setKeyFields of upsert sink
Key: FLINK-15401
URL: https://issues.apache.org/jira/browse/FLINK-15401
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Jingsong Lee
Fix For: 1.10.0

There is an upsert sink my_sink with schema (a int, cnt int). The SQL
{code:java}
INSERT INTO my_sink SELECT a, count(*) from my_source group by a;{code}
works, but
{code:java}
INSERT INTO my_sink SELECT b, count(*) from my_source group by b; {code}
fails because the key name "b" does not match the sink field name "a". Currently we just pass the field names from the plan to setKeyFields; we need to pass the field names from the sink instead.
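Because `INSERT INTO` matches query columns to sink columns by position, the key field names of the query could be translated into the sink's names by position before they reach `setKeyFields`. The hypothetical helper below sketches only that mapping step; `EXPR$1` is an illustrative stand-in for whatever name the planner assigns to `count(*)`.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch: translate unique-key field names of the query into
 * the sink's field names, using the position-based column mapping of INSERT INTO.
 */
public class UpsertKeyMapper {

    public static String[] toSinkKeyFields(String[] queryFields, String[] queryKeys, String[] sinkFields) {
        if (queryFields.length != sinkFields.length) {
            throw new IllegalArgumentException("Query and sink have different arity");
        }
        List<String> queryFieldList = Arrays.asList(queryFields);
        String[] sinkKeys = new String[queryKeys.length];
        for (int i = 0; i < queryKeys.length; i++) {
            int position = queryFieldList.indexOf(queryKeys[i]);
            if (position < 0) {
                throw new IllegalArgumentException("Key field not in query: " + queryKeys[i]);
            }
            // Same position, but the sink's field name.
            sinkKeys[i] = sinkFields[position];
        }
        return sinkKeys;
    }

    public static void main(String[] args) {
        String[] sinkFields = {"a", "cnt"};
        // SELECT b, count(*) FROM my_source GROUP BY b; "EXPR$1" is an illustrative planner name.
        String[] keys = toSinkKeyFields(new String[] {"b", "EXPR$1"}, new String[] {"b"}, sinkFields);
        System.out.println(Arrays.toString(keys)); // [a]
    }
}
```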
[jira] [Created] (FLINK-15400) elasticsearch sink support dynamic index.
ouyangwulin created FLINK-15400:
Summary: elasticsearch sink support dynamic index.
Key: FLINK-15400
URL: https://issues.apache.org/jira/browse/FLINK-15400
Project: Flink
Issue Type: New Feature
Components: Connectors / ElasticSearch
Affects Versions: 1.9.1, 1.9.0, 1.11.0
Reporter: ouyangwulin
Fix For: 1.11.0

From user...@flink.apache.org ([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]):

Because Elasticsearch 6/7 does not support TTL, users need to clean up indices by timestamp, so a dynamic index would be a useful feature. The proposal adds a property 'dynamicIndex' as a switch to enable the dynamic index, a property 'indexField' for the time field to extract, and a property 'indexInterval' for the rollover cycle mode.

||Property||Description||Default||Required||
|dynamicIndex|Whether the index is dynamic|false (true/false)|false|
|indexField|Field from which the index is extracted|none|Required if dynamicIndex is true; only the types "timestamp", "date", and "long" are supported|
|indexInterval|Cycle mode|d|Required if dynamicIndex is true; allowed values: d: day, m: month, w: week|
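A minimal Java sketch of how an index name could be derived from the extracted time field for each proposed `indexInterval` value. It assumes the field arrives as epoch milliseconds (the `long` case) bucketed in UTC; `timestamp` and `date` fields would first be converted to the same value. The `DynamicIndexNamer` class and the `<base>-yyyy-MM-dd` / `<base>-yyyy-Www` / `<base>-yyyy-MM` naming scheme are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.IsoFields;
import java.util.Locale;

/**
 * Hypothetical sketch: derive a time-based Elasticsearch index name from a
 * record's time field, following the proposed indexInterval codes d, w, m.
 */
public class DynamicIndexNamer {

    public static String indexName(String baseIndex, long epochMillis, String indexInterval) {
        // Assumption: the time field is epoch milliseconds, bucketed in UTC.
        LocalDate date = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
        switch (indexInterval) {
            case "d": // one index per day, e.g. logs-2019-12-26
                return String.format(Locale.ROOT, "%s-%04d-%02d-%02d",
                        baseIndex, date.getYear(), date.getMonthValue(), date.getDayOfMonth());
            case "w": // one index per ISO week, e.g. logs-2019-W52
                return String.format(Locale.ROOT, "%s-%04d-W%02d",
                        baseIndex, date.get(IsoFields.WEEK_BASED_YEAR), date.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR));
            case "m": // one index per month, e.g. logs-2019-12
                return String.format(Locale.ROOT, "%s-%04d-%02d",
                        baseIndex, date.getYear(), date.getMonthValue());
            default:
                throw new IllegalArgumentException("Unsupported indexInterval: " + indexInterval);
        }
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2019-12-26T09:19:35Z").toEpochMilli();
        for (String interval : new String[] {"d", "w", "m"}) {
            System.out.println(indexName("logs", ts, interval));
        }
    }
}
```

Weekly names use the ISO week-based year, so 2019-12-31 maps to `2020-W01`; combining the calendar year with the ISO week number would instead produce a misleading `2019-W01`. With one index per period, retention becomes a matter of deleting whole old indices, which stands in for the missing TTL.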
[jira] [Created] (FLINK-15399) Join with a LookupableTableSource:java.lang.RuntimeException: while converting XXXX Caused by: java.lang.AssertionError: Field ordinal 26 is invalid for type
Rockey Cui created FLINK-15399:
Summary: Join with a LookupableTableSource:java.lang.RuntimeException: while converting Caused by: java.lang.AssertionError: Field ordinal 26 is invalid for type
Key: FLINK-15399
URL: https://issues.apache.org/jira/browse/FLINK-15399
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.9.1
Environment: jdk1.8.0_211
Reporter: Rockey Cui
Attachments: JoinTest-1.0-SNAPSHOT.jar

{code:java}
// code placeholder
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.setParallelism(1);

    DataStreamSource<String> stringDataStreamSource1 = env.fromElements("HA");
    String[] fields1 = new String[]{"ORD_ID", "PS_PARTKEY", "PS_SUPPKEY", "PS_AVAILQTY", "PS_SUPPLYCOST", "PS_COMMENT" // key
            , "PS_INT", "PS_LONG"
            , "PS_DOUBLE8", "PS_DOUBLE14", "PS_DOUBLE15"
            , "PS_NUMBER1", "PS_NUMBER2", "PS_NUMBER3", "PS_NUMBER4"
            , "PS_DATE", "PS_TIMESTAMP", "PS_DATE_EVENT", "PS_TIMESTAMP_EVENT"};
    TypeInformation[] types1 = new TypeInformation[]{Types.STRING, Types.INT, Types.LONG, Types.LONG, Types.DOUBLE, Types.STRING // key
            , Types.INT, Types.LONG
            , Types.DOUBLE, Types.DOUBLE, Types.DOUBLE
            , Types.LONG, Types.LONG, Types.DOUBLE, Types.DOUBLE
            , Types.SQL_DATE, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.SQL_TIMESTAMP};
    RowTypeInfo typeInformation1 = new RowTypeInfo(types1, fields1);
    DataStream<Row> stream1 = stringDataStreamSource1.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 2349572544179673356L;

        @Override
        public Row map(String s) {
            return new Row(typeInformation1.getArity());
        }
    }).returns(typeInformation1);
    tableEnv.registerDataStream("FUN_1", stream1, String.join(",", typeInformation1.getFieldNames()) + ",PROCTIME.proctime");

    DataStreamSource<String> stringDataStreamSource2 = env.fromElements("HA");
    String[] fields2 = new String[]{"C_NAME", "C_ADDRESS", "C_NATIONKEY" // key
            , "C_INT", "C_LONG"
            , "C_DOUBLE8", "C_DOUBLE14"
            , "C_DATE_EVENT", "C_TIMESTAMP_EVENT"};
    TypeInformation[] types2 = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG // key
            , Types.INT, Types.LONG
            , Types.DOUBLE, Types.DOUBLE
            , Types.SQL_DATE, Types.SQL_TIMESTAMP};
    RowTypeInfo typeInformation2 = new RowTypeInfo(types2, fields2);
    DataStream<Row> stream2 = stringDataStreamSource2.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 2349572544179673349L;

        @Override
        public Row map(String s) {
            return new Row(typeInformation2.getArity());
        }
    }).returns(typeInformation2);
    tableEnv.registerDataStream("FUN_2", stream2, String.join(",", typeInformation2.getFieldNames()) + ",PROCTIME.proctime");

    MyLookupTableSource tableSource = MyLookupTableSource.newBuilder()
            .withFieldNames(new String[]{
                    "S_NAME", "S_ADDRESS", "S_PHONE"
                    , "S_ACCTBAL", "S_COMMENT" // key
                    , "S_INT", "S_LONG"
                    , "S_DOUBLE8", "S_DOUBLE14"
                    , "S_DOUBLE15", "S_DATE_EVENT", "S_TIMESTAMP_EVENT"})
            .withFieldTypes(new TypeInformation[]{
                    Types.STRING, Types.STRING, Types.STRING
                    , Types.DOUBLE, Types.STRING // key
                    , Types.INT, Types.LONG
                    , Types.DOUBLE, Types.DOUBLE
                    , Types.DOUBLE, Types.SQL_DATE, Types.SQL_TIMESTAMP})
            .build();
    tableEnv.registerTableSource("INFO", tableSource);

    String sql = "SELECT LN(F.PS_INT),LOG(F2.C_INT,1)\n" +
            " FROM (SELECT *\n" +
            " FROM FUN_1 F1\n" +
            " JOIN INFO FOR SYSTEM_TIME AS OF F1.PROCTIME D1\n" +
            " ON F1.PS_INT = D1.S_INT AND F1.PS_LONG - 570 = D1.S_LONG \n" +
            ") F\n" +
            "JOIN FUN_2 F2 ON F.PS_INT = F2.C_INT AND F.PS_LONG - 150 = F2.C_LONG\n" +
            " WHERE 1=1\n" +
            " AND F.PS_INT BETWEEN 1000 AND 5000\n" +
            " AND F.S_LONG < 2147792600\n" + // I found that this condition causes the exception
            " AND F.PS_COMMENT LIKE '%FILY%'\n" +
            " AND F2.C_INT IS NOT NULL\n" +
            " AND LN(F.PS_INT)<8";
    Table table = tableEnv.sqlQuery(sql);
    DataStream