[jira] [Created] (FLINK-15420) Cast string to timestamp will lose precision

2019-12-26 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-15420:


 Summary: Cast string to timestamp will lose precision
 Key: FLINK-15420
 URL: https://issues.apache.org/jira/browse/FLINK-15420
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jingsong Lee
 Fix For: 1.10.0


{code:java}
cast('2010-10-14 12:22:22.123456' as timestamp(9))
{code}
Will produce "2010-10-14 12:22:22.123" in the Blink planner, i.e. the fractional seconds are truncated to millisecond precision; this should not happen.
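For reference, the expected and actual results side by side (the expected value is inferred from the literal above):
{code:java}
-- input literal:               '2010-10-14 12:22:22.123456'
-- expected with TIMESTAMP(9):  2010-10-14 12:22:22.123456
-- actual in the Blink planner: 2010-10-14 12:22:22.123
SELECT CAST('2010-10-14 12:22:22.123456' AS TIMESTAMP(9));
{code}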



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15421) GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp

2019-12-26 Thread Benchao Li (Jira)
Benchao Li created FLINK-15421:
--

 Summary: GroupAggsHandler throws java.time.LocalDateTime cannot be 
cast to java.sql.Timestamp
 Key: FLINK-15421
 URL: https://issues.apache.org/jira/browse/FLINK-15421
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.1, 1.10.0
Reporter: Benchao Li


`TimestampType` has two kinds of physical representation: `Timestamp` and 
`LocalDateTime`. When we use the following SQL, they conflict with each other:
{code:java}
SELECT
  SUM(cnt) as s,
  MAX(ts)
FROM (
  SELECT
    `string`,
    `int`,
    COUNT(*) AS cnt,
    MAX(rowtime) as ts
  FROM T1
  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND)
)
GROUP BY `string`
{code}
with 'table.exec.emit.early-fire.enabled' = true.

The exception is below:
{code:java}
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
 at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
 at java.lang.Thread.run(Thread.java:748)
{code}
I also created a UT to quickly reproduce this bug in `WindowAggregateITCase`:
{code:java}
  @Test
  def testEarlyFireWithTumblingWindow(): Unit = {
val stream = failingDataSource(data)
  .assignTimestampsAndWatermarks(
new TimestampAndWatermarkWithOffset
  [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
val table = stream.toTable(tEnv,
  'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
tEnv.registerTable("T1", table)

tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled",
 true)

tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", 
"1000 ms")

val sql =
  """
|SELECT
|  SUM(cnt) as s,
|  MAX(ts)
|FROM
|  (SELECT
|`string`,
|`int`,
|COUNT(*) AS cnt,
|MAX(rowtime) as ts
|  FROM T1
|  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
|GROUP BY `string`
|""".stripMargin

tEnv.sqlQuery(sql).toRetractStream[Row].print()
env.execute()
  }
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15419) Validate SQL syntax not need to depend on connector jar

2019-12-26 Thread Kaibo Zhou (Jira)
Kaibo Zhou created FLINK-15419:
--

 Summary: Validate SQL syntax not need to depend on connector jar
 Key: FLINK-15419
 URL: https://issues.apache.org/jira/browse/FLINK-15419
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Kaibo Zhou
 Fix For: 1.11.0


As a platform user, I want to integrate Flink SQL in my platform.

The users will register Source/Sink Tables and Functions to catalog service 
through UI, and write SQL scripts on Web SQLEditor. I want to validate the SQL 
syntax and validate that all catalog objects exist (table, fields, UDFs). 

After some investigation, I decided to use the `tEnv.sqlUpdate/sqlQuery` API to 
do this.`SqlParser` and`FlinkSqlParserImpl` is not a good choice, as it will 
not read the catalog.

The users have registered *Kafka* source/sink table in the catalog, so the 
validation logic will be:
{code:java}
// Assumed setup: `settings` and `catalog` are created elsewhere.
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.registerCatalog(CATALOG_NAME, catalog);
tEnv.useCatalog(CATALOG_NAME);
tEnv.useDatabase(DB_NAME);

tEnv.sqlUpdate("INSERT INTO sinkTable SELECT f1, f2 FROM sourceTable");
// or
tEnv.sqlQuery("SELECT * FROM tableName");
{code}
It will throw an exception on Flink 1.9.0 because I do not have 
`flink-connector-kafka_2.11-1.9.0.jar` in my classpath.
{code:java}
org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
 at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:132)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)

The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
{code}
For a platform provider, the user's SQL may depend on *ANY* connector, even a 
custom one. Dynamically loading the connector jar after parsing the connector 
type out of the SQL is complicated, and it forces users to upload their custom 
connector jar before a syntax check can run.

I hope that Flink can provide a friendly way to verify the syntax of SQL whose 
tables/functions are already registered in the catalog, *NOT* depending on the 
connector jars. This would make it easier for external platforms to integrate 
Flink SQL.
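For reference, a minimal sketch of the validation wrapper described above (a sketch, not an official API; CATALOG_NAME, DB_NAME, and the catalog instance are the placeholders from the snippet earlier). It still fails with the ValidationException shown above whenever the referenced connector jar is missing from the classpath, which is exactly the limitation being reported:
{code:java}
import java.util.Optional;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;

public final class SqlValidator {

    /** Returns the validation error message, or empty if the statement is valid. */
    public static Optional<String> validate(TableEnvironment tEnv, String sql) {
        try {
            // sqlQuery parses and validates against the registered catalog
            // without executing anything; use sqlUpdate for INSERT statements.
            tEnv.sqlQuery(sql);
            return Optional.empty();
        } catch (ValidationException e) {
            return Optional.of(e.getMessage());
        }
    }
}
{code}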



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15418) StreamExecMatchRule not set FlinkRelDistribution

2019-12-26 Thread Benchao Li (Jira)
Benchao Li created FLINK-15418:
--

 Summary: StreamExecMatchRule not set FlinkRelDistribution
 Key: FLINK-15418
 URL: https://issues.apache.org/jira/browse/FLINK-15418
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.1, 1.10.0
Reporter: Benchao Li


StreamExecMatchRule forgets to set FlinkRelDistribution. When a MATCH_RECOGNIZE 
clause uses `PARTITION BY` and runs with parallelism > 1, it results in the 
following exception:

{code:java}
Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
at 
org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
at 
org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:100)
at 
org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
at 
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.registerEvent(SharedBuffer.java:141)
at 
org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor.registerEvent(SharedBufferAccessor.java:74)
at org.apache.flink.cep.nfa.NFA$EventWrapper.getEventId(NFA.java:483)
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:605)
at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:292)
at org.apache.flink.cep.nfa.NFA.process(NFA.java:228)
at 
org.apache.flink.cep.operator.CepOperator.processEvent(CepOperator.java:420)
at 
org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:242)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:748)
{code}
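For reference, a query shape that hits this path (table and column names are hypothetical):
{code:java}
SELECT *
FROM events
MATCH_RECOGNIZE (
  PARTITION BY user_id   -- partitioning is what needs the hash distribution
  ORDER BY rowtime
  MEASURES A.v AS start_v, B.v AS end_v
  PATTERN (A B)
  DEFINE
    A AS A.v > 0,
    B AS B.v > A.v
)
{code}
Running such a query with parallelism > 1 should reproduce the NullPointerException above.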



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15417) Remove the docker volume or mount when starting Mesos e2e cluster

2019-12-26 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-15417:
--

 Summary: Remove the docker volume or mount when starting Mesos e2e 
cluster
 Key: FLINK-15417
 URL: https://issues.apache.org/jira/browse/FLINK-15417
 Project: Flink
  Issue Type: Test
Reporter: Yangze Guo
 Fix For: 1.10.0


As discussed 
[here|https://github.com/apache/flink/pull/10695#discussion_r361574394], there 
is a potential risk of permission problems when cleaning up logs and output. We 
should find another way to let the containers access the input and output files.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15416) Add Retry Mechanism for PartitionRequestClientFactory.ConnectingChannel

2019-12-26 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-15416:
-

 Summary: Add Retry Mechanism for 
PartitionRequestClientFactory.ConnectingChannel
 Key: FLINK-15416
 URL: https://issues.apache.org/jira/browse/FLINK-15416
 Project: Flink
  Issue Type: Wish
  Components: Runtime / Network
Affects Versions: 1.10.0
Reporter: Zhenqiu Huang


We run a Flink job with 256 TMs in production. The job has keyBy logic 
internally, so it builds 256 * 256 communication channels. An outage happened 
when an internal link on one of the network switches connecting these machines 
broke. During the outage, the job could not restart successfully, as there was 
always an exception like: "Connecting the channel failed: Connecting to remote 
task manager '/10.14.139.6:41300' has failed. This might indicate that the 
remote task manager has been lost."

After a deep investigation with the network infrastructure team, we found there 
are 6 switches connecting these machines, each with 32 physical links. Every 
socket is round-robin assigned to one of the links for load balancing. Thus, on 
average 256 * 256 / (6 * 32 * 2) ≈ 170 channels will be assigned to the broken 
link. The issue lasted for 4 hours until we found the broken link and restarted 
the problematic switch.

Given this, we found that retrying channel creation would help resolve this 
issue. For our network topology, we can set the retry count to 2: as 170 / (132 
* 132) < 1, after retrying twice no channel out of the 170 will, in the average 
case, still be assigned to the broken link.

I think it is a valuable fix for this kind of partial network partition.
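A minimal sketch of the proposed retry, assuming a fixed retry count; the class and method names are illustrative, not Flink's actual PartitionRequestClientFactory API:
{code:java}
import java.util.concurrent.Callable;

/**
 * Hedged sketch: re-attempt channel creation a fixed number of times so that
 * a round-robin re-assignment is unlikely to land on the same broken physical
 * link twice.
 */
public final class RetryingConnector {

    public static <T> T connectWithRetry(Callable<T> connect, int maxRetries)
            throws Exception {
        Exception last = null;
        // 1 initial attempt + maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return connect.call(); // e.g. open the partition request channel
            } catch (Exception e) {
                last = e; // the remote TM may be fine; a physical link was broken
            }
        }
        throw last;
    }
}
{code}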





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15415) Flink machine node memory is consumed quickly, but the heap has not changed

2019-12-26 Thread hiliuxg (Jira)
hiliuxg created FLINK-15415:
---

 Summary: Flink machine node memory is consumed quickly, but the 
heap has not changed
 Key: FLINK-15415
 URL: https://issues.apache.org/jira/browse/FLINK-15415
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.8.0
 Environment: machine: 256 GB memory, SSD, 32-core CPU

config :

jobmanager.heap.size: 4096m

taskmanager.heap.size: 144gb

taskmanager.numberOfTaskSlots: 48
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: false

parallelism.default: 1

Reporter: hiliuxg


The Flink node has been running for 1 month, and we found that the machine's 
memory usage keeps growing, even exceeding the memory configured for the heap. 
Through top -c, we found that the machine's resident and virtual memory are 
very high. It may be an off-heap memory leak. What is the reclamation mechanism 
for Flink's off-heap memory? Does a full GC need to run to reclaim off-heap 
memory?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15414) KafkaITCase#prepare failed in travis

2019-12-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-15414:
---

 Summary: KafkaITCase#prepare failed in travis
 Key: FLINK-15414
 URL: https://issues.apache.org/jira/browse/FLINK-15414
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Dian Fu


The nightly travis for release-1.9 failed with the following error:
{code:java}
15:43:24.454 [ERROR] Errors:
15:43:24.455 [ERROR]   KafkaITCase.prepare:58->KafkaTestBase.prepare:92->KafkaTestBase.prepare:100->KafkaTestBase.startClusters:134->KafkaTestBase.startClusters:145
 » Kafka
{code}
instance: [https://api.travis-ci.org/v3/job/629636116/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15413) ScalarOperatorsTest failed in travis

2019-12-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-15413:
---

 Summary: ScalarOperatorsTest failed in travis
 Key: FLINK-15413
 URL: https://issues.apache.org/jira/browse/FLINK-15413
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Dian Fu


The travis of release-1.9 nightly failed with the following error:
{code:java}
14:50:19.796 [ERROR] ScalarOperatorsTest>ExpressionTestBase.evaluateExprs:161 
Wrong result for: [CASE WHEN (CASE WHEN f2 = 1 THEN CAST('' as INT) ELSE 0 END) 
is null THEN 'null' ELSE 'not null' END] optimized to: [_UTF-16LE'not 
null':VARCHAR(8) CHARACTER SET "UTF-16LE"] expected: but was:
{code}
instance: [https://api.travis-ci.org/v3/job/629636107/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15412) LocalExecutorITCase#testParameterizedTypes failed in travis

2019-12-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-15412:
---

 Summary: LocalExecutorITCase#testParameterizedTypes failed in 
travis
 Key: FLINK-15412
 URL: https://issues.apache.org/jira/browse/FLINK-15412
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Reporter: Dian Fu


The travis of release-1.9 nightly failed with the following error:
{code:java}
14:43:17.916 [INFO] Running 
org.apache.flink.table.client.gateway.local.LocalExecutorITCase
14:44:47.388 [ERROR] Tests run: 34, Failures: 0, Errors: 1, Skipped: 1, Time 
elapsed: 89.468 s <<< FAILURE! - in 
org.apache.flink.table.client.gateway.local.LocalExecutorITCase
14:44:47.388 [ERROR] testParameterizedTypes[Planner: 
blink](org.apache.flink.table.client.gateway.local.LocalExecutorITCase) Time 
elapsed: 7.88 s <<< ERROR!
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement
 at org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testParameterizedTypes(LocalExecutorITCase.java:557)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
failed. findAndCreateTableSource failed
 at 
org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testParameterizedTypes(LocalExecutorITCase.java:557)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource 
failed
 at 
org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testParameterizedTypes(LocalExecutorITCase.java:557)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: 
Could not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No context matches.
{code}
instance: [https://api.travis-ci.org/v3/job/629636106/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-85: Delayed Job Graph Generation

2019-12-26 Thread Peter Huang
Hi Yang,

I can't agree more. The effort definitely needs to align with the final
goal of FLIP-73.
I am thinking about whether we can achieve the goal with two phases.

1) Phase I
As the CliFrontend will not be deprecated soon, we can still use the
deployMode flag there, pass the program info through the Flink configuration,
and use the ClassPathJobGraphRetriever to generate the job graph in the
ClusterEntrypoints of Yarn and Kubernetes.

2) Phase II
In AbstractJobClusterExecutor, the job graph is generated in the execute
function. We can still use the deployMode in it. With deployMode = cluster,
the execute function only starts the cluster.

When the {Yarn/Kubernetes}PerJobClusterEntrypoint starts, it will start the
dispatcher first; then we can use a ClusterEnvironment similar to
ContextEnvironment to submit the job with the jobName to the local
dispatcher. For the details, we need more investigation. Let's wait for
@Aljoscha Krettek @Till Rohrmann 's feedback after the holiday season.

Thank you in advance. Merry Christmas and Happy New Year!!!


Best Regards
Peter Huang

On Wed, Dec 25, 2019 at 1:08 AM Yang Wang  wrote:

> Hi Peter,
>
> I think we need to reconsider tison's suggestion seriously. After FLIP-73,
> the deployJobCluster has
> been moved into `JobClusterExecutor#execute`. It should be transparent
> to `CliFrontend`. That
> means the user program will *ALWAYS* be executed on client side. This is
> the by design behavior.
> So, we could not just add `if(client mode) .. else if(cluster mode) ...`
> codes in `CliFrontend` to bypass
> the executor. We need to find a clean way to decouple executing user
> program and deploying per-job
> cluster. Based on this, we could support to execute user program on client
> or master side.
>
> Maybe Aljoscha and Jeff could give some good suggestions.
>
>
>
> Best,
> Yang
>
> Peter Huang  wrote on Wed, Dec 25, 2019 at 4:03 AM:
>
>> Hi Jingjing,
>>
>> The improvement proposed is a deployment option for the CLI. For SQL-based
>> Flink applications, it is more convenient to use the existing model in
>> SqlClient, in which the job graph is generated within SqlClient. After
>> adding the delayed job graph generation, I think no change is needed on
>> your side.
>>
>>
>> Best Regards
>> Peter Huang
>>
>>
>> On Wed, Dec 18, 2019 at 6:01 AM jingjing bai 
>> wrote:
>>
>> > hi peter:
>> > we have extended SqlClient to support SQL job submission via web, based
>> > on flink 1.9. We support submitting to yarn in per-job mode too.
>> > In this case, the job graph is generated on the client side. I think this
>> > discussion is mainly about improving API programs, but in my case there
>> > is no jar to upload, only a SQL string.
>> > Do you have more suggestions to improve the SQL mode, or is this only a
>> > switch for API programs?
>> >
>> >
>> > best
>> > bai jj
>> >
>> >
>> > Yang Wang  wrote on Wed, Dec 18, 2019 at 7:21 PM:
>> >
>> >>  I just want to revive this discussion.
>> >>
>> >> Recently, I have been thinking about how to natively run a Flink
>> >> per-job cluster on Kubernetes.
>> >> The per-job mode on Kubernetes is very different from the one on Yarn,
>> >> and we will have the same deployment requirements for the client and
>> >> entry point.
>> >>
>> >> 1. The Flink client does not always need a local jar to start a Flink
>> >> per-job cluster. We could support multiple schemes. For example,
>> >> file:///path/of/my.jar means a jar located at the client side,
>> >> hdfs://myhdfs/user/myname/flink/my.jar means a jar located on a remote
>> >> hdfs, and local:///path/in/image/my.jar means a jar located at the
>> >> jobmanager side.
>> >>
>> >> 2. Support running the user program on the master side. This also means
>> >> the entry point will generate the job graph on the master side. We could
>> >> use the ClasspathJobGraphRetriever or start a local Flink client to
>> >> achieve this purpose.
>> >>
>> >>
>> >> cc tison, Aljoscha & Kostas: do you think this is the right direction
>> >> we need to work in?
>> >>
>> >> tison  wrote on Thu, Dec 12, 2019 at 4:48 PM:
>> >>
>> >> > A quick idea is that we separate the deployment from the user
>> >> > program, so that it has always been done outside the program. When the
>> >> > user program is executed, there is always a ClusterClient that
>> >> > communicates with an existing cluster, remote or local. It will be
>> >> > another thread, so this is just for your information.
>> >> >
>> >> > Best,
>> >> > tison.
>> >> >
>> >> >
>> >> > tison  wrote on Thu, Dec 12, 2019 at 4:40 PM:
>> >> >
>> >> > > Hi Peter,
>> >> > >
>> >> > > Another concern I realized recently is that with the current
>> >> > > Executors abstraction (FLIP-73), I'm afraid the user program is
>> >> > > designed to ALWAYS run on the client side. Specifically, we deploy
>> >> > > the job in the executor when env.execute is called. This abstraction
>> >> > > possibly prevents Flink from running the user program on the
>> >> > > cluster side.
>> >> > >
>> >> > > For your proposal, in this case we already compiled the program and
>> >> run
>> >> > 

[jira] [Created] (FLINK-15411) HiveCatalog can't prune partition on DATE/TIMESTAMP columns

2019-12-26 Thread Bowen Li (Jira)
Bowen Li created FLINK-15411:


 Summary: HiveCatalog can't prune partition on DATE/TIMESTAMP 
columns
 Key: FLINK-15411
 URL: https://issues.apache.org/jira/browse/FLINK-15411
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.10.0, 1.11.0
Reporter: Bowen Li
Assignee: Rui Li
 Fix For: 1.10.0, 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15410) Solace JCSMP connectors

2019-12-26 Thread Leonid Ilyevsky (Jira)
Leonid Ilyevsky created FLINK-15410:
---

 Summary: Solace JCSMP connectors
 Key: FLINK-15410
 URL: https://issues.apache.org/jira/browse/FLINK-15410
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Reporter: Leonid Ilyevsky


It would be very useful to have Flink connectors (source and sink) for Solace 
JCSMP, with proper checkpoint support.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-90: Support SQL 2016-2017 JSON functions in Flink SQL

2019-12-26 Thread Hequn Cheng
Hi Jark & ForwardXu,

The design doc looks very nice! Only some minor feedback from my side.

As Calcite has already implemented the JSON functions, I would assume the
semantics and implementation are right for SQL.

For the Table API, I think the most important thing is to keep it aligned with
SQL (which has also been mentioned by Jark in the previous discussion): have
an equivalent feature set for all APIs and maintain it, otherwise confusion
increases, especially as more and more functions are added. The document
describes how to support the Table API. I think this is very good! It would be
better to also include ON ERROR and ON EMPTY for the Table API. We can
implement these features step by step, but maybe we should design them all at
once to avoid API changes later. Meanwhile, these features are also commonly
required by users.

Would be great to also have your opinions!

Best,
Hequn


On Mon, Dec 23, 2019 at 10:15 AM Jark Wu  wrote:

> Hi Forward,
>
> Thanks for creating the FLIP. +1 to start a vote.
>
>  @Hequn Cheng  @Kurt Young  ,
> could you help to review the design doc too?
>
> Best,
> Jark
>
>
> On Mon, 23 Dec 2019 at 10:10, tison  wrote:
>
>> modified:
>>
>> https://lists.apache.org/x/thread.html/b3c0265cc2b660fe11ce550b84a831a7606de12908ff7ff0959a4794@%3Cdev.flink.apache.org%3E
>>
>


[jira] [Created] (FLINK-15409) Add semicolon to WindowJoinUtil#generateJoinFunction '$collectorTerm.collect($joinedRow)' statement

2019-12-26 Thread hailong wang (Jira)
hailong wang created FLINK-15409:


 Summary: Add semicolon to WindowJoinUtil#generateJoinFunction 
'$collectorTerm.collect($joinedRow)' statement
 Key: FLINK-15409
 URL: https://issues.apache.org/jira/browse/FLINK-15409
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: hailong wang
 Fix For: 1.11.0


In WindowJoinUtil#generateJoinFunction, when otherCondition is None, code 
generation goes into this branch:
{code:java}
case None =>
  s"""
 |$buildJoinedRow
 |$collectorTerm.collect($joinedRow)
 |""".stripMargin
{code}
It is missing a semicolon after collect($joinedRow), which makes the generated code fail to compile:
{code:java}
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
 at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
 at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:65)
 at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
 at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:52)
 ... 26 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 28, Column 21: Expression "c.collect(joinedRow)" is not a type
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15408) Interval join support no equi-condition

2019-12-26 Thread hailong wang (Jira)
hailong wang created FLINK-15408:


 Summary: Interval join support no equi-condition
 Key: FLINK-15408
 URL: https://issues.apache.org/jira/browse/FLINK-15408
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: hailong wang
 Fix For: 1.11.0


For now, an interval join must have at least one equi-condition. Should we 
allow no equi-condition, like a regular join?

For example, given SQL like the following:
{code:java}
INSERT INTO A SELECT * FROM B join C on B.rowtime BETWEEN C.rowtime - INTERVAL 
'20' SECOND AND C.rowtime + INTERVAL '30' SECOND
{code}
There is no matching rule to convert it, and planning fails with:
{code:java}
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: 
convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, 
MiniBatchIntervalTraitDef=None: 0, UpdateAsRetractionTraitDef=false, 
AccModeTraitDef=UNKNOWN.
Missing conversion is FlinkLogicalJoin[convention: LOGICAL -> STREAM_PHYSICAL]
{code}
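Until then, a hedged workaround is to introduce an artificial constant key on both sides so the join has an equi-condition (the helper column k is illustrative):
{code:java}
INSERT INTO A
SELECT *
FROM (SELECT *, 1 AS k FROM B) b
JOIN (SELECT *, 1 AS k FROM C) c
  ON b.k = c.k
 AND b.rowtime BETWEEN c.rowtime - INTERVAL '20' SECOND
                   AND c.rowtime + INTERVAL '30' SECOND
{code}
Note that this forces all rows onto a single key, so it serializes the join; project away the helper column k in the final SELECT as needed.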



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15407) Add document to explain how to write a table with PK

2019-12-26 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-15407:


 Summary: Add document to explain how to write a table with PK
 Key: FLINK-15407
 URL: https://issues.apache.org/jira/browse/FLINK-15407
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Reporter: Jingsong Lee
 Fix For: 1.10.0


I have seen several user problems like this:

Why is an error reported when writing to an upsert sink: "TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated."

Users are confused.

I think we can consider writing a document to describe it.

Users need to be careful. For example:

 
{code:java}
insert into result_table select pk1, if(pk2 is null, '', pk2) as pk2, count(*), 
sum(f3) from source group by pk1, pk2; {code}
This will fail: the outer projection replaces pk2 with a computed expression, so the planner cannot derive a full primary key for the sink.

 
{code:java}
insert into result_table select pk1, pk2, count(*), sum(f1) from (select pk1, 
if(pk2 is null, '', pk2) as pk2, f1 from source) group by pk1, pk2; 
{code}
This can work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15406) The savepoint written by the "State Processor API" can't be restored by map or flatmap

2019-12-26 Thread Darcy Lin (Jira)
Darcy Lin created FLINK-15406:
-

 Summary: The savepoint written by the "State Processor API" can't be 
restored by map or flatmap
 Key: FLINK-15406
 URL: https://issues.apache.org/jira/browse/FLINK-15406
 Project: Flink
  Issue Type: Bug
  Components: API / State Processor
Affects Versions: 1.9.1
Reporter: Darcy Lin


The savepoint written by the "State Processor API" cannot be restored by a map 
or flatmap function, but it can be restored by a KeyedProcessFunction.
The error message follows:

java.lang.Exception: Could not write timer service of Flat Map -> Map -> Sink: device_first_user_create (1/8) to checkpoint state stream.
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
 at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
 at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
 at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
 at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
 at org.apache.flink.streaming.api.operators.InternalTimersSnapshot.<init>(InternalTimersSnapshot.java:52)
 at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
 at 
org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
 at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
 ... 19 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15405) Make Yarn staging directory configurable

2019-12-26 Thread Yang Wang (Jira)
Yang Wang created FLINK-15405:
-

 Summary: Make Yarn staging directory configurable
 Key: FLINK-15405
 URL: https://issues.apache.org/jira/browse/FLINK-15405
 Project: Flink
  Issue Type: New Feature
  Components: Deployment / YARN
Reporter: Yang Wang


Currently, when deploying a Flink cluster on Yarn, we always use the home dir 
as the prefix of the staging directory. This is not convenient when multiple 
Yarn clusters use the same HDFS cluster, and it makes it hard to clean up 
residual application staging directories. It would be better if we could 
specify a user-defined directory as the staging prefix.
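A hedged sketch of how this could look in flink-conf.yaml (the key name `yarn.staging-directory` is illustrative, not an existing option):
{code}
# Hypothetical option: use an explicit staging prefix instead of the home dir.
yarn.staging-directory: hdfs://myhdfs/flink/staging
{code}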



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15404) How to insert into a hive table from a different catalog

2019-12-26 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15404:
-

 Summary: How to insert into a hive table from a different catalog
 Key: FLINK-15404
 URL: https://issues.apache.org/jira/browse/FLINK-15404
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / Planner
Reporter: hehuiyuan


I have a Hive catalog:

{code:java}
    catalog name : myhive
    database : default
{code}

and Flink has a default catalog:

{code:java}
    catalog name : default_catalog
    database : default_database
{code}

For example, I have a source table 'source_table' from Kafka which is 
registered in the default_catalog, and I want to insert into the Hive table 
'hive_table' from the myhive catalog.

SQL:

insert into hive_table select * from source_table;
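In Flink SQL, tables can be fully qualified as catalog.database.table, so a hedged sketch of the cross-catalog insert would be (identifiers follow the catalogs above; `default` is quoted because it is a reserved keyword):
{code:java}
INSERT INTO myhive.`default`.hive_table
SELECT * FROM default_catalog.default_database.source_table;
{code}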




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15403) 'State Migration end-to-end test from 1.6' is unstable on travis.

2019-12-26 Thread Xintong Song (Jira)
Xintong Song created FLINK-15403:


 Summary: 'State Migration end-to-end test from 1.6' is unstable on 
travis.
 Key: FLINK-15403
 URL: https://issues.apache.org/jira/browse/FLINK-15403
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.10.0
Reporter: Xintong Song
 Fix For: 1.10.0


https://api.travis-ci.org/v3/job/629576631/log.txt

The test case fails because the log contains the following error message.
{code}
2019-12-26 09:19:35,537 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Received 
CancelTaskException while we are not canceled. This is a bug and should be 
reported
org.apache.flink.runtime.execution.CancelTaskException: Consumed partition 
PipelinedSubpartitionView(index: 0) of ResultPartition 
3886657fb8cc980139fac67e32d6e380@8cfcbe851fe3bb3fa00e9afc370bd963 has been 
released.
at 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:190)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:509)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:487)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:475)
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:75)
at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:125)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:748)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15402) Add disable quote character feature for serializing row to csv format

2019-12-26 Thread Peng (Jira)
Peng created FLINK-15402:


 Summary: Add disable quote character feature for serializing row 
to csv format
 Key: FLINK-15402
 URL: https://issues.apache.org/jira/browse/FLINK-15402
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.9.1
Reporter: Peng


We'd like to send rows to a Kafka topic in CSV format without any quote 
character.
For example, for input data like Row.of("T est", 12, "Hello"), the expected 
serialized result is {color:#0747a6}T est,12,Hello{color} rather than the 
default {color:#de350b}"T est",12,Hello{color}.
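A hedged sketch of what the requested switch could look like on the CSV serialization schema builder (`disableQuoteCharacter()` is the proposed method, not an existing 1.9 API):
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.csv.CsvRowSerializationSchema;

public class CsvNoQuoteExample {
    public static void main(String[] args) {
        // Row schema: (STRING, INT, STRING), matching Row.of("T est", 12, "Hello").
        RowTypeInfo rowType = new RowTypeInfo(Types.STRING, Types.INT, Types.STRING);

        // disableQuoteCharacter() is the feature requested here (hypothetical):
        CsvRowSerializationSchema schema = new CsvRowSerializationSchema.Builder(rowType)
                .disableQuoteCharacter() // "T est",12,Hello -> T est,12,Hello
                .build();
    }
}
{code}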



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15401) Planner should set sink field names to setKeyFields of upsert sink

2019-12-26 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-15401:


 Summary: Planner should set sink field names to setKeyFields of 
upsert sink
 Key: FLINK-15401
 URL: https://issues.apache.org/jira/browse/FLINK-15401
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jingsong Lee
 Fix For: 1.10.0


There is an upsert sink my_sink with schema (a int, cnt int).

SQL:
{code:java}
INSERT INTO my_sink SELECT a, count(*) from my_source group by a;{code}
will work.

But:
{code:java}
INSERT INTO my_sink SELECT b, count(*) from my_source group by b;
{code}
will fail, because the name "b" does not match "a".

Currently, we just pass the field names from the plan to setKeyFields. We 
should pass the field names from the sink instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15400) elasticsearch sink support dynamic index.

2019-12-26 Thread ouyangwulin (Jira)
ouyangwulin created FLINK-15400:
---

 Summary: elasticsearch sink support dynamic index.
 Key: FLINK-15400
 URL: https://issues.apache.org/jira/browse/FLINK-15400
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / ElasticSearch
Affects Versions: 1.9.1, 1.9.0, 1.11.0
Reporter: ouyangwulin
 Fix For: 1.11.0


From 
user...@flink.apache.org([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]): 
because ES 6/7 does not support TTL, users need to clean up indexes by 
timestamp, so adding a dynamic index is a useful feature. Add the property 
'dynamicIndex' as a switch to enable the dynamic index, the property 
'indexField' for the time field to extract, and the property 'indexInterval' 
to change the cycle mode.

 
||Property||Description||Default||Required||
|dynamicIndex|whether the index is dynamic|false (true/false)|false|
|indexField|time field to derive the index from|none|required when dynamicIndex is true; only types "timestamp", "date", "long" are supported|
|indexInterval|index cycle mode|d|required when dynamicIndex is true; optional values: d = day, m = month, w = week|

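A hedged sketch of how these properties might appear in a DDL (the property names follow the table above; the surrounding connector options are illustrative):
{code:java}
CREATE TABLE es_sink (
  ts TIMESTAMP(3),
  msg STRING
) WITH (
  'connector.type' = 'elasticsearch',
  'dynamicIndex' = 'true',   -- enable the dynamic index
  'indexField' = 'ts',       -- time field to derive the index from
  'indexInterval' = 'd'      -- one index per day (d/m/w)
);
{code}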
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15399) Join with a LookupableTableSource:java.lang.RuntimeException: while converting XXXX Caused by: java.lang.AssertionError: Field ordinal 26 is invalid for type

2019-12-26 Thread Rockey Cui (Jira)
Rockey Cui created FLINK-15399:
--

 Summary: Join with a LookupableTableSource: java.lang.RuntimeException: while 
converting XXXX Caused by: java.lang.AssertionError: Field ordinal 26 is 
invalid for type
 Key: FLINK-15399
 URL: https://issues.apache.org/jira/browse/FLINK-15399
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.9.1
 Environment: jdk1.8.0_211
Reporter: Rockey Cui
 Attachments: JoinTest-1.0-SNAPSHOT.jar

 
{code:java}
// code placeholder
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
env.setParallelism(1);

DataStreamSource<String> stringDataStreamSource1 = env.fromElements(
"HA"
);
String[] fields1 = new String[]{"ORD_ID", "PS_PARTKEY", "PS_SUPPKEY", 
"PS_AVAILQTY", "PS_SUPPLYCOST", "PS_COMMENT"
// key
, "PS_INT", "PS_LONG"
, "PS_DOUBLE8", "PS_DOUBLE14", "PS_DOUBLE15"
, "PS_NUMBER1", "PS_NUMBER2", "PS_NUMBER3", "PS_NUMBER4"
, "PS_DATE", "PS_TIMESTAMP", "PS_DATE_EVENT", "PS_TIMESTAMP_EVENT"};
TypeInformation<?>[] types1 = new TypeInformation<?>[]{Types.STRING, 
Types.INT, Types.LONG, Types.LONG, Types.DOUBLE, Types.STRING
// key
, Types.INT, Types.LONG
, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE
, Types.LONG, Types.LONG, Types.DOUBLE, Types.DOUBLE
, Types.SQL_DATE, Types.SQL_TIMESTAMP, Types.SQL_DATE, 
Types.SQL_TIMESTAMP};
RowTypeInfo typeInformation1 = new RowTypeInfo(types1, fields1);
DataStream<Row> stream1 = stringDataStreamSource1.map(new MapFunction<String, Row>() {
private static final long serialVersionUID = 2349572544179673356L;

@Override
public Row map(String s) {
return new Row(typeInformation1.getArity());
}
}).returns(typeInformation1);
tableEnv.registerDataStream("FUN_1", stream1, String.join(",", 
typeInformation1.getFieldNames()) + ",PROCTIME.proctime");

DataStreamSource<String> stringDataStreamSource2 = env.fromElements(
"HA"
);
String[] fields2 = new String[]{"C_NAME", "C_ADDRESS", "C_NATIONKEY"
// key
, "C_INT", "C_LONG"
, "C_DOUBLE8", "C_DOUBLE14"
, "C_DATE_EVENT", "C_TIMESTAMP_EVENT"};
TypeInformation<?>[] types2 = new TypeInformation<?>[]{Types.STRING, 
Types.STRING, Types.LONG
// key
, Types.INT, Types.LONG
, Types.DOUBLE, Types.DOUBLE
, Types.SQL_DATE, Types.SQL_TIMESTAMP};
RowTypeInfo typeInformation2 = new RowTypeInfo(types2, fields2);

DataStream<Row> stream2 = stringDataStreamSource2.map(new MapFunction<String, Row>() {
private static final long serialVersionUID = 2349572544179673349L;

@Override
public Row map(String s) {
return new Row(typeInformation2.getArity());
}
}).returns(typeInformation2);
tableEnv.registerDataStream("FUN_2", stream2, String.join(",", 
typeInformation2.getFieldNames()) + ",PROCTIME.proctime");

MyLookupTableSource tableSource = MyLookupTableSource.newBuilder()
.withFieldNames(new String[]{
"S_NAME", "S_ADDRESS", "S_PHONE"
, "S_ACCTBAL", "S_COMMENT"
// key
, "S_INT", "S_LONG"
, "S_DOUBLE8", "S_DOUBLE14"
, "S_DOUBLE15", "S_DATE_EVENT", "S_TIMESTAMP_EVENT"})
.withFieldTypes(new TypeInformation<?>[]{
Types.STRING, Types.STRING, Types.STRING
, Types.DOUBLE, Types.STRING
// key
, Types.INT, Types.LONG
, Types.DOUBLE, Types.DOUBLE
, Types.DOUBLE, Types.SQL_DATE, Types.SQL_TIMESTAMP})
.build();

tableEnv.registerTableSource("INFO", tableSource);

String sql = "SELECT LN(F.PS_INT),LOG(F2.C_INT,1)\n" +
"  FROM (SELECT *\n" +
"  FROM FUN_1 F1\n" +
"  JOIN INFO FOR SYSTEM_TIME AS OF F1.PROCTIME D1\n" +
" ON F1.PS_INT = D1.S_INT AND F1.PS_LONG - 570 = D1.S_LONG \n" +
") F\n" +
"JOIN FUN_2 F2 ON F.PS_INT = F2.C_INT AND F.PS_LONG - 150 = 
F2.C_LONG\n" +
" WHERE 1=1\n" +
" AND F.PS_INT BETWEEN 1000 AND 5000\n" +
" AND F.S_LONG < 2147792600\n" + // I find this cause the Exception
" AND F.PS_COMMENT LIKE '%FILY%'\n" +
" AND F2.C_INT IS NOT NULL\n" +
" AND LN(F.PS_INT)<8";

Table table = tableEnv.sqlQuery(sql);

DataStream