Running Flink on kerberized HDP 3.1 (minimal getting started)

2020-06-12 Thread Georg Heiler
Hi,

I am trying to run Flink on a Kerberized HDP 3.1 instance and need some help
getting started.
https://stackoverflow.com/questions/62330689/execute-flink-1-10-on-a-hdp-3-1-cluster-to-access-hive-tables
describes how far I have gotten.

In the end, I want to be able to start task managers on YARN and interact
with HDFS and hive and Kafka as well.

Best,
Georg


Re: Problem querying a Kafka table registered in HiveCatalog from the SQL client

2020-06-12 Thread Sun.Zhu
Got it!
Thanks, Junbao


Sun.Zhu
17626017...@163.com


On 2020-06-13 09:32, zhangjunbao wrote:
In 1.10.x, creating a table with proctime as PROCTIME() under HiveCatalog appears to hit a bug:
https://issues.apache.org/jira/browse/FLINK-17189 


Best,
Junbao Zhang

On June 13, 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:

hi, all
When integrating HiveCatalog with the SQL client, I registered a Kafka table in the HiveCatalog.
The DDL is as follows:
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- processing-time attribute via a computed column
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts, making ts the event-time column
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- kafka version; universal supports 0.11+
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format.type' = 'json'  -- source format is json
);
When running select * from user_behavior; the following error is reported:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


Flink version: 1.10.1
Blink planner, streaming mode


Thx
Sun.Zhu
17626017...@163.com




Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

2020-06-12 Thread Yun Gao
Hi Felipe,

   I tested the basic RideCleansingExercise [1] job that uses the TaxiRide type
locally, and it seems to start up normally.

   Could you also share the code you are currently executing and the full stack
trace of the exception?

Best,
 Yun

 [1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java
 --Original Mail --
Sender:Felipe Gutierrez 
Send Date:Fri Jun 12 23:11:28 2020
Recipients:user 
Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to 
the TaxiRide training example?
Hi,

I am using the Flink training exercise TaxiRide [1] to execute a
streaming count of events. On the cluster and on my local machine I am
receiving the message that Joda-Time cannot be serialized: "class
org.joda.time.LocalDateTime is not a valid POJO type". The job still
starts on the cluster, but not on my local machine. Searching the internet,
the advice is to register the Joda-Time classes with the environment [2].
I did it like this:

env.getConfig().registerTypeWithKryoSerializer(DateTime.class,
AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalDate.class,
AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalTime.class,
AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);

and I added the Joda-Time and Avro dependencies to the pom.xml:

<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${project.version}</version>
</dependency>

I also tested using addDefaultKryoSerializer but I got the same error.
For some reason, it is still not working. Does anyone have some hint
of what could be happening?

Thanks! Felipe
[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Problem querying a Kafka table registered in HiveCatalog from the SQL client

2020-06-12 Thread zhangjunbao
In 1.10.x, creating a table with proctime as PROCTIME() under HiveCatalog appears to hit a bug:
https://issues.apache.org/jira/browse/FLINK-17189 


Best,
Junbao Zhang
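
A possible workaround, as a hedged sketch (not from this thread; it assumes a Flink 1.10 StreamTableEnvironment named tableEnv using the Blink planner): until the 1.10.1 fix is available, the table with the computed proctime column can be created in the built-in in-memory catalog instead of being persisted through the HiveCatalog, so the time attributes are not lost when the table is read back.

// hedged sketch, trimmed columns; tableEnv is an assumed StreamTableEnvironment
tableEnv.useCatalog("default_catalog");
tableEnv.useDatabase("default_database");
tableEnv.sqlUpdate(
    "CREATE TABLE user_behavior (" +
    "  user_id BIGINT," +
    "  ts TIMESTAMP(3)," +
    "  proctime AS PROCTIME()," +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector.type' = 'kafka'," +
    "  'connector.version' = 'universal'," +
    "  'connector.topic' = 'user_behavior'," +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format.type' = 'json'" +
    ")");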

> On June 13, 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:
> 
> hi, all
> When integrating HiveCatalog with the SQL client, I registered a Kafka table in the HiveCatalog.
> The DDL is as follows:
> CREATE TABLE user_behavior (
>     user_id BIGINT,
>     item_id BIGINT,
>     category_id BIGINT,
>     behavior STRING,
>     ts TIMESTAMP(3),
>     proctime as PROCTIME(),   -- processing-time attribute via a computed column
>     WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts, making ts the event-time column
> ) WITH (
>     'connector.type' = 'kafka',  -- use the kafka connector
>     'connector.version' = 'universal',  -- kafka version; universal supports 0.11+
>     'connector.topic' = 'user_behavior',  -- kafka topic
>     'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
>     'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
>     'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
>     'format.type' = 'json'  -- source format is json
> );
> When running select * from user_behavior; the following error is reported:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve 
> datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME 
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME 
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
> ts=[$4], proctime=[$5])
>  LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL 
> SECOND)])
>LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], 
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
>  LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> 
> 
> Flink version: 1.10.1
> Blink planner, streaming mode
> 
> 
> Thx
> Sun.Zhu
> 17626017...@163.com
> 



Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-12 Thread Xintong Song
Yes, that is correct. 'taskmanager.memory.process.size' is the most
recommended.

Thank you~

Xintong Song
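
A hedged illustration (not from this thread): with a 4 GB pod limit, this would typically mean a single line in flink-conf.yaml such as

taskmanager.memory.process.size: 4096m

matched to the pod limit, letting Flink derive the heap, managed, and network sizes from it.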



On Fri, Jun 12, 2020 at 10:59 PM Clay Teeter  wrote:

> Ok, this is great to know.  So in my case I have a k8s pod that has a
> limit of 4 GB.  I should remove the -Xmx and add one of these -D parameters.
>
> * taskmanager.memory.flink.size
> * taskmanager.memory.process.size   <- probably this one
> * taskmanager.memory.task.heap.size and taskmanager.memory.managed.size
>
> So that I don't run into pod memory quotas
>
>
>
>
>
>
> On Fri, Jun 12, 2020 at 11:12 AM Xintong Song 
> wrote:
>
>> I would suggest not to set -Xmx.
>>
>> Flink will always calculate the JVM heap size from the configuration and
>> set a proper -Xmx.
>> If you manually set -Xmx that overwrites the one Flink calculated, it
>> might result in unpredictable behaviors.
>>
>>
>> Please refer to this document[1]. In short, you could leverage the
>> configuration option "taskmanager.memory.task.heap.size", and an additional
>> constant framework overhead will be added to this value for -Xmx.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#jvm-parameters
>>
>> On Fri, Jun 12, 2020 at 4:50 PM Clay Teeter 
>> wrote:
>>
>>> Thank you Xintong, while tracking down the existence of
>>> bash-java-utils.jar I found a bug in my CI scripts that incorrectly built
>>> the wrong version of flink.  I fixed this and then added a -Xmx value.
>>>
>>> env:
>>>   - name: FLINK_ENV_JAVA_OPTS
>>> value: "-Xmx{{ .Values.analytics.flink.taskManagerHeapSize
>>> }}"
>>>
>>>
>>> It's running perfectly now!
>>>
>>> Thank you again,
>>> Clay
>>>
>>>
>>> On Fri, Jun 12, 2020 at 5:13 AM Xintong Song 
>>> wrote:
>>>
 Hi Clay,

 Could you verify the "taskmanager.sh" used is the same script shipped
 with Flink-1.10.1? Or a custom script is used? Also, does the jar file
 "bash-java-utils.jar" exist in your Flink bin directory?

 In Flink 1.10, the memory configuration for a TaskManager works as
 follows.

- "taskmanager.sh" executes "bash-java-utils.jar" for the memory
calculations
- "bash-java-utils.jar" will read your "flink-conf.yaml" and all
the "-D" arguments, and calculate memory sizes accordingly
- "bash-java-utils.jar" will then return the memory calculation
results as two strings, for JVM parameter ("-Xmx", "-Xms", etc.) and
dynamic configurations ("-D") respectively
- At this step, all the detailed memory sizes should be determined
   - That means, even for memory sizes not configured by you, there
   should be an exact value generated in the returned dynamic 
 configuration
   - That also means, for memory components configured in ranges
   (e.g., network memory configured through a pair of [min, max]),
   a deterministic value should be decided and both min/max 
 configuration
   options should already be overwritten to that value
- "taskmanager.sh" starts the task manager JVM process with the
returned JVM parameters, and passes the dynamic configurations as 
 arguments
into the task manager process. These dynamic configurations will be 
 read by
Flink task manager so that memory will be managed accordingly.

 Flink task manager expects all the memory configurations to be already set
 (thus network min/max should have the same value) before it is started.
 In your case, it seems such configurations are missing. Same for the CPU cores.

 Thank you~

 Xintong Song



 On Fri, Jun 12, 2020 at 12:58 AM Clay Teeter 
 wrote:

> Hi flink fans,
>
> I'm hoping for an easy solution.  I'm trying to upgrade my 1.9.3 cluster
> to Flink 1.10.1, but I'm running into memory configuration errors.
>
> Such as:
> *Caused by:
> org.apache.flink.configuration.IllegalConfigurationException: The network
> memory min (64 mb) and max (1 gb) mismatch, the network memory has to be
> resolved and set to a fixed value before task executor starts*
>
> *Caused by:
> org.apache.flink.configuration.IllegalConfigurationException: The required
> configuration option Key: 'taskmanager.cpu.cores' , default: null 
> (fallback
> keys: []) is not set*
>
> I was able to fix a cascade of errors by explicitly setting these
> values:
>
> taskmanager.memory.managed.size: {{
> .Values.analytics.flink.taskManagerManagedSize }}
> taskmanager.memory.task.heap.size: {{
> .Values.analytics.flink.taskManagerHeapSize }}
> taskmanager.memory.jvm-metaspace.size: 500m
> taskmanager.cpu.cores: 4
>
> So, the documentation implies that Flink will default many of these
> values, however my 1.10.1 cluster doesn't seem to be doing this.  1.9.3, 

Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-12 Thread Arvid Heise
Hi Arti,

ouch 3M is pretty far off the current setting.

Flink aside, you need to use 100 machines at the very minimum with the
current approach (AsyncHTTP and your evaluated machine). That's probably a
point where I'd try other libraries first and most importantly I'd evaluate
different machines. 100 nodes clusters are not unheard of (netflix uses a
shared 10k node cluster for Flink), but it's already big for an application
cluster and I always assume that an application grows over time and
naturally needs more resources.

First I'd measure if you are CPU or network bound. Chances are there that
you are in a cloud setting and your internet connection is just not beefy
enough. You could also check if you could have a shortcut connection with
your third party endpoint (VPC peering).

If you are CPU bound, I'd also try to use non-java tools like wrk2[1] to
just measure how fast you can get on a single machine. I'd use that tool to
also check other machine types.

Just by briefly checking some HTTP benchmarks, I see that 3M/s seems to be
a really high number even for 2020. It feels like a more sophisticated
approach than brute forcing may save lots of money. Is it possible to cache
results? Is it possible to do batch requests (100 at a time)?

Indeed it feels as if Flink is not the perfect fit, but there is no other
technology that pops immediately into my mind that would be able to
naturally perform 3M/s http requests with minimal resource usage. It's just
a huge goal.

[1] https://github.com/giltene/wrk2

On Fri, Jun 12, 2020 at 4:20 PM Arti Pande  wrote:

> Hi Arvid,
>
> *Shared api client*: Actually in the flow of writing I missed to mention
> that we switched to a static shared instance of async http client for all 7
> subtasks of the AsyncIO. The number of threads therefore is not 140 (20 *
> 7) but just (16 + 8 or 16 = 24 or 32) which includes a static shared thread
> pool for the listener of the response Future to deserialize and process the
> response body before it is emitted out.
>
> *Single operator:* With parallelism level 1 we experimented with
> following things, but there was very high backpressure on the upstream
> operator and overall pipeline throughput was unacceptably low
>
> (a) async buffer capacity value - beyond 100 there was a drop in throughput
> (b) io thread-pool in async http client - reducing the default size
> reduced the throughput as well
>
> (c) thread pool size for the listener of response Future
>
> From a scaling out perspective we definitely need to scale this out to be
> able to support about three million records per second and as per the
> experiments and benchmarks done till now, it appears that we will need many
> higher-core machines in a larger cluster. The intention of posting this
> question here is to validate and find if anything similar has been done by
> someone on lower-core machines with success.
>
> Thanks & regards,
> Arti
>
>
> On Fri, Jun 12, 2020 at 7:00 PM Arvid Heise  wrote:
>
>> Hi Arti,
>>
>> Thank you very much for providing so much information.
>>
>> One additional test that you could do is to check how the pipeline
>> performs by mocking the actual HTTP request and directly return a static
>> response through Async IO. This would give you an exact number including
>> potential serialization costs. I often see users focusing on the
>> performance of a specific operator, while the bottleneck is the shuffle
>> step before or after that (you get a shuffle if you have keyby or change
>> the degree of parallelism).
>>
>> But I'm going forward by the assumption that this is indeed completely
>> the AsyncIO itself that is causing the loss of performance. So for me, it
>> looks like Flink is a factor of 2 to 3 slower than the native approach
>> (exact number is hard to give, as there are more steps involved in Flink).
>> My guess is that the sheer number of threads might cause too many context
>> switches and the drop of performance. Are you starting an AsyncHTTPClient
>> thread pool per subtask or do you have it shared (by using a static
>> variable initialized once in RichAsyncFunction#open)? It sounded like the
>> former, so I'd definitely recommend trying the latter approach.
>>
>> One additional thing that you could try is actually use a parallelism of
>> 1 on that AsyncIO and just tweak that one thread pool according to your
>> non-Flink test. But that's usually harder to scale out.
>>
>>
>> To your final question, if it's the right fit: In general, the strength
>> of most distributed stream processors is distributed computing. In your
>> case, it will probably always happen that you will hit the limit on one
>> machine sooner or later. Then, it's the hour of Flink to shine and actually
>> distribute the work among multiple workers.
>>
>> If you never plan to scale out, there are probably other frameworks that
>> are more suited (Akka HTTP would be a natural candidate, assuming you can
>> connect your source/sink directly).
>>
>> However, I'd 

Re: Restore from savepoint through Java API

2020-06-12 Thread David Anderson
You can study LocalStreamingFileSinkTest [1] for an example of how to
approach this. You can use the test harnesses [2], keeping in mind that

- initializeState is called during instance creation
- the provided context indicates if state is being restored from a snapshot
- snapshot is called when taking a checkpoint
- notifyOfCompletedCheckpoint is called when a checkpoint is complete

The outline of such a test might follow this pattern:

testHarness1.setup();
testHarness1.initializeState(initState);
testHarness1.open();

// setup state to checkpoint ...

// capture snapshot
snapshot = testHarness.snapshot(checkpointId, timestamp);

// process more data, the effects of which will be lost ...

// create a new test harness initialized with the state from the snapshot
testHarness2.setup();
testHarness2.initializeState(snapshot);
testHarness2.open();

// verify the state ...

David

[1]
https://github.com/apache/flink/blob/release-1.10/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators
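
A more concrete sketch of the outline above (hedged; MySinkFunction stands in for the custom stateful sink under test, and the test harness classes come from the flink-streaming-java test-jar):

import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class SinkRestoreSketch {
    public static void main(String[] args) throws Exception {
        // first harness: push some data and capture a snapshot
        OneInputStreamOperatorTestHarness<String, Object> harness1 =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MySinkFunction()));
        harness1.setup();
        harness1.open();
        harness1.processElement(new StreamRecord<>("a", 1L));
        OperatorSubtaskState snapshot = harness1.snapshot(0L, 1L);
        harness1.processElement(new StreamRecord<>("lost", 2L)); // effects of this are discarded
        harness1.close();

        // second harness: restore from the snapshot and verify the sink's behavior
        OneInputStreamOperatorTestHarness<String, Object> harness2 =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MySinkFunction()));
        harness2.setup();
        harness2.initializeState(snapshot);
        harness2.open();
        // verify the restored state ...
        harness2.close();
    }
}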

On Thu, Jun 11, 2020 at 12:12 PM Abhishek Rai  wrote:

> Hello,
>
> I'm writing a test for my custom sink function.  The function is stateful
> and relies on checkpoint restores for maintaining consistency with the
> external system that it's writing to.  For integration testing of the sink
> function, I have a MiniCluster based environment inside a single JVM
> through which I create my job and validate its operation.
>
> In order to test the checkpoint restore behavior with precision, I've
> disabled checkpointing and am instead using savepoints.  So, my test
> proceeds as follows:
>
> 1. Start a job.
> 2. Push some data through it to the sink and to an external system.
> 3. Trigger a savepoint.
> 4. Push more data.
> 5. Cancel the job.
> 6. Restore from the savepoint captured in step 3 above.
>
> I can't seem to find a Java API for restoring a job from a savepoint.  The
> approach in the documentation and other resources is to use the CLI, which
> is not an option for me.  Currently, I create a RemoteStreamEnvironment
> with savepointRestoreSettings set, but when I run execute(), I get the
> following error:
>
> java.lang.IllegalStateException: No operators defined in streaming
> topology. Cannot execute.
>
> var savepointDir =
> restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
> assertTrue(!savepointDir.isBlank());
> // Cancel the job and launch a new one from the save point.
> restClusterClient_.cancel(jobId).get();
> var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
> var env = new RemoteStreamEnvironment(
> flinkMiniCluster_.host(),
> flinkMiniCluster_.port(),
> null,
> new String[] {},
> null,
> restoreSettings);
> var restoredJob = env.executeAsync();
>
>
> Separately, is there a flink testing utility I could use for integration
> testing of state checkpointing and recovery?
>
> Thanks,
> Abhishek
>


Problem querying a Kafka table registered in HiveCatalog from the SQL client

2020-06-12 Thread Sun.Zhu
hi, all
When integrating HiveCatalog with the SQL client, I registered a Kafka table in the HiveCatalog.
The DDL is as follows:
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- processing-time attribute via a computed column
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- watermark on ts, making ts the event-time column
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- kafka version; universal supports 0.11+
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format.type' = 'json'  -- source format is json
);
When running select * from user_behavior; the following error is reported:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[$5])
  LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL 
SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[PROCTIME()])
  LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


Flink version: 1.10.1
Blink planner, streaming mode


Thx
Sun.Zhu
17626017...@163.com



How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

2020-06-12 Thread Felipe Gutierrez
Hi,

I am using the Flink training exercise TaxiRide [1] to execute a
streaming count of events. On the cluster and on my local machine I am
receiving the message that Joda-Time cannot be serialized: "class
org.joda.time.LocalDateTime is not a valid POJO type". The job still
starts on the cluster, but not on my local machine. Searching the internet,
the advice is to register the Joda-Time classes with the environment [2].
I did it like this:

env.getConfig().registerTypeWithKryoSerializer(DateTime.class,
AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalDate.class,
AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalTime.class,
AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);

and I added the Joda-Time and Avro dependencies to the pom.xml:

<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${project.version}</version>
</dependency>

I also tested using addDefaultKryoSerializer but I got the same error.
For some reason, it is still not working. Does anyone have some hint
of what could be happening?
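
A hedged follow-up sketch (not Felipe's code): the exception names org.joda.time.LocalDateTime, which is not covered by the three registrations above. Registering a Kryo serializer for that class as well may be worth trying; AvroKryoSerializerUtils does not, to my knowledge, ship a LocalDateTime serializer, so this assumes the de.javakaffee kryo-serializers dependency and its JodaLocalDateTimeSerializer.

import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.joda.time.LocalDateTime;

public class JodaLocalDateTimeRegistration {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // register the class that the "not a valid POJO type" message actually complains about
        env.getConfig().registerTypeWithKryoSerializer(
                LocalDateTime.class, JodaLocalDateTimeSerializer.class);
    }
}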

Thanks! Felipe
[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Flink SQL column renaming question

2020-06-12 Thread naisili Yuan
Thanks

Sent from my iPhone

> On June 12, 2020 at 21:08, godfrey he wrote:
> 
> Hi, as Benchao said, SELECT XX AS YY is equivalent to the Table API's renameColumns.
> Moreover, these names are only used during SQL parsing and optimization; the field names are not used during actual execution.
> 
> Best,
> Godfrey
> 
> Benchao Li wrote on Friday, June 12, 2020 at 18:36:
> 
>> Simply using SELECT XX AS YY is equivalent to the Table API's renameColumns.
>> 
>> naisili Yuan wrote on Friday, June 12, 2020 at 18:23:
>> 
>>> Hi all
>>> I'd like to ask: when using SQL with Flink, how do you rename columns? I see the renameColumns method in the Table API, but
>>> I don't see a corresponding interface for Flink SQL.
>>> I tried adding name_alias AS
>>> name in the CREATE TABLE SQL statement myself, and it works, but will a column introduced this way affect performance? Hoping for an answer, thanks!
>>> 
>> 


Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-12 Thread Clay Teeter
Ok, this is great to know.  So in my case I have a k8s pod that has a limit
of 4 GB.  I should remove the -Xmx and add one of these -D parameters.

* taskmanager.memory.flink.size
* taskmanager.memory.process.size   <- probably this one
* taskmanager.memory.task.heap.size and taskmanager.memory.managed.size

So that I don't run into pod memory quotas






On Fri, Jun 12, 2020 at 11:12 AM Xintong Song  wrote:

> I would suggest not to set -Xmx.
>
> Flink will always calculate the JVM heap size from the configuration and
> set a proper -Xmx.
> If you manually set -Xmx that overwrites the one Flink calculated, it
> might result in unpredictable behaviors.
>
>
> Please refer to this document[1]. In short, you could leverage the
> configuration option "taskmanager.memory.task.heap.size", and an additional
> constant framework overhead will be added to this value for -Xmx.
>
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#jvm-parameters
>
> On Fri, Jun 12, 2020 at 4:50 PM Clay Teeter 
> wrote:
>
>> Thank you Xintong, while tracking down the existence of
>> bash-java-utils.jar I found a bug in my CI scripts that incorrectly built
>> the wrong version of flink.  I fixed this and then added a -Xmx value.
>>
>> env:
>>   - name: FLINK_ENV_JAVA_OPTS
>> value: "-Xmx{{ .Values.analytics.flink.taskManagerHeapSize }}"
>>
>>
>> It's running perfectly now!
>>
>> Thank you again,
>> Clay
>>
>>
>> On Fri, Jun 12, 2020 at 5:13 AM Xintong Song 
>> wrote:
>>
>>> Hi Clay,
>>>
>>> Could you verify the "taskmanager.sh" used is the same script shipped
>>> with Flink-1.10.1? Or a custom script is used? Also, does the jar file
>>> "bash-java-utils.jar" exist in your Flink bin directory?
>>>
>>> In Flink 1.10, the memory configuration for a TaskManager works as
>>> follows.
>>>
>>>- "taskmanager.sh" executes "bash-java-utils.jar" for the memory
>>>calculations
>>>- "bash-java-utils.jar" will read your "flink-conf.yaml" and all the
>>>"-D" arguments, and calculate memory sizes accordingly
>>>- "bash-java-utils.jar" will then return the memory calculation
>>>results as two strings, for JVM parameter ("-Xmx", "-Xms", etc.) and
>>>dynamic configurations ("-D") respectively
>>>- At this step, all the detailed memory sizes should be determined
>>>   - That means, even for memory sizes not configured by you, there
>>>   should be an exact value generated in the returned dynamic 
>>> configuration
>>>   - That also means, for memory components configured in ranges
>>>   (e.g., network memory configured through a pair of [min, max]),
>>>   a deterministic value should be decided and both min/max configuration
>>>   options should already be overwritten to that value
>>>- "taskmanager.sh" starts the task manager JVM process with the
>>>returned JVM parameters, and passes the dynamic configurations as 
>>> arguments
>>>into the task manager process. These dynamic configurations will be read 
>>> by
>>>Flink task manager so that memory will be managed accordingly.
>>>
>>> Flink task manager expects all the memory configurations are already set
>>> (thus network min/max should have the same value) before it's started. In
>>> your case, it seems such configurations are missing. Same for the cpu cores.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Fri, Jun 12, 2020 at 12:58 AM Clay Teeter 
>>> wrote:
>>>
 Hi flink fans,

 I'm hoping for an easy solution.  I'm trying to upgrade my 1.9.3 cluster
 to Flink 1.10.1, but I'm running into memory configuration errors.

 Such as:
 *Caused by:
 org.apache.flink.configuration.IllegalConfigurationException: The network
 memory min (64 mb) and max (1 gb) mismatch, the network memory has to be
 resolved and set to a fixed value before task executor starts*

 *Caused by:
 org.apache.flink.configuration.IllegalConfigurationException: The required
 configuration option Key: 'taskmanager.cpu.cores' , default: null (fallback
 keys: []) is not set*

 I was able to fix a cascade of errors by explicitly setting these
 values:

 taskmanager.memory.managed.size: {{
 .Values.analytics.flink.taskManagerManagedSize }}
 taskmanager.memory.task.heap.size: {{
 .Values.analytics.flink.taskManagerHeapSize }}
 taskmanager.memory.jvm-metaspace.size: 500m
 taskmanager.cpu.cores: 4

 So, the documentation implies that Flink will default many of these
 values, however my 1.10.1 cluster doesn't seem to be doing this.  1.9.3 worked
 great!

 Do I really have to set all the memory (even network) values?  If not,
 what am I missing?

 If i do have to set all the memory parameters, how do I resolve "The
 network memory min (64 mb) and max (1 gb) mismatch"?


 My cluster runs standalone jobs on kube

 

Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-12 Thread Arti Pande
Hi Arvid,

*Shared api client*: Actually in the flow of writing I missed to mention
that we switched to a static shared instance of async http client for all 7
subtasks of the AsyncIO. The number of threads therefore is not 140 (20 *
7) but just (16 + 8 or 16 = 24 or 32) which includes a static shared thread
pool for the listener of the response Future to deserialize and process the
response body before it is emitted out.

*Single operator:* With parallelism level 1 we experimented with following
things, but there was very high backpressure on the upstream operator and
overall pipeline throughput was unacceptably low

(a) async buffer capacity value - beyond 100 there was a drop in throughput
(b) io thread-pool in async http client - reducing the default size reduced
the throughput as well

(c) thread pool size for the listener of response Future

From a scaling out perspective we definitely need to scale this out to be
able to support about three million records per second and as per the
experiments and benchmarks done till now, it appears that we will need many
higher-core machines in a larger cluster. The intention of posting this
question here is to validate and find if anything similar has been done by
someone on lower-core machines with success.

Thanks & regards,
Arti


On Fri, Jun 12, 2020 at 7:00 PM Arvid Heise  wrote:

> Hi Arti,
>
> Thank you very much for providing so much information.
>
> One additional test that you could do is to check how the pipeline
> performs by mocking the actual HTTP request and directly return a static
> response through Async IO. This would give you an exact number including
> potential serialization costs. I often see users focusing on the
> performance of a specific operator, while the bottleneck is the shuffle
> step before or after that (you get a shuffle if you have keyby or change
> the degree of parallelism).
>
> But I'm going forward by the assumption that this is indeed completely the
> AsyncIO itself that is causing the loss of performance. So for me, it looks
> like Flink is a factor of 2 to 3 slower than the native approach (exact
> number is hard to give, as there are more steps involved in Flink). My
> guess is that the sheer number of threads might cause too many context
> switches and the drop of performance. Are you starting an AsyncHTTPClient
> thread pool per subtask or do you have it shared (by using a static
> variable initialized once in RichAsyncFunction#open)? It sounded like the
> former, so I'd definitely recommend trying the latter approach.
>
> One additional thing that you could try is actually use a parallelism of 1
> on that AsyncIO and just tweak that one thread pool according to your
> non-Flink test. But that's usually harder to scale out.
>
>
> To your final question, if it's the right fit: In general, the strength of
> most distributed stream processors is distributed computing. In your case,
> it will probably always happen that you will hit the limit on one machine
> sooner or later. Then, it's the hour of Flink to shine and actually
> distribute the work among multiple workers.
>
> If you never plan to scale out, there are probably other frameworks that
> are more suited (Akka HTTP would be a natural candidate, assuming you can
> connect your source/sink directly).
>
> However, I'd probably rather encourage you to consider scaling out as a
> natural component in your architecture. Data volume doubles roughly every
> 18 months, so unless you buy some very beefy machine, you will hit the
> limit sooner or later. From your description it sounds to me as if you kind
> of envision a throughput of at least 100K rec/s. Given that even in your
> stand-alone test with no additional transformations, you need 3-5 of your 8
> core machines to just perform the HTTP requests. So most likely you need a
> machine with more than 32 cores and that's the point where they get quickly
> expensive without offering you any fault tolerance. On the other hand, if
> you have a cluster of many smaller machines, you get a much more reliable
> environment that is overall cheaper.
>
> We unfortunately still need more time to fully incorporate dynamic
> scaling-in and out (ETA Flink 1.12 with Flink 1.11 currently being
> finalized), then you would be able to react on slower traffic (during
> night?) and peaks (noon, start/end of months) and get a very cost-efficient
> system.
>
> On Fri, Jun 12, 2020 at 10:59 AM Arti Pande  wrote:
>
>> Hi Arvid,
>>
>> Thanks for quick reply and totally agree with you on the differences
>> between microbenchmarks and a full benchmark with specific use-case. Thanks
>> for sending the microbenchmark screenshot.
>>
>> For our use-case, the streaming pipeline has five main transformations
>> that have business logic, of which Async IO to external API endpoint is one
>> operator. To create benchmarks for operators, I run the real pipeline with
>> full load on a single machine and note the Throughput and latency. Then add
>> each 

Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-12 Thread Arvid Heise
Hi Arti,

Thank you very much for providing so much information.

One additional test that you could do is to check how the pipeline performs
by mocking the actual HTTP request and directly return a static response
through Async IO. This would give you an exact number including potential
serialization costs. I often see users focusing on the performance of a
specific operator, while the bottleneck is the shuffle step before or after
that (you get a shuffle if you have keyby or change the degree of
parallelism).

But I'm going forward by the assumption that this is indeed completely the
AsyncIO itself that is causing the loss of performance. So for me, it looks
like Flink is a factor of 2 to 3 slower than the native approach (exact
number is hard to give, as there are more steps involved in Flink). My
guess is that the sheer number of threads might cause too many context
switches and the drop of performance. Are you starting an AsyncHTTPClient
thread pool per subtask or do you have it shared (by using a static
variable initialized once in RichAsyncFunction#open)? It sounded like the
former, so I'd definitely recommend trying the latter approach.
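
A minimal sketch of that shared-client variant (hedged; class names and the endpoint are illustrative, not from this thread), using a static AsyncHttpClient initialized lazily in RichAsyncFunction#open so that all subtasks on a TaskManager reuse one client and one I/O thread pool:

import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.Response;

public class SharedClientAsyncFunction extends RichAsyncFunction<String, String> {

    // one client per JVM (TaskManager), shared by all subtasks of this operator
    private static volatile AsyncHttpClient sharedClient;

    @Override
    public void open(Configuration parameters) {
        if (sharedClient == null) {
            synchronized (SharedClientAsyncFunction.class) {
                if (sharedClient == null) {
                    sharedClient = Dsl.asyncHttpClient(); // tune I/O threads here if needed
                }
            }
        }
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        sharedClient.preparePost("https://example.com/api") // hypothetical endpoint
                .setBody(input)
                .execute()
                .toCompletableFuture()
                .thenApply(Response::getResponseBody)
                .thenAccept(body -> resultFuture.complete(Collections.singleton(body)))
                .exceptionally(t -> {
                    resultFuture.completeExceptionally(t);
                    return null;
                });
    }
}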

One additional thing that you could try is actually use a parallelism of 1
on that AsyncIO and just tweak that one thread pool according to your
non-Flink test. But that's usually harder to scale out.


To your final question, if it's the right fit: In general, the strength of
most distributed stream processors is distributed computing. In your case,
it will probably always happen that you will hit the limit on one machine
sooner or later. Then, it's the hour of Flink to shine and actually
distribute the work among multiple workers.

If you never plan to scale out, there are probably other frameworks that
are more suited (Akka HTTP would be a natural candidate, assuming you can
connect your source/sink directly).

However, I'd probably rather encourage you to consider scaling out as a
natural component in your architecture. Data volume doubles roughly every
18 months, so unless you buy some very beefy machine, you will hit the
limit sooner or later. From your description it sounds to me as if you kind
of envision a throughput of at least 100K rec/s. Given that even in your
stand-alone test with no additional transformations, you need 3-5 of your 8
core machines to just perform the HTTP requests. So most likely you need a
machine with more than 32 cores and that's the point where they get quickly
expensive without offering you any fault tolerance. On the other hand, if
you have a cluster of many smaller machines, you get a much more reliable
environment that is overall cheaper.

We unfortunately still need more time to fully incorporate dynamic
scaling-in and out (ETA Flink 1.12 with Flink 1.11 currently being
finalized), then you would be able to react on slower traffic (during
night?) and peaks (noon, start/end of months) and get a very cost-efficient
system.

On Fri, Jun 12, 2020 at 10:59 AM Arti Pande  wrote:

> Hi Arvid,
>
> Thanks for quick reply and totally agree with you on the differences
> between microbenchmarks and a full benchmark with specific use-case. Thanks
> for sending the microbenchmark screenshot.
>
> For our use-case, the streaming pipeline has five main transformations
> that have business logic, of which Async IO to external API endpoint is one
> operator. To create benchmarks for operators, I run the real pipeline with
> full load on a single machine and note the Throughput and latency. Then add
> each operator one by one; always keeping the other basic operators like
> source, watermark generator, deserializer, sink etc turned on. The
> intention is to build a sort of incremental realistic benchmark for each
> operator for given use-case. Adding the AsyncIO operator (with parallelism
> 7 and async buffer capacity 100) with AsyncHTTPClient library brings
> throughput down from 102 K to a very low number i.e. 10K or 12 K
> records/sec.
>
> As you suggested, we tested the library being used (AsyncHTTPClient)
> independently of Flink, in a similar way to what AsyncIO does. A
> simple java program that invokes millions of API calls in a loop, with
> hard-coded POST request values, and limited (configurable) number of
> concurrent (maxInFlight) requests.  AsyncHTTPClient library by default uses
> nCores * 2 (= 16) IO threads, plus a fixed set of threads (say 2 or 4) for
> the ExecutorService to be passed to the Listener of result Future. So with
> this library the code requires at least 18 or 20 threads. Varying the
> maxInFlightRequests from 100 to 3000 the throughput varied from 17 K to 34
> K records/sec. Ofcourse this was with hard-coded POST request values and
> discarding the response body on reading (no further processing on it).
>
> When we tried to vary the async buffer capacity of AsyncIO (equivalent of
> maxInFlightRequests above) beyond 100, our throughput dropped further by
> 20% to 30%. Whereas in the test program above we would 

Re: Automatically resuming failed jobs in K8s

2020-06-12 Thread Averell
Thank you very much, Yang.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink SQL column renaming question

2020-06-12 Thread godfrey he
Hi, as Benchao said, SELECT XX AS YY is equivalent to the Table API's renameColumns.
Moreover, these names are only used during SQL parsing and optimization; the field names are not used during actual execution.

Best,
Godfrey
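
A tiny illustration (a sketch, assuming a TableEnvironment named tableEnv and a registered table t with a column name_alias): the SQL alias and the Table API rename are equivalent, and the alias itself carries no runtime cost.

// hedged sketch, Flink 1.10 style
Table viaSql = tableEnv.sqlQuery("SELECT name_alias AS name FROM t");
Table viaTableApi = tableEnv.from("t").renameColumns("name_alias as name");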

Benchao Li wrote on Friday, June 12, 2020 at 18:36:

> Simply using SELECT XX AS YY is equivalent to the Table API's renameColumns.
>
> naisili Yuan wrote on Friday, June 12, 2020 at 18:23:
>
> > Hi all
> > I'd like to ask: when using SQL with Flink, how do you rename columns? I see the renameColumns method in the Table API, but
> > I don't see a corresponding interface for Flink SQL.
> > I tried adding name_alias AS
> > name in the CREATE TABLE SQL statement myself, and it works, but will a column introduced this way affect performance? Hoping for an answer, thanks!
> >
>


Re: On using DataStreamUtils.reinterpretAsKeyedStream

2020-06-12 Thread Congxian Qiu
Hi

DataStreamUtils.reinterpretAsKeyedStream returns a KeyedStream, but once you apply flatMap
on that KeyedStream the result is a plain DataStream again; as Jark said, you can call
reinterpretAsKeyedStream once more to get a KeyedStream.
Also note that in versions before 1.8 this feature could lose data; see this issue [1].

[1] https://issues.apache.org/jira/browse/FLINK-12296

Best,
Congxian
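
A minimal sketch of that pattern (Event and MyFlatmapFunction are illustrative placeholders, not the poster's actual types; it also assumes the flatMap does not change which key each record belongs to):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ReinterpretSketch {
    // re-interpret the output of flatMap as keyed again, avoiding another physical shuffle
    static KeyedStream<Event, String> rekey(DataStream<Event> source) {
        KeySelector<Event, String> bySameKey = e -> e.key;
        DataStream<Event> flatMapped =
                source.keyBy(bySameKey).flatMap(new MyFlatmapFunction()); // plain DataStream again
        return DataStreamUtils.reinterpretAsKeyedStream(flatMapped, bySameKey);
    }
}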


Jark Wu wrote on Wednesday, June 10, 2020 at 22:29:

> Hi,
>
> You can call DataStreamUtils.reinterpretAsKeyedStream(proStream, new
> MyKeySelector2(groupKeys)) again to interpret it as a KeyedStream.
> If your flatMap has the same parallelism as its upstream node, they will be chained at runtime, so the key distribution does not change.
>
> Best,
> Jark
>
> On Wed, 10 Jun 2020 at 21:15, Yichao Yang <1048262...@qq.com> wrote:
>
> > Hi
> >
> >
> > flatmap之后返回的本身就不是Keyedstream哈,keyby之后返回的才是keyedstream。
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> > -- Original message --
> > From: "绘梦飘雪" <318666...@qq.com>
> > Sent: Wednesday, June 10, 2020, 7:18 PM
> > To: "user-zh"
> > Subject: On using DataStreamUtils.reinterpretAsKeyedStream
> >
> >
> > hi
> > I have a scenario where I keyBy on the same set of keys several times:
> >
> > DataStream resStream = demoStream.keyBy(groupKeys)
> >     .flatMap(new MyFlatmapFunction())
> >     .keyBy(groupKeys)
> >     .process(new MyProcessFunction())
> >     .keyBy(groupKeys)
> >     .timeWindow(Time.seconds(1))
> >     .aggregate(new MyAggFunction())
> >     .keyBy(groupKeys)
> >     .timeWindow(Time.seconds(1))
> >     .process(new MyKeyProcessFunction());
> >
> > I would like to rewrite this pipeline with DataStreamUtils.reinterpretAsKeyedStream, and I tried it like this:
> > int[] groupKeys = new int[]{0,2,3};
> >
> > DataStream proStream =
> >     DataStreamUtils.reinterpretAsKeyedStream(demoStream, new MyKeySelector2(groupKeys)) // MyKeySelector2 is my own KeySelector implementation
> >     .flatMap(new MyFlatmapFunction())
> >
> > Written this way, I find that after flatMap the stream is no longer a KeyedStream.
> > Is something wrong here? How should DataStreamUtils.reinterpretAsKeyedStream be used?
>


Re: Flink SQL column renaming question

2020-06-12 Thread Benchao Li
Simply using SELECT XX AS YY is equivalent to the Table API's renameColumns.

naisili Yuan wrote on Friday, June 12, 2020 at 18:23:

> Hi all
> I'd like to ask: when using SQL with Flink, how do you rename columns? I see the renameColumns method in the Table API, but
> I don't see a corresponding interface for Flink SQL.
> I tried adding name_alias AS
> name in the CREATE TABLE SQL statement myself, and it works, but will a column introduced this way affect performance? Hoping for an answer, thanks!
>


Flink SQL column renaming question

2020-06-12 Thread naisili Yuan
Hi all
I'd like to ask: when using SQL with Flink, how do you rename columns? I see the renameColumns method in the Table API, but
I don't see a corresponding interface for Flink SQL.
I tried adding name_alias AS
name in the CREATE TABLE SQL statement myself, and it works, but will a column introduced this way affect performance? Hoping for an answer, thanks!


Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Xintong Song
>
> Assuming no -Xmx is set, the doc above says 1/4 of physical memory i.e
> 29GB will be used.
>

This is true.

So, if I can set env.java.opts: "-Xmx102g" in flink-conf.yaml, I am
> assuming the heap max of 102Gb will be used in the N/w mem calculation.
> Is that the right way to set env.java.opts ??
>

I cannot be sure. I just checked, and it seems even for Mesos the "-Xmx"
should be set. So technically, Flink should have always set the "-Xmx". If
you are using a custom shell script for launching task manager processes,
then I cannot tell whether "env.java.opts" works for you.

Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 5:33 PM Vijay Balakrishnan 
wrote:

> Hi Xintong,
> Just to be clear. I haven't set any -Xmx - I will check our scripts again.
> Assuming no -Xmx is set, the doc above says 1/4 of physical memory i.e
> 29GB will be used.
>
> So, if I can set env.java.opts: "-Xmx102g" in flink-conf.yaml, I am
> assuming the heap max of 102Gb will be used in the N/w mem calculation.
> Is that the right way to set env.java.opts ??
> TIA,
> Vijay
>
> On Fri, Jun 12, 2020 at 1:49 AM Xintong Song 
> wrote:
>
>> Flink should have calculated the heap size and set the -Xms, according to
>> the equations I mentioned. So if you haven't set a customized -Xmx that
>> overwrites this, it should not use the default 1/4 of physical memory.
>>>
>>>
>>>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1
>>>- 0.48) = 53 GB
>>>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio))
>>>* (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 
>>> 0.48) =
>>>40.6GB
>>>
>>>
>> Are you running Flink on Mesos? I think Flink has not automatically set
>> -Xmx on Mesos.
>>
>>
>> BTW, from your screenshot the physical memory is 123GB, so 1/4 of that is
>> much closer to 29GB if we consider there are some rounding errors and
>> accuracy loss.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Jun 12, 2020 at 4:33 PM Vijay Balakrishnan 
>> wrote:
>>
>>> Thx, Xintong for a great answer. Much appreciated.
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap
>>>
>>>
>>> Max heap: if -Xmx is set then it is its value else ¼ of physical
>>> machine memory estimated by the JVM
>>>
>>> No -Xmx is set. So 1/4 of 102 GB = 25.5 GB, but I am not sure about the 29 GB
>>> figure.
>>>
>>> On Thu, Jun 11, 2020 at 9:14 PM Xintong Song 
>>> wrote:
>>>
 Hi Vijay,

 The memory configurations in Flink 1.9 and previous versions are indeed
 complicated and confusing. That is why we made significant changes to it in
 Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the
 upcoming Flink 1.11 which is very likely to be released in this month.

 Regarding your questions,

- "Physical Memory" displayed on the web ui stands for the total
memory on your machine. This information is retrieved from your OS. It 
 is
not related to the network memory calculation. It is displayed mainly 
 for
historical reasons.
- The error message means that you have about 26.8 GB network
memory (877118 * 32768 bytes), and your job is trying to use more.
- The "total memory" referred in network memory calculation is:
   - jvm-heap + network, if managed memory is configured on-heap
   (default)
  - According to your screenshot, the managed memory
  on-heap/off-heap configuration is not touched, so this should be 
 your case.
   - jvm-heap + managed + network, if managed memory is configured
   off-heap
- The network memory size is actually derived reversely. Flink
reads the max heap size from JVM (and the managed memory size from
configuration if it is configured off-heap), and derives the network 
 memory
size with the following equation.
   - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap /
   (1-networkFraction) * networkFraction))
   - In your case, networkMem = Min(50GB, Max(500MB, 29GB /
   (1-0.48) * 0.48)) = 26.8GB

 One thing I don't understand is, why do you only have 29GB heap size
 when "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB).
 The JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use
 "total" to represent "taskmanager.heap.size" for short. Also omitted the
 calculations when managed memory is configured off-heap.

- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1
- 0.48) = 53 GB
- On Yarn: jvmHeap = (total - Max(cutoff-min, total *
cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 
 0.25))
* (1 - 0.48) = 40.6GB
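
A small arithmetic sketch (not from this thread, just plugging in the numbers quoted above) of how those two values are derived:

public class NetworkMemoryCalc {
    public static void main(String[] args) {
        double fraction = 0.48;               // taskmanager.network.memory.fraction
        double networkMinGb = 500.0 / 1024.0; // taskmanager.network.memory.min
        double networkMaxGb = 50.0;           // taskmanager.network.memory.max
        double jvmMaxHeapGb = 29.0;           // max heap reported by the JVM

        double networkGb = Math.min(networkMaxGb,
                Math.max(networkMinGb, jvmMaxHeapGb / (1 - fraction) * fraction));
        double standaloneHeapGb = 102.0 * (1 - fraction); // taskmanager.heap.size = 102 GB

        System.out.printf("network memory  ~ %.1f GB%n", networkGb);         // ~ 26.8 GB
        System.out.printf("standalone heap ~ %.1f GB%n", standaloneHeapGb);  // ~ 53 GB
    }
}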

 Have you specified a custom "-Xmx" parameter?

 Thank you~

 Xintong Song



 On Fri, Jun 12, 2020 at 7:50 AM 

Re: Re: flink sql Temporal table join failed

2020-06-12 Thread Zhou Zach
OK

















On 2020-06-12 17:46:22, "咖啡泡油条" <9329...@qq.com> wrote:
>You can refer to the earlier mailing-list thread:
>https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E
>
>
>
>
>-- Original message --
>From: "Leonard Xu"  Sent: Friday, June 12, 2020, 5:43 PM
>To: "user-zh"
>Subject: Re: flink sql Temporal table join failed
>
>
>
>
>You've hit exactly this pitfall: it is a bug in escaping Flink's reserved keyword (time), fixed in 1.10.1 and later versions (including the upcoming 1.11).
>
>Best,
>Leonard Xu
>
> On June 12, 2020 at 17:38, Zhou Zach wrote:
> 
> 
> 
> Yes, version 1.10.0
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-06-12 16:28:15, "Benchao Li" wrote: Looks like you've hit another pitfall. You are on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed there.
> 
> Zhou Zach wrote:
> It still doesn't work,
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> 
>[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> 
>[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for 
>an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> ERROR StatusLogger No log4j2 configuration file found. Using 
>default
> configuration: logging only errors to the console.
> Exception in thread "main" 
>org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "time FROM" at line 1, column 44.
> Was expecting one of:
> "CURSOR" ...
> "EXISTS" ...
> "NOT" ...
> "ROW" ...
> "(" ...
> "+" ...
> "-" ...
>"TRUE" ...
> "FALSE" ...
> "UNKNOWN" ...
> "NULL" ...
>"DATE" ...
> "TIME"  "TIMESTAMP" ...
> "INTERVAL" ...
> "?" ...
> "CAST" ...
> "EXTRACT" ...
> "POSITION" ...
> "CONVERT" ...
> "TRANSLATE" ...
> "OVERLAY" ...
> "FLOOR" ...
> "CEIL" ...
> "CEILING" ...
> "SUBSTRING" ...
> "TRIM" ...
> "CLASSIFIER" ...
> "MATCH_NUMBER" ...
> "RUNNING" ...
> "PREV" ...
> "NEXT" ...
> "JSON_EXISTS" ...
> "JSON_VALUE" ...
> "JSON_QUERY" ...
> "JSON_OBJECT" ...
> "JSON_OBJECTAGG" ...
> "JSON_ARRAY" ...
> "JSON_ARRAYAGG" ...
>  "MULTISET" ...
> "ARRAY" ...
> "MAP" ...
> "PERIOD" ...
> "SPECIFIC" ...
>  "ABS" ...
> "AVG" ...
> "CARDINALITY" ...
> "CHAR_LENGTH" ...
> "CHARACTER_LENGTH" ...
> "COALESCE" ...
> "COLLECT" ...
> "COVAR_POP" ...
> "COVAR_SAMP" ...
> "CUME_DIST" ...
> "COUNT" ...
> "CURRENT_DATE" ...
> "CURRENT_TIME" ...
> "CURRENT_TIMESTAMP" ...
> "DENSE_RANK" ...
> "ELEMENT" ...
> "EXP" ...
> "FIRST_VALUE" ...
> "FUSION" ...
> "GROUPING" ...
> "HOUR" ...
> "LAG" ...
> "LEAD" ...
> "LEFT" ...
> "LAST_VALUE" ...
> "LN" ...
> "LOCALTIME" ...
> "LOCALTIMESTAMP" ...
> "LOWER" ...
> "MAX" ...
> "MIN" ...
> "MINUTE" ...
> "MOD" ...
> "MONTH" ...
> "NTH_VALUE" ...
> "NTILE" ...
> "NULLIF" ...
> "OCTET_LENGTH" ...
> "PERCENT_RANK" ...
> "POWER" ...
> "RANK" ...
> "REGR_COUNT" ...
> "REGR_SXX" ...
> "REGR_SYY" ...
> "RIGHT" ...
> "ROW_NUMBER" ...
> "SECOND" ...
> "SQRT" ...
> "STDDEV_POP" ...
> "STDDEV_SAMP" ...
> "SUM" ...
> "UPPER" ...
> "TRUNCATE" ...
> "USER" ...
> "VAR_POP" ...
> "VAR_SAMP" ...
> "YEAR" ...
> "CURRENT_CATALOG" ...
> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> "CURRENT_PATH" ...
> "CURRENT_ROLE" ...
> "CURRENT_SCHEMA" ...
> "CURRENT_USER" ...
> "SESSION_USER" ...
> "SYSTEM_USER" ...
> "NEW" ...
> "CASE" ...
> "CURRENT" ...
> 
> at
> 
>org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
> at
> 
>org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
> at
> 
>org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at
> 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> 
>$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at
> 
>org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at
> 
>org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at
> 

Re: flink sql Temporal table join failed

2020-06-12 Thread 咖啡泡油条
You can refer to the earlier mailing-list thread:
https://lists.apache.org/thread.html/r951ca3dfa24598b2c90f9d2172d5228c4689b8a710d7dc119055c5d3%40%3Cuser-zh.flink.apache.org%3E




-- Original message --
From: "Leonard Xu"

> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
> ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "time FROM" at line 1, column 44.
> Was expecting one of:
>     "CURSOR" ...
>     "EXISTS" ...
>     "NOT" ...
>     "ROW" ...
>     "(" ...
>     "+" ...
>     "-" ...
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>
> Zhou Zach wrote:
>
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
> ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
>
> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], fields=[time, sum_age])
> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>    +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>       :- FlinkLogicalCalc(select=[uid, time])
>       :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>       +- FlinkLogicalSnapshot(period=[$cor0.time])
>          +- FlinkLogicalCalc(select=[uid, age])
>             +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, created_time)]]], fields=[uid, sex, age, created_time])
>
> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
>
> Please check the documentation for the set of currently supported SQL features.
>
> at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
> at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
> at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
> Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
> at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
> at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
Re:Re: flink sql Temporal table join failed

2020-06-12 Thread Zhou Zach
Thanks for the reminder.

















On 2020-06-12 17:43:20, "Leonard Xu" wrote:
>
>You've hit exactly this pit: it's a bug in the escaping of the Flink reserved keyword (time), fixed in 1.10.1 and later versions (including the upcoming 1.11).
>
>Best,
>Leonard Xu
>
>> On Jun 12, 2020, at 17:38, Zhou Zach wrote:
>> 
>> Yes, version 1.10.0.
>> 
>> On 2020-06-12 16:28:15, "Benchao Li" wrote:
>>> Looks like you've hit another pit. You're on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed in 1.10.1.
>>> 
>>> Zhou Zach wrote on Fri, Jun 12, 2020 at 15:47:
>>> 
 Still not working,
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in
 [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type
 [org.apache.logging.slf4j.Log4jLoggerFactory]
 ERROR StatusLogger No log4j2 configuration file found. Using default
 configuration: logging only errors to the console.
 Exception in thread "main" org.apache.flink.table.api.SqlParserException:
 SQL parse failed. Encountered "time FROM" at line 1, column 44.
 Was expecting one of:
"CURSOR" ...
"EXISTS" ...
"NOT" ...
"ROW" ...
"(" ...
"+" ...
"-" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"TRUE" ...
"FALSE" ...
"UNKNOWN" ...
"NULL" ...
 ...
 ...
 ...
"DATE" ...
"TIME"  ...
"TIMESTAMP" ...
"INTERVAL" ...
"?" ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"NEXT" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
 ...
"MULTISET" ...
"ARRAY" ...
"MAP" ...
"PERIOD" ...
"SPECIFIC" ...
 ...
 ...
 ...
 ...
 ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"CURRENT_DATE" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LEFT" ...
"LAST_VALUE" ...
"LN" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_COUNT" ...
"REGR_SXX" ...
"REGR_SYY" ...
"RIGHT" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
"USER" ...
"VAR_POP" ...
"VAR_SAMP" ...
"YEAR" ...
"CURRENT_CATALOG" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_USER" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"NEW" ...
"CASE" ...
"CURRENT" ...
 
 at
 org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
 at
 org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
 at
 org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
 at
 org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
 at
 org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
 at
 org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
 at
 org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
 at
 org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
 at
 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
 at
 

Re: flink sql Temporal table join failed

2020-06-12 Thread Leonard Xu


You've hit exactly this pit: it's a bug in the escaping of the Flink reserved keyword (time), fixed in 1.10.1 and later versions (including the upcoming 1.11).

Best,
Leonard Xu
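
For reference, escaping a reserved column name in Flink SQL uses backticks, both in the DDL and in every query that touches the column. A rough sketch reusing the field names from this thread (connector properties trimmed; and as noted above, on 1.10.0 the planner bug can still surface even with escaping, so upgrading to 1.10.1 is the real fix):

CREATE TABLE user_behavior (
    uid BIGINT,
    phoneType STRING,
    clickCount INT,
    `time` TIMESTAMP(3)      -- reserved keyword, therefore backtick-escaped
) WITH (
    'connector.type' = 'kafka',
    ...
);

-- escape it in queries as well
SELECT uid, `time` FROM user_behavior;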

> On Jun 12, 2020, at 17:38, Zhou Zach wrote:
> 
> Yes, version 1.10.0.
> 
> On 2020-06-12 16:28:15, "Benchao Li" wrote:
>> Looks like you've hit another pit. You're on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed in 1.10.1.
>> 
>> Zhou Zach wrote on Fri, Jun 12, 2020 at 15:47:
>> 
>>> Still not working,
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>>> Was expecting one of:
>>>"CURSOR" ...
>>>"EXISTS" ...
>>>"NOT" ...
>>>"ROW" ...
>>>"(" ...
>>>"+" ...
>>>"-" ...
>>> ...
>>> ...
>>> ...
>>> ...
>>> ...
>>> ...
>>> ...
>>>"TRUE" ...
>>>"FALSE" ...
>>>"UNKNOWN" ...
>>>"NULL" ...
>>> ...
>>> ...
>>> ...
>>>"DATE" ...
>>>"TIME"  ...
>>>"TIMESTAMP" ...
>>>"INTERVAL" ...
>>>"?" ...
>>>"CAST" ...
>>>"EXTRACT" ...
>>>"POSITION" ...
>>>"CONVERT" ...
>>>"TRANSLATE" ...
>>>"OVERLAY" ...
>>>"FLOOR" ...
>>>"CEIL" ...
>>>"CEILING" ...
>>>"SUBSTRING" ...
>>>"TRIM" ...
>>>"CLASSIFIER" ...
>>>"MATCH_NUMBER" ...
>>>"RUNNING" ...
>>>"PREV" ...
>>>"NEXT" ...
>>>"JSON_EXISTS" ...
>>>"JSON_VALUE" ...
>>>"JSON_QUERY" ...
>>>"JSON_OBJECT" ...
>>>"JSON_OBJECTAGG" ...
>>>"JSON_ARRAY" ...
>>>"JSON_ARRAYAGG" ...
>>> ...
>>>"MULTISET" ...
>>>"ARRAY" ...
>>>"MAP" ...
>>>"PERIOD" ...
>>>"SPECIFIC" ...
>>> ...
>>> ...
>>> ...
>>> ...
>>> ...
>>>"ABS" ...
>>>"AVG" ...
>>>"CARDINALITY" ...
>>>"CHAR_LENGTH" ...
>>>"CHARACTER_LENGTH" ...
>>>"COALESCE" ...
>>>"COLLECT" ...
>>>"COVAR_POP" ...
>>>"COVAR_SAMP" ...
>>>"CUME_DIST" ...
>>>"COUNT" ...
>>>"CURRENT_DATE" ...
>>>"CURRENT_TIME" ...
>>>"CURRENT_TIMESTAMP" ...
>>>"DENSE_RANK" ...
>>>"ELEMENT" ...
>>>"EXP" ...
>>>"FIRST_VALUE" ...
>>>"FUSION" ...
>>>"GROUPING" ...
>>>"HOUR" ...
>>>"LAG" ...
>>>"LEAD" ...
>>>"LEFT" ...
>>>"LAST_VALUE" ...
>>>"LN" ...
>>>"LOCALTIME" ...
>>>"LOCALTIMESTAMP" ...
>>>"LOWER" ...
>>>"MAX" ...
>>>"MIN" ...
>>>"MINUTE" ...
>>>"MOD" ...
>>>"MONTH" ...
>>>"NTH_VALUE" ...
>>>"NTILE" ...
>>>"NULLIF" ...
>>>"OCTET_LENGTH" ...
>>>"PERCENT_RANK" ...
>>>"POWER" ...
>>>"RANK" ...
>>>"REGR_COUNT" ...
>>>"REGR_SXX" ...
>>>"REGR_SYY" ...
>>>"RIGHT" ...
>>>"ROW_NUMBER" ...
>>>"SECOND" ...
>>>"SQRT" ...
>>>"STDDEV_POP" ...
>>>"STDDEV_SAMP" ...
>>>"SUM" ...
>>>"UPPER" ...
>>>"TRUNCATE" ...
>>>"USER" ...
>>>"VAR_POP" ...
>>>"VAR_SAMP" ...
>>>"YEAR" ...
>>>"CURRENT_CATALOG" ...
>>>"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>>"CURRENT_PATH" ...
>>>"CURRENT_ROLE" ...
>>>"CURRENT_SCHEMA" ...
>>>"CURRENT_USER" ...
>>>"SESSION_USER" ...
>>>"SYSTEM_USER" ...
>>>"NEW" ...
>>>"CASE" ...
>>>"CURRENT" ...
>>> 
>>> at
>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>> at
>>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>> at
>>> 

Re:Re: Re: Re: flink sql Temporal table join failed

2020-06-12 Thread Zhou Zach



Yes, version 1.10.0.








On 2020-06-12 16:28:15, "Benchao Li" wrote:
>Looks like you've hit another pit. You're on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed in 1.10.1.
>
>Zhou Zach wrote on Fri, Jun 12, 2020 at 15:47:
>
>> Still not working,
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> ERROR StatusLogger No log4j2 configuration file found. Using default
>> configuration: logging only errors to the console.
>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>> SQL parse failed. Encountered "time FROM" at line 1, column 44.
>> Was expecting one of:
>> "CURSOR" ...
>> "EXISTS" ...
>> "NOT" ...
>> "ROW" ...
>> "(" ...
>> "+" ...
>> "-" ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>> "TRUE" ...
>> "FALSE" ...
>> "UNKNOWN" ...
>> "NULL" ...
>>  ...
>>  ...
>>  ...
>> "DATE" ...
>> "TIME"  ...
>> "TIMESTAMP" ...
>> "INTERVAL" ...
>> "?" ...
>> "CAST" ...
>> "EXTRACT" ...
>> "POSITION" ...
>> "CONVERT" ...
>> "TRANSLATE" ...
>> "OVERLAY" ...
>> "FLOOR" ...
>> "CEIL" ...
>> "CEILING" ...
>> "SUBSTRING" ...
>> "TRIM" ...
>> "CLASSIFIER" ...
>> "MATCH_NUMBER" ...
>> "RUNNING" ...
>> "PREV" ...
>> "NEXT" ...
>> "JSON_EXISTS" ...
>> "JSON_VALUE" ...
>> "JSON_QUERY" ...
>> "JSON_OBJECT" ...
>> "JSON_OBJECTAGG" ...
>> "JSON_ARRAY" ...
>> "JSON_ARRAYAGG" ...
>>  ...
>> "MULTISET" ...
>> "ARRAY" ...
>> "MAP" ...
>> "PERIOD" ...
>> "SPECIFIC" ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>> "ABS" ...
>> "AVG" ...
>> "CARDINALITY" ...
>> "CHAR_LENGTH" ...
>> "CHARACTER_LENGTH" ...
>> "COALESCE" ...
>> "COLLECT" ...
>> "COVAR_POP" ...
>> "COVAR_SAMP" ...
>> "CUME_DIST" ...
>> "COUNT" ...
>> "CURRENT_DATE" ...
>> "CURRENT_TIME" ...
>> "CURRENT_TIMESTAMP" ...
>> "DENSE_RANK" ...
>> "ELEMENT" ...
>> "EXP" ...
>> "FIRST_VALUE" ...
>> "FUSION" ...
>> "GROUPING" ...
>> "HOUR" ...
>> "LAG" ...
>> "LEAD" ...
>> "LEFT" ...
>> "LAST_VALUE" ...
>> "LN" ...
>> "LOCALTIME" ...
>> "LOCALTIMESTAMP" ...
>> "LOWER" ...
>> "MAX" ...
>> "MIN" ...
>> "MINUTE" ...
>> "MOD" ...
>> "MONTH" ...
>> "NTH_VALUE" ...
>> "NTILE" ...
>> "NULLIF" ...
>> "OCTET_LENGTH" ...
>> "PERCENT_RANK" ...
>> "POWER" ...
>> "RANK" ...
>> "REGR_COUNT" ...
>> "REGR_SXX" ...
>> "REGR_SYY" ...
>> "RIGHT" ...
>> "ROW_NUMBER" ...
>> "SECOND" ...
>> "SQRT" ...
>> "STDDEV_POP" ...
>> "STDDEV_SAMP" ...
>> "SUM" ...
>> "UPPER" ...
>> "TRUNCATE" ...
>> "USER" ...
>> "VAR_POP" ...
>> "VAR_SAMP" ...
>> "YEAR" ...
>> "CURRENT_CATALOG" ...
>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>> "CURRENT_PATH" ...
>> "CURRENT_ROLE" ...
>> "CURRENT_SCHEMA" ...
>> "CURRENT_USER" ...
>> "SESSION_USER" ...
>> "SYSTEM_USER" ...
>> "NEW" ...
>> "CASE" ...
>> "CURRENT" ...
>>
>> at
>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>> at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>> at

Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Vijay Balakrishnan
Hi Xintong,
Just to be clear, I haven't set any -Xmx; I will check our scripts again.
Assuming no -Xmx is set, the doc above says 1/4 of physical memory, i.e. 29GB,
will be used.

So, if I can set env.java.opts: "-Xmx102g" in flink-conf.yaml, I am
assuming the heap max of 102GB will be used in the network memory calculation.
Is that the right way to set env.java.opts?
TIA,
Vijay

On Fri, Jun 12, 2020 at 1:49 AM Xintong Song  wrote:

> Flink should have calculated the heap size and set the -Xms, according to
> the equations I mentioned. So if you haven't set a customized -Xmx that
> overwrites this, it should not use the default 1/4 of physical memory.
>>
>>
>>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 -
>>0.48) = 53 GB
>>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio))
>>* (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) 
>> =
>>40.6GB
>>
>>
> Are you running Flink on Mesos? I think Flink has not automatically set
> -Xmx on Mesos.
>
>
> BTW, from your screenshot the physical memory is 123GB, so 1/4 of that is
> much closer to 29GB if we consider there are some rounding errors and
> accuracy loss.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 12, 2020 at 4:33 PM Vijay Balakrishnan 
> wrote:
>
>> Thx, Xintong for a great answer. Much appreciated.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap
>>
>>
>> Max heap: if -Xmx is set then it is its value else ¼ of physical machine
>> memory estimated by the JVM
>>
>> No -Xmx is set.So, 1/4 of 102GB  = 25.5GB but not sure about the 29GB
>> figure.
>>
>> On Thu, Jun 11, 2020 at 9:14 PM Xintong Song 
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> The memory configurations in Flink 1.9 and previous versions are indeed
>>> complicated and confusing. That is why we made significant changes to it in
>>> Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the
>>> upcoming Flink 1.11 which is very likely to be released in this month.
>>>
>>> Regarding your questions,
>>>
>>>- "Physical Memory" displayed on the web ui stands for the total
>>>memory on your machine. This information is retrieved from your OS. It is
>>>not related to the network memory calculation. It is displayed mainly for
>>>historical reasons.
>>>- The error message means that you have about 26.8 GB network memory
>>>(877118 * 32768 bytes), and your job is trying to use more.
>>>- The "total memory" referred in network memory calculation is:
>>>   - jvm-heap + network, if managed memory is configured on-heap
>>>   (default)
>>>  - According to your screenshot, the managed memory
>>>  on-heap/off-heap configuration is not touched, so this should be 
>>> your case.
>>>   - jvm-heap + managed + network, if managed memory is configured
>>>   off-heap
>>>- The network memory size is actually derived reversely. Flink reads
>>>the max heap size from JVM (and the managed memory size from 
>>> configuration
>>>if it is configured off-heap), and derives the network memory size with 
>>> the
>>>following equation.
>>>   - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap /
>>>   (1-networkFraction) * networkFraction))
>>>   - In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48)
>>>   * 0.48)) = 26.8GB
>>>
>>> One thing I don't understand is, why do you only have 29GB heap size
>>> when "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB).
>>> The JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use
>>> "total" to represent "taskmanager.heap.size" for short. Also omitted the
>>> calculations when managed memory is configured off-heap.
>>>
>>>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1
>>>- 0.48) = 53 GB
>>>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio))
>>>* (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 
>>> 0.48) =
>>>40.6GB
>>>
>>> Have you specified a custom "-Xmx" parameter?
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Fri, Jun 12, 2020 at 7:50 AM Vijay Balakrishnan 
>>> wrote:
>>>
 Hi,
 Get this error:
 java.io.IOException: Insufficient number of network buffers: required
 2, but only 0 available. The total number of network buffers is currently
 set to 877118 of 32768 bytes each. You can increase this number by setting
 the configuration keys 'taskmanager.network.memory.fraction',
 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
 akka.pattern.AskTimeoutException: Ask timed out on
 [Actor[akka://flink/user/dispatcher#-1420732632]] after [1 ms]. Message
 of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
 typical reason for `AskTimeoutException` is that the recipient actor didn't
 send a reply.


 Followed docs here:

 

Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-12 Thread Xintong Song
I would suggest not to set -Xmx.

Flink will always calculate the JVM heap size from the configuration and
set a proper -Xmx.
If you manually set -Xmx that overwrites the one Flink calculated, it might
result in unpredictable behaviors.


Please refer to this document[1]. In short, you could leverage the
configuration option "taskmanager.memory.task.heap.size", and an additional
constant framework overhead will be added to this value for -Xmx.


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#jvm-parameters
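
For example, instead of exporting -Xmx, the heap can be pinned in flink-conf.yaml; the values below are only an illustration (per the document above, Flink then derives -Xmx itself as this task heap plus the constant framework heap):

taskmanager.memory.task.heap.size: 1024m
taskmanager.memory.managed.size: 512m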

On Fri, Jun 12, 2020 at 4:50 PM Clay Teeter  wrote:

> Thank you Xintong, while tracking down the existence of
> bash-java-utils.jar I found a bug in my CI scripts that incorrectly built
> the wrong version of flink.  I fixed this and then added a -Xmx value.
>
> env:
>   - name: FLINK_ENV_JAVA_OPTS
> value: "-Xmx{{ .Values.analytics.flink.taskManagerHeapSize }}"
>
>
> It's running perfectly now!
>
> Thank you again,
> Clay
>
>
> On Fri, Jun 12, 2020 at 5:13 AM Xintong Song 
> wrote:
>
>> Hi Clay,
>>
>> Could you verify the "taskmanager.sh" used is the same script shipped
>> with Flink-1.10.1? Or a custom script is used? Also, does the jar file
>> "bash-java-utils.jar" exist in your Flink bin directory?
>>
>> In Flink 1.10, the memory configuration for a TaskManager works as
>> follows.
>>
>>- "taskmanager.sh" executes "bash-java-utils.jar" for the memory
>>calculations
>>- "bash-java-utils.jar" will read your "flink-conf.yaml" and all the
>>"-D" arguments, and calculate memory sizes accordingly
>>- "bash-java-utils.jar" will then return the memory calculation
>>results as two strings, for JVM parameter ("-Xmx", "-Xms", etc.) and
>>dynamic configurations ("-D") respectively
>>- At this step, all the detailed memory sizes should be determined
>>   - That means, even for memory sizes not configured by you, there
>>   should be an exact value generated in the returned dynamic 
>> configuration
>>   - That also means, for memory components configured in ranges
>>   (e.g., network memory configured through a pair of [min, max]),
>>   a deterministic value should be decided and both min/max configuration
>>   options should already been overwrite to that value
>>- "taskmanager.sh" starts the task manager JVM process with the
>>returned JVM parameters, and passes the dynamic configurations as 
>> arguments
>>into the task manager process. These dynamic configurations will be read 
>> by
>>Flink task manager so that memory will be managed accordingly.
>>
>> Flink task manager expects all the memory configurations are already set
>> (thus network min/max should have the same value) before it's started. In
>> your case, it seems such configurations are missing. Same for the cpu cores.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Jun 12, 2020 at 12:58 AM Clay Teeter 
>> wrote:
>>
>>> Hi flink fans,
>>>
>>> I'm hoping for an easy solution.  I'm trying to upgrade my 9.3 cluster
>>> to flink 10.1, but i'm running into memory configuration errors.
>>>
>>> Such as:
>>> *Caused by:
>>> org.apache.flink.configuration.IllegalConfigurationException: The network
>>> memory min (64 mb) and max (1 gb) mismatch, the network memory has to be
>>> resolved and set to a fixed value before task executor starts*
>>>
>>> *Caused by:
>>> org.apache.flink.configuration.IllegalConfigurationException: The required
>>> configuration option Key: 'taskmanager.cpu.cores' , default: null (fallback
>>> keys: []) is not set*
>>>
>>> I was able to fix a cascade of errors by explicitly setting these values:
>>>
>>> taskmanager.memory.managed.size: {{
>>> .Values.analytics.flink.taskManagerManagedSize }}
>>> taskmanager.memory.task.heap.size: {{
>>> .Values.analytics.flink.taskManagerHeapSize }}
>>> taskmanager.memory.jvm-metaspace.size: 500m
>>> taskmanager.cpu.cores: 4
>>>
>>> So, the documentation implies that flink will default many of these
>>> values, however my 10.1 cluster doesn't seem to be doing this.  9.3, worked
>>> great!
>>>
>>> Do I really have to set all the memory (even network) values?  If not,
>>> what am I missing?
>>>
>>> If i do have to set all the memory parameters, how do I resolve "The
>>> network memory min (64 mb) and max (1 gb) mismatch"?
>>>
>>>
>>> My cluster runs standalone jobs on kube
>>>
>>> flnk-config.yaml:
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>> state.checkpoints.num-retained: 1
>>> taskmanager.memory.managed.size: {{
>>> .Values.analytics.flink.taskManagerManagedSize }}
>>> taskmanager.memory.task.heap.size: {{
>>> .Values.analytics.flink.taskManagerHeapSize }}
>>> taskmanager.memory.jvm-metaspace.size: 500m
>>> taskmanager.cpu.cores: 4
>>> taskmanager.numberOfTaskSlots: {{
>>> .Values.analytics.task.numberOfTaskSlots }}
>>> parallelism.default: {{ 

Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-12 Thread Arti Pande
Hi Arvid,

Thanks for the quick reply, and I totally agree with you on the differences
between microbenchmarks and a full benchmark with a specific use case. Thanks
for sending the microbenchmark screenshot.

For our use-case, the streaming pipeline has five main transformations that
have business logic, of which Async IO to external API endpoint is one
operator. To create benchmarks for operators, I run the real pipeline with
full load on a single machine and note the Throughput and latency. Then add
each operator one by one; always keeping the other basic operators like
source, watermark generator, deserializer, sink etc turned on. The
intention is to build a sort of incremental realistic benchmark for each
operator for the given use case. Adding the AsyncIO operator (with parallelism
7 and async buffer capacity 100) with the AsyncHTTPClient library brings
throughput down from 102K to a very low number, i.e. 10K or 12K
records/sec.

As you suggested, we tested the library being used (AsyncHTTPClient)
independently of Flink, in a similar way to what AsyncIO does. A
simple java program that invokes millions of API calls in a loop, with
hard-coded POST request values, and limited (configurable) number of
concurrent (maxInFlight) requests.  AsyncHTTPClient library by default uses
nCores * 2 (= 16) IO threads, plus a fixed set of threads (say 2 or 4) for
the ExecutorService to be passed to the Listener of result Future. So with
this library the code requires at least 18 or 20 threads. Varying the
maxInFlightRequests from 100 to 3000, the throughput varied from 17K to 34K
records/sec. Of course this was with hard-coded POST request values and
discarding the response body on reading (no further processing on it).
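
For anyone who wants to reproduce that standalone measurement, a rough sketch of such a harness follows (the endpoint URL, request body and volumes are placeholders; it assumes the org.asynchttpclient Dsl API and uses a Semaphore to cap the number of in-flight requests, which is what the maxInFlight knob above refers to):

import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import java.util.concurrent.Semaphore;

public class AsyncHttpThroughputTest {
    public static void main(String[] args) throws Exception {
        int maxInFlight = 1000;            // analogous to the AsyncIO buffer capacity
        int totalRequests = 1_000_000;     // placeholder volume
        Semaphore inFlight = new Semaphore(maxInFlight);

        try (AsyncHttpClient client = Dsl.asyncHttpClient()) {
            long start = System.nanoTime();
            for (int i = 0; i < totalRequests; i++) {
                inFlight.acquire();        // block once maxInFlight requests are open
                client.preparePost("http://localhost:8080/score")   // placeholder endpoint
                      .setBody("{\"id\":" + i + "}")
                      .execute()
                      .toCompletableFuture()
                      .whenComplete((resp, err) -> inFlight.release());
            }
            inFlight.acquire(maxInFlight); // wait for the remaining responses
            double secs = (System.nanoTime() - start) / 1e9;
            System.out.printf("throughput: %.0f requests/sec%n", totalRequests / secs);
        }
    }
}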

When we tried to vary the async buffer capacity of AsyncIO (equivalent of
maxInFlightRequests above) beyond 100, our throughput dropped further by
20% to 30%. Whereas in the test program above we would get better
performance as we increased maxInFlightRequests from 100 to 3000.

To reduce backpressure on upstream operators we had to increase the AsyncIO
operator parallelism up to 7. But that means at least 20*7 = 140 threads per
single pipeline plus the threads of other operators in the pipeline.

The question therefore is: given that the pipeline is highly multithreaded, can
8-core machines suit this? Also, is Flink the right framework for such
multi-threaded streaming pipelines that have external API calls and
high-speed ingestion?

Thanks & regards
Arti


On Thu, Jun 11, 2020 at 1:13 PM Arvid Heise  wrote:

> Hi Arti,
>
> you are now comparing a microbenchmark to a full benchmark. The job
> contains multiple transformations (each dividing the power of the core) and
> most importantly, it contains network traffic (shuffle) to perform a join.
> Unless you do heavy transformation, your throughput will always be
> bottlenecked with network traffic. At the very least, that contains
> additional serialization costs. And it contains heavy states through the
> window aggregation, which also results in serialization and I/O.
>
> I attached the screenshot from the microbenchmark. In general,
> microbenchmarks are not really useful for end-users, but you explicitly
> asked for them. The bottleneck usually arises from the IO part, which is
> NOT benchmarked. I'm assuming you are actually more interested in whether
> and how your use case can be solved in Flink rather than technical details.
> First of all, it always helps to have more information about the intended
> query than going directly into technical details. I gathered that you have
> a third-part microservice that you need to query and you want to do
> additional transformations. It would also be interesting how you performed
> your benchmarks and measured the performance drop.
>
> First of all, even though I discouraged the use of microservices in a
> stream processor, it doesn't mean that it's not possible. You just lose
> some of the nice properties that are possible. 1) Your latency will
> typically be much higher as if it was a data source. 2) You lose
> exactly-once behavior in regard to the HTTP endpoint. On recovery, requests
> since the last checkpoint will be repeated. You need to check if that makes
> sense if the microservices has side-effects that don't allow that. But the
> same is true for most other stream processors and can only be avoided by
> using per-record commits (and then even then, this in-progress record may
> result in duplicate HTTP queries on recovery). 3) If that external endpoint
> is down, there is no way to do meaningful processing. So you add the
> downtime of your Flink cluster and your external microservice. That becomes
> especially important if you have a wide-range of microservices.
>
> So as I wrote in last mail, I'd first establish a boundary independent of
> Flink, by running some Java program with your used async library and tune
> the settings to reach saturation on one machine. That boundary becomes your
> gold standard - there 

Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Xintong Song
Flink should have calculated the heap size and set the -Xms, according to
the equations I mentioned. So if you haven't set a customized -Xmx that
overwrites this, it should not use the default 1/4 of physical memory.
>
>
>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 -
>0.48) = 53 GB
>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) *
>(1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) =
>40.6GB
>
>
Are you running Flink on Mesos? I think Flink has not automatically set
-Xmx on Mesos.


BTW, from your screenshot the physical memory is 123GB, so 1/4 of that is
much closer to 29GB if we consider there are some rounding errors and
accuracy loss.


Thank you~

Xintong Song
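
To make the arithmetic above easy to replay, here is a small throwaway sketch of the 1.9-style derivation with the numbers from this thread (it ignores the min/max clamping and MB-vs-GB rounding the real code does, so the results are approximate):

public class Flink19MemoryEstimate {
    public static void main(String[] args) {
        double totalGb = 102.0;               // taskmanager.heap.size
        double networkFraction = 0.48;        // taskmanager.network.memory.fraction as configured here
        double cutoffMinGb = 600.0 / 1024.0;  // containerized.heap-cutoff-min = 600 MB
        double cutoffRatio = 0.25;            // containerized.heap-cutoff-ratio

        // Standalone: jvmHeap = total * (1 - networkFraction)
        double standaloneHeap = totalGb * (1 - networkFraction);

        // On YARN: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction)
        double yarnHeap = (totalGb - Math.max(cutoffMinGb, totalGb * cutoffRatio)) * (1 - networkFraction);

        // Network memory is then derived back from the JVM max heap:
        // networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap / (1 - fraction) * fraction))
        double jvmMaxHeapGb = 29.0;           // what the JVM reported in this case
        double networkMem = Math.min(50.0,
                Math.max(0.5, jvmMaxHeapGb / (1 - networkFraction) * networkFraction));

        System.out.printf("standalone heap ~= %.1f GB, YARN heap ~= %.1f GB, network ~= %.1f GB%n",
                standaloneHeap, yarnHeap, networkMem);
    }
}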



On Fri, Jun 12, 2020 at 4:33 PM Vijay Balakrishnan 
wrote:

> Thx, Xintong for a great answer. Much appreciated.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap
>
>
> Max heap: if -Xmx is set then it is its value else ¼ of physical machine
> memory estimated by the JVM
>
> No -Xmx is set.So, 1/4 of 102GB  = 25.5GB but not sure about the 29GB
> figure.
>
> On Thu, Jun 11, 2020 at 9:14 PM Xintong Song 
> wrote:
>
>> Hi Vijay,
>>
>> The memory configurations in Flink 1.9 and previous versions are indeed
>> complicated and confusing. That is why we made significant changes to it in
>> Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the
>> upcoming Flink 1.11 which is very likely to be released in this month.
>>
>> Regarding your questions,
>>
>>- "Physical Memory" displayed on the web ui stands for the total
>>memory on your machine. This information is retrieved from your OS. It is
>>not related to the network memory calculation. It is displayed mainly for
>>historical reasons.
>>- The error message means that you have about 26.8 GB network memory
>>(877118 * 32768 bytes), and your job is trying to use more.
>>- The "total memory" referred in network memory calculation is:
>>   - jvm-heap + network, if managed memory is configured on-heap
>>   (default)
>>  - According to your screenshot, the managed memory
>>  on-heap/off-heap configuration is not touched, so this should be 
>> your case.
>>   - jvm-heap + managed + network, if managed memory is configured
>>   off-heap
>>- The network memory size is actually derived reversely. Flink reads
>>the max heap size from JVM (and the managed memory size from configuration
>>if it is configured off-heap), and derives the network memory size with 
>> the
>>following equation.
>>   - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap /
>>   (1-networkFraction) * networkFraction))
>>   - In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48)
>>   * 0.48)) = 26.8GB
>>
>> One thing I don't understand is, why do you only have 29GB heap size when
>> "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB). The
>> JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total"
>> to represent "taskmanager.heap.size" for short. Also omitted the
>> calculations when managed memory is configured off-heap.
>>
>>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 -
>>0.48) = 53 GB
>>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio))
>>* (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) 
>> =
>>40.6GB
>>
>> Have you specified a custom "-Xmx" parameter?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Jun 12, 2020 at 7:50 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> Get this error:
>>> java.io.IOException: Insufficient number of network buffers: required 2,
>>> but only 0 available. The total number of network buffers is currently set
>>> to 877118 of 32768 bytes each. You can increase this number by setting the
>>> configuration keys 'taskmanager.network.memory.fraction',
>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>> akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/dispatcher#-1420732632]] after [1 ms]. Message
>>> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
>>> typical reason for `AskTimeoutException` is that the recipient actor didn't
>>> send a reply.
>>>
>>>
>>> Followed docs here:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>>
>>> network = Min(max, Max(min, fraction x total)  //what does Total mean -
>>> The max JVM heap is used to derive the total memory for the calculation of
>>> network buffers. - can I see it in the Flink Dashboard ??? 117GB here ?
>>> = Min(50G, Max(500mb, Max(0.48 * 117G))  ) = MIn(50G, 56.16G)= 50G
>>> 877118 of 32768 bytes each comes to 28.75GB. So, why is it failing ?
>>> Used this in flink-conf.yaml:
>>> taskmanager.numberOfTaskSlots: 10
>>> rest.server.max-content-length: 

Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-12 Thread Clay Teeter
Thank you Xintong, while tracking down the existence of bash-java-utils.jar
I found a bug in my CI scripts that built the wrong version of
Flink. I fixed this and then added a -Xmx value.

env:
  - name: FLINK_ENV_JAVA_OPTS
value: "-Xmx{{ .Values.analytics.flink.taskManagerHeapSize }}"


It's running perfectly now!

Thank you again,
Clay


On Fri, Jun 12, 2020 at 5:13 AM Xintong Song  wrote:

> Hi Clay,
>
> Could you verify the "taskmanager.sh" used is the same script shipped with
> Flink-1.10.1? Or a custom script is used? Also, does the jar file
> "bash-java-utils.jar" exist in your Flink bin directory?
>
> In Flink 1.10, the memory configuration for a TaskManager works as follows.
>
>- "taskmanager.sh" executes "bash-java-utils.jar" for the memory
>calculations
>- "bash-java-utils.jar" will read your "flink-conf.yaml" and all the
>"-D" arguments, and calculate memory sizes accordingly
>- "bash-java-utils.jar" will then return the memory calculation
>results as two strings, for JVM parameter ("-Xmx", "-Xms", etc.) and
>dynamic configurations ("-D") respectively
>- At this step, all the detailed memory sizes should be determined
>   - That means, even for memory sizes not configured by you, there
>   should be an exact value generated in the returned dynamic configuration
>   - That also means, for memory components configured in ranges
>   (e.g., network memory configured through a pair of [min, max]),
>   a deterministic value should be decided and both min/max configuration
>   options should already been overwrite to that value
>- "taskmanager.sh" starts the task manager JVM process with the
>returned JVM parameters, and passes the dynamic configurations as arguments
>into the task manager process. These dynamic configurations will be read by
>Flink task manager so that memory will be managed accordingly.
>
> Flink task manager expects all the memory configurations are already set
> (thus network min/max should have the same value) before it's started. In
> your case, it seems such configurations are missing. Same for the cpu cores.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 12, 2020 at 12:58 AM Clay Teeter 
> wrote:
>
>> Hi flink fans,
>>
>> I'm hoping for an easy solution.  I'm trying to upgrade my 9.3 cluster to
>> flink 10.1, but i'm running into memory configuration errors.
>>
>> Such as:
>> *Caused by: org.apache.flink.configuration.IllegalConfigurationException:
>> The network memory min (64 mb) and max (1 gb) mismatch, the network memory
>> has to be resolved and set to a fixed value before task executor starts*
>>
>> *Caused by: org.apache.flink.configuration.IllegalConfigurationException:
>> The required configuration option Key: 'taskmanager.cpu.cores' , default:
>> null (fallback keys: []) is not set*
>>
>> I was able to fix a cascade of errors by explicitly setting these values:
>>
>> taskmanager.memory.managed.size: {{
>> .Values.analytics.flink.taskManagerManagedSize }}
>> taskmanager.memory.task.heap.size: {{
>> .Values.analytics.flink.taskManagerHeapSize }}
>> taskmanager.memory.jvm-metaspace.size: 500m
>> taskmanager.cpu.cores: 4
>>
>> So, the documentation implies that flink will default many of these
>> values, however my 10.1 cluster doesn't seem to be doing this.  9.3, worked
>> great!
>>
>> Do I really have to set all the memory (even network) values?  If not,
>> what am I missing?
>>
>> If i do have to set all the memory parameters, how do I resolve "The
>> network memory min (64 mb) and max (1 gb) mismatch"?
>>
>>
>> My cluster runs standalone jobs on kube
>>
>> flnk-config.yaml:
>> state.backend: rocksdb
>> state.backend.incremental: true
>> state.checkpoints.num-retained: 1
>> taskmanager.memory.managed.size: {{
>> .Values.analytics.flink.taskManagerManagedSize }}
>> taskmanager.memory.task.heap.size: {{
>> .Values.analytics.flink.taskManagerHeapSize }}
>> taskmanager.memory.jvm-metaspace.size: 500m
>> taskmanager.cpu.cores: 4
>> taskmanager.numberOfTaskSlots: {{
>> .Values.analytics.task.numberOfTaskSlots }}
>> parallelism.default: {{ .Values.analytics.flink.parallelism }}
>>
>>
>> JobManger:
>> command: ["/opt/flink/bin/standalone-job.sh"]
>> args: ["start-foreground", "-j={{ .Values.analytics.flinkRunnable
>> }}",  ...
>>
>> TakManager
>> command: ["/opt/flink/bin/taskmanager.sh"]
>> args: [
>>   "start-foreground",
>>   "-Djobmanager.rpc.address=localhost",
>>   "-Dmetrics.reporter.prom.port=9430"]
>>
>>
>>
>>


Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Vijay Balakrishnan
Thx, Xintong for a great answer. Much appreciated.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap


Max heap: if -Xmx is set then it is its value else ¼ of physical machine
memory estimated by the JVM

No -Xmx is set. So, 1/4 of 102GB = 25.5GB, but I'm not sure about the 29GB
figure.

On Thu, Jun 11, 2020 at 9:14 PM Xintong Song  wrote:

> Hi Vijay,
>
> The memory configurations in Flink 1.9 and previous versions are indeed
> complicated and confusing. That is why we made significant changes to it in
> Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the
> upcoming Flink 1.11 which is very likely to be released in this month.
>
> Regarding your questions,
>
>- "Physical Memory" displayed on the web ui stands for the total
>memory on your machine. This information is retrieved from your OS. It is
>not related to the network memory calculation. It is displayed mainly for
>historical reasons.
>- The error message means that you have about 26.8 GB network memory
>(877118 * 32768 bytes), and your job is trying to use more.
>- The "total memory" referred in network memory calculation is:
>   - jvm-heap + network, if managed memory is configured on-heap
>   (default)
>  - According to your screenshot, the managed memory
>  on-heap/off-heap configuration is not touched, so this should be 
> your case.
>   - jvm-heap + managed + network, if managed memory is configured
>   off-heap
>- The network memory size is actually derived reversely. Flink reads
>the max heap size from JVM (and the managed memory size from configuration
>if it is configured off-heap), and derives the network memory size with the
>following equation.
>   - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap /
>   (1-networkFraction) * networkFraction))
>   - In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48) *
>   0.48)) = 26.8GB
>
> One thing I don't understand is, why do you only have 29GB heap size when
> "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB). The
> JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total"
> to represent "taskmanager.heap.size" for short. Also omitted the
> calculations when managed memory is configured off-heap.
>
>- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 -
>0.48) = 53 GB
>- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) *
>(1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) =
>40.6GB
>
> Have you specified a custom "-Xmx" parameter?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 12, 2020 at 7:50 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Get this error:
>> java.io.IOException: Insufficient number of network buffers: required 2,
>> but only 0 available. The total number of network buffers is currently set
>> to 877118 of 32768 bytes each. You can increase this number by setting the
>> configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#-1420732632]] after [1 ms]. Message
>> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
>> typical reason for `AskTimeoutException` is that the recipient actor didn't
>> send a reply.
>>
>>
>> Followed docs here:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>
>> network = Min(max, Max(min, fraction x total)  //what does Total mean -
>> The max JVM heap is used to derive the total memory for the calculation of
>> network buffers. - can I see it in the Flink Dashboard ??? 117GB here ?
>> = Min(50G, Max(500mb, Max(0.48 * 117G))  ) = MIn(50G, 56.16G)= 50G
>> 877118 of 32768 bytes each comes to 28.75GB. So, why is it failing ?
>> Used this in flink-conf.yaml:
>> taskmanager.numberOfTaskSlots: 10
>> rest.server.max-content-length: 314572800
>> taskmanager.network.memory.fraction: 0.45
>> taskmanager.network.memory.max: 50gb
>> taskmanager.network.memory.min: 500mb
>> akka.ask.timeout: 240s
>> cluster.evenly-spread-out-slots: true
>> akka.tcp.timeout: 240s
>> taskmanager.network.request-backoff.initial: 5000
>> taskmanager.network.request-backoff.max: 3
>> web.timeout:100
>> web.refresh-interval:6000
>>
>> Saw some old calc about buffers
>> (slots/Tm * slots/TM) * #TMs * 4
>> =10 * 10 * 47 * 4 = 18,800 buffers.
>>
>> What am I missing in the network buffer calc ??
>>
>> TIA,
>>
>>
>>


Re: Flink 1.10.1 not using FLINK_TM_HEAP for TaskManager JVM Heap size correctly?

2020-06-12 Thread Xintong Song
To be more specific, your 1400m total memory should also consist of:

   - 572MB heap memory (-Xmx & -Xms)
   - 268MB direct memory (-XX:MaxDirectMemorySize)
   - 560MB managed memory (native memory, calculated as 1400m *
   managedMemoryFraction, the fraction is by default 0.4)
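
As a cross-check, those three numbers fall out of the 1.10 defaults; a quick throwaway sketch (it assumes the default network fraction 0.1 and the default 128 MB framework heap / framework off-heap sizes, which are not stated in this thread):

public class Flink110LegacyHeapBreakdown {
    public static void main(String[] args) {
        double totalFlinkMb = 1400;            // legacy taskmanager.heap.size -> total Flink memory
        double managedMb = 0.4 * totalFlinkMb; // taskmanager.memory.managed.fraction = 0.4 -> 560
        double networkMb = 0.1 * totalFlinkMb; // taskmanager.memory.network.fraction = 0.1 -> 140
        double frameworkOffHeapMb = 128;       // taskmanager.memory.framework.off-heap.size default
        double frameworkHeapMb = 128;          // taskmanager.memory.framework.heap.size default

        double directMb = networkMb + frameworkOffHeapMb;     // ~268 -> -XX:MaxDirectMemorySize
        double heapMb = totalFlinkMb - managedMb - directMb;  // ~572 -> -Xmx / -Xms
        double taskHeapMb = heapMb - frameworkHeapMb;         // what is left for user code

        System.out.printf("heap=%.0f MB, direct=%.0f MB, managed=%.0f MB, task heap=%.0f MB%n",
                heapMb, directMb, managedMb, taskHeapMb);
    }
}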


Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 4:27 PM Xintong Song  wrote:

> Hi Li,
>
> FLINK_TM_HEAP corresponds to the legacy configuration option
> "taskmanager.heap.size". It is supported for backwards compatibility. I
> strongly recommend you to use "taskmanager.memory.flink.size" or
> "taskmanager.memory.process.size" instead, which can be passed either in
> "flink-conf.yaml" or through "-D" options.
>
> This configuration option, while confusingly named with "heap", is
> actually specifying the total memory of task manager, including the
> off-heap memory components.
>
> This is also documented as follows[1] (for the configuration option but
> not for the environment variable)
>
>> The previous options which were responsible for the total memory used by
>> Flink are taskmanager.heap.size or taskmanager.heap.mb. Despite their
>> naming, they included not only JVM heap but also other off-heap memory
>> components. The options have been deprecated.
>>
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html#total-memory-previously-heap-memory
>
> On Fri, Jun 12, 2020 at 4:09 PM Li Peng  wrote:
>
>> Hey folks, we recently migrated from Flink 1.9.x to 1.10.1, and we
>> noticed some wonky behavior in how JVM is configured:
>>
>> 1. We Add FLINK_JM_HEAP=5000m and FLINK_TM_HEAP=1400m variables to the
>> environment
>> 2. The JobManager allocates the right heap size as expected
>> 3. However, the TaskManager (started via taskmanager.sh), logs this
>> instead:
>>
>>  - 'taskmanager.memory.flink.size' is not specified, use the *configured
>>> deprecated task manager heap value (1.367gb (1468006400 bytes)) for it.*
>>>  - The derived from fraction jvm overhead memory (184.000mb (192937987
>>> bytes)) is less than its min value 192.000mb (201326592 bytes), min value
>>> will be used instead
>>> BASH_JAVA_UTILS_EXEC_RESULT:*-Xmx599785462 -Xms599785462*
>>> -XX:MaxDirectMemorySize=281018370 -XX:MaxMetaspaceSize=268435456
>>
>>
>> So the logs say it will use the configured 1400m as expected, but for
>> some reason it picks 599785462 as the heap size instead (TaskManagerRunner
>> logs that Maximum heap size is 572 MiBytes, so it's verified that the 1400m
>> value is not used)?
>>
>> Anyone know if I'm missing a setting here or something?
>>
>> Thanks,
>> Li
>>
>>
>>


Re: Re: Re: flink sql Temporal table join failed

2020-06-12 Thread Benchao Li
Looks like you've hit another pit. You're on 1.10.0, right? Try switching to 1.10.1; two bugs have already been fixed in 1.10.1.

Zhou Zach wrote on Fri, Jun 12, 2020 at 15:47:

> Still not working,
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> ERROR StatusLogger No log4j2 configuration file found. Using default
> configuration: logging only errors to the console.
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "time FROM" at line 1, column 44.
> Was expecting one of:
> "CURSOR" ...
> "EXISTS" ...
> "NOT" ...
> "ROW" ...
> "(" ...
> "+" ...
> "-" ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "TRUE" ...
> "FALSE" ...
> "UNKNOWN" ...
> "NULL" ...
>  ...
>  ...
>  ...
> "DATE" ...
> "TIME"  ...
> "TIMESTAMP" ...
> "INTERVAL" ...
> "?" ...
> "CAST" ...
> "EXTRACT" ...
> "POSITION" ...
> "CONVERT" ...
> "TRANSLATE" ...
> "OVERLAY" ...
> "FLOOR" ...
> "CEIL" ...
> "CEILING" ...
> "SUBSTRING" ...
> "TRIM" ...
> "CLASSIFIER" ...
> "MATCH_NUMBER" ...
> "RUNNING" ...
> "PREV" ...
> "NEXT" ...
> "JSON_EXISTS" ...
> "JSON_VALUE" ...
> "JSON_QUERY" ...
> "JSON_OBJECT" ...
> "JSON_OBJECTAGG" ...
> "JSON_ARRAY" ...
> "JSON_ARRAYAGG" ...
>  ...
> "MULTISET" ...
> "ARRAY" ...
> "MAP" ...
> "PERIOD" ...
> "SPECIFIC" ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "ABS" ...
> "AVG" ...
> "CARDINALITY" ...
> "CHAR_LENGTH" ...
> "CHARACTER_LENGTH" ...
> "COALESCE" ...
> "COLLECT" ...
> "COVAR_POP" ...
> "COVAR_SAMP" ...
> "CUME_DIST" ...
> "COUNT" ...
> "CURRENT_DATE" ...
> "CURRENT_TIME" ...
> "CURRENT_TIMESTAMP" ...
> "DENSE_RANK" ...
> "ELEMENT" ...
> "EXP" ...
> "FIRST_VALUE" ...
> "FUSION" ...
> "GROUPING" ...
> "HOUR" ...
> "LAG" ...
> "LEAD" ...
> "LEFT" ...
> "LAST_VALUE" ...
> "LN" ...
> "LOCALTIME" ...
> "LOCALTIMESTAMP" ...
> "LOWER" ...
> "MAX" ...
> "MIN" ...
> "MINUTE" ...
> "MOD" ...
> "MONTH" ...
> "NTH_VALUE" ...
> "NTILE" ...
> "NULLIF" ...
> "OCTET_LENGTH" ...
> "PERCENT_RANK" ...
> "POWER" ...
> "RANK" ...
> "REGR_COUNT" ...
> "REGR_SXX" ...
> "REGR_SYY" ...
> "RIGHT" ...
> "ROW_NUMBER" ...
> "SECOND" ...
> "SQRT" ...
> "STDDEV_POP" ...
> "STDDEV_SAMP" ...
> "SUM" ...
> "UPPER" ...
> "TRUNCATE" ...
> "USER" ...
> "VAR_POP" ...
> "VAR_SAMP" ...
> "YEAR" ...
> "CURRENT_CATALOG" ...
> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> "CURRENT_PATH" ...
> "CURRENT_ROLE" ...
> "CURRENT_SCHEMA" ...
> "CURRENT_USER" ...
> "SESSION_USER" ...
> "SYSTEM_USER" ...
> "NEW" ...
> "CASE" ...
> "CURRENT" ...
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
> at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at
> 

Re: Flink 1.10.1 not using FLINK_TM_HEAP for TaskManager JVM Heap size correctly?

2020-06-12 Thread Xintong Song
Hi Li,

FLINK_TM_HEAP corresponds to the legacy configuration option
"taskmanager.heap.size". It is supported for backwards compatibility. I
strongly recommend you to use "taskmanager.memory.flink.size" or
"taskmanager.memory.process.size" instead, which can be passed either in
"flink-conf.yaml" or through "-D" options.

This configuration option, while confusingly named with "heap", actually
specifies the total memory of the task manager, including the off-heap memory
components.

This is also documented as follows[1] (for the configuration option but not
for the environment variable)

> The previous options which were responsible for the total memory used by
> Flink are taskmanager.heap.size or taskmanager.heap.mb. Despite their
> naming, they included not only JVM heap but also other off-heap memory
> components. The options have been deprecated.
>

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html#total-memory-previously-heap-memory
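
For example (the values are only illustrative), either of the following replaces the FLINK_TM_HEAP export:

# flink-conf.yaml
taskmanager.memory.flink.size: 1400m        # total Flink memory, heap plus off-heap

# or as a dynamic option when starting the task manager
taskmanager.sh start-foreground -Dtaskmanager.memory.flink.size=1400m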

On Fri, Jun 12, 2020 at 4:09 PM Li Peng  wrote:

> Hey folks, we recently migrated from Flink 1.9.x to 1.10.1, and we noticed
> some wonky behavior in how JVM is configured:
>
> 1. We Add FLINK_JM_HEAP=5000m and FLINK_TM_HEAP=1400m variables to the
> environment
> 2. The JobManager allocates the right heap size as expected
> 3. However, the TaskManager (started via taskmanager.sh), logs this
> instead:
>
>  - 'taskmanager.memory.flink.size' is not specified, use the *configured
>> deprecated task manager heap value (1.367gb (1468006400 bytes)) for it.*
>>  - The derived from fraction jvm overhead memory (184.000mb (192937987
>> bytes)) is less than its min value 192.000mb (201326592 bytes), min value
>> will be used instead
>> BASH_JAVA_UTILS_EXEC_RESULT:*-Xmx599785462 -Xms599785462*
>> -XX:MaxDirectMemorySize=281018370 -XX:MaxMetaspaceSize=268435456
>
>
> So the logs say it will use the configured 1400m as expected, but for some
> reason it picks 599785462 as the heap size instead (TaskManagerRunner logs
> that Maximum heap size is 572 MiBytes, so it's verified that the 1400m
> value is not used)?
>
> Anyone know if I'm missing a setting here or something?
>
> Thanks,
> Li
>
>
>


Flink 1.10.1 not using FLINK_TM_HEAP for TaskManager JVM Heap size correctly?

2020-06-12 Thread Li Peng
Hey folks, we recently migrated from Flink 1.9.x to 1.10.1, and we noticed
some wonky behavior in how the JVM is configured:

1. We add FLINK_JM_HEAP=5000m and FLINK_TM_HEAP=1400m variables to the
environment
2. The JobManager allocates the right heap size as expected
3. However, the TaskManager (started via taskmanager.sh), logs this instead:

 - 'taskmanager.memory.flink.size' is not specified, use the *configured
> deprecated task manager heap value (1.367gb (1468006400 bytes)) for it.*
>  - The derived from fraction jvm overhead memory (184.000mb (192937987
> bytes)) is less than its min value 192.000mb (201326592 bytes), min value
> will be used instead
> BASH_JAVA_UTILS_EXEC_RESULT:*-Xmx599785462 -Xms599785462*
> -XX:MaxDirectMemorySize=281018370 -XX:MaxMetaspaceSize=268435456


So the logs say it will use the configured 1400m as expected, but for some
reason it picks 599785462 as the heap size instead (TaskManagerRunner logs
that Maximum heap size is 572 MiBytes, so it's verified that the 1400m
value is not used)?

Anyone know if I'm missing a setting here or something?

Thanks,
Li


Re:Re: Re: flink sql Temporal table join failed

2020-06-12 Thread Zhou Zach
Still not working,
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
parse failed. Encountered "time FROM" at line 1, column 44.
Was expecting one of:
"CURSOR" ...
"EXISTS" ...
"NOT" ...
"ROW" ...
"(" ...
"+" ...
"-" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"TRUE" ...
"FALSE" ...
"UNKNOWN" ...
"NULL" ...
 ...
 ...
 ...
"DATE" ...
"TIME"  ...
"TIMESTAMP" ...
"INTERVAL" ...
"?" ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"NEXT" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
 ...
"MULTISET" ...
"ARRAY" ...
"MAP" ...
"PERIOD" ...
"SPECIFIC" ...
 ...
 ...
 ...
 ...
 ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"CURRENT_DATE" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LEFT" ...
"LAST_VALUE" ...
"LN" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_COUNT" ...
"REGR_SXX" ...
"REGR_SYY" ...
"RIGHT" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
"USER" ...
"VAR_POP" ...
"VAR_SAMP" ...
"YEAR" ...
"CURRENT_CATALOG" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_USER" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"NEW" ...
"CASE" ...
"CURRENT" ...

at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
at 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
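
A likely cause of this new parse error: "time" is a reserved keyword for the SQL parser, so a column literally named time fails to parse wherever it appears without backticks ("Encountered "time FROM"" is the parser hitting the keyword where it expected an expression). A hedged sketch of the workaround, assuming the user_behavior table from this thread; the same escaping applies in the CREATE TABLE column list, and renaming the column (e.g. to ts) avoids the problem entirely:

-- fails:  SELECT time, uid FROM user_behavior
-- works once the reserved word is escaped with backticks:
SELECT `time`, uid
FROM user_behavior;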

Re:Re:Re: flink sql Temporal table join failed

2020-06-12 Thread 陈邵瑾
Take a look at the SQL documentation on time attributes; judging from your description the usage is incorrect: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
At 2020-06-12 14:24:07, "Zhou Zach"  wrote:
>flink 1.10.0:
>Adding a PROCTIME() AS proctime field in the CREATE TABLE statement throws an error
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>On 2020-06-12 14:08:11, "Benchao Li"  wrote:
>>Hi,
>>
>>The temporal table join requires processing time; your b.`time` here is an ordinary timestamp, not event time.
>>You can refer to [1]
>>
>>[1]
>>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>
>>Zhou Zach  wrote on Fri, Jun 12, 2020 at 1:33 PM:
>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>>
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>>
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Cannot generate a valid execution plan for the given query:
>>>
>>>
>>>
>>>
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>> fields=[time, sum_age])
>>>
>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>
>>>+- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>
>>>   :- FlinkLogicalCalc(select=[uid, time])
>>>
>>>   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>
>>>   +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>
>>>  +- FlinkLogicalCalc(select=[uid, age])
>>>
>>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>>
>>>
>>>
>>>
>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>>> table's proctime field, doesn't support 'PROCTIME()'
>>>
>>> Please check the documentation for the set of currently supported SQL
>>> features.
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>>
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>>
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>>
>>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>>
>>> at
>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>>
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>>
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>>
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>>
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>>
>>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>>> field, doesn't support 'PROCTIME()'
>>>
>>> at
>>> 

Re: Re: flink sql Temporal table join failed

2020-06-12 Thread Benchao Li
You have it the wrong way round; it should be proctime AS PROCTIME().
For a computed column, the AS direction is the opposite of AS in an ordinary query.
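
To make the direction concrete, a minimal sketch (the column names follow the plan quoted below; the connector options are placeholders, not the actual job config):

-- DDL computed column: the column name comes first, the expression after AS
CREATE TABLE user_behavior (
  uid BIGINT,
  `time` TIMESTAMP(3),
  proctime AS PROCTIME()
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json'
);

-- In an ordinary query it is the other way round: the expression first, then AS name
SELECT PROCTIME() AS proctime FROM user_behavior;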

Zhou Zach  wrote on Fri, Jun 12, 2020 at 2:24 PM:

> flink 1.10.0:
> Adding a PROCTIME() AS proctime field in the CREATE TABLE statement throws an error
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-06-12 14:08:11, "Benchao Li"  wrote:
> >Hi,
> >
> >The temporal table join requires processing time; your b.`time` here is an ordinary timestamp, not event time.
> >You can refer to [1]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
> >
> >Zhou Zach  wrote on Fri, Jun 12, 2020 at 1:33 PM:
> >
> >> SLF4J: Class path contains multiple SLF4J bindings.
> >>
> >> SLF4J: Found binding in
> >>
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >>
> >> SLF4J: Found binding in
> >>
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >>
> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> >> explanation.
> >>
> >> SLF4J: Actual binding is of type
> >> [org.apache.logging.slf4j.Log4jLoggerFactory]
> >>
> >> ERROR StatusLogger No log4j2 configuration file found. Using default
> >> configuration: logging only errors to the console.
> >>
> >> Exception in thread "main" org.apache.flink.table.api.TableException:
> >> Cannot generate a valid execution plan for the given query:
> >>
> >>
> >>
> >>
> >> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
> >> fields=[time, sum_age])
> >>
> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
> >>
> >>+- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
> >>
> >>   :- FlinkLogicalCalc(select=[uid, time])
> >>
> >>   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> >> default_database, user_behavior, source: [KafkaTableSource(uid,
> phoneType,
> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
> >>
> >>   +- FlinkLogicalSnapshot(period=[$cor0.time])
> >>
> >>  +- FlinkLogicalCalc(select=[uid, age])
> >>
> >> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
> >> age, created_time)]]], fields=[uid, sex, age, created_time])
> >>
> >>
> >>
> >>
> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
> >> table's proctime field, doesn't support 'PROCTIME()'
> >>
> >> Please check the documentation for the set of currently supported SQL
> >> features.
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> >>
> >> at
> >>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> >>
> >> at
> >>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> >>
> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >>
> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >>
> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >>
> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >>
> >> at
> >>
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> >>
> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
> >>
> >> at
> >>
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> >>
> >> at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
> >>
> >> at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
> >>
> >> at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >>
> >> at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> >>
> >> at
> >>
> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
> >>
> >> at
> >>
> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
> >>
> >> Caused by: org.apache.flink.table.api.TableException: Temporal table
> join
> >> currently only supports 'FOR 

Re: flink sql Temporal table join failed

2020-06-12 Thread 李奇
You need to use proctime for the join to work; see: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
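
For illustration, a hedged sketch of the join shape the error message asks for, reusing the column names from the quoted plan (user_behavior as the probe side, users as the MySQL lookup table) and assuming user_behavior declares a computed column proctime AS PROCTIME():

SELECT CAST(b.`time` AS STRING), u.age
FROM user_behavior AS b
JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u
  ON b.uid = u.uid;

-- The key point: FOR SYSTEM_TIME AS OF must reference the left table's declared
-- proctime attribute, not a PROCTIME() call written inline in the query.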
 

> On Jun 12, 2020, at 2:24 PM, Zhou Zach  wrote:
> 
> flink 1.10.0:
> Adding a PROCTIME() AS proctime field in the CREATE TABLE statement throws an error
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>> On 2020-06-12 14:08:11, "Benchao Li"  wrote:
>> Hi,
>> 
>> The temporal table join requires processing time; your b.`time` here is an ordinary timestamp, not event time.
>> You can refer to [1]
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>> 
>> Zhou Zach  wrote on Fri, Jun 12, 2020 at 1:33 PM:
>> 
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> 
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> 
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> 
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> 
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>> 
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Cannot generate a valid execution plan for the given query:
>>> 
>>> 
>>> 
>>> 
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>> fields=[time, sum_age])
>>> 
>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>> 
>>>   +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>> 
>>>  :- FlinkLogicalCalc(select=[uid, time])
>>> 
>>>  :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>> 
>>>  +- FlinkLogicalSnapshot(period=[$cor0.time])
>>> 
>>> +- FlinkLogicalCalc(select=[uid, age])
>>> 
>>>+- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>> 
>>> 
>>> 
>>> 
>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>>> table's proctime field, doesn't support 'PROCTIME()'
>>> 
>>> Please check the documentation for the set of currently supported SQL
>>> features.
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>> 
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>> 
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>> 
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>> 
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>> 
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> 
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> 
>>> at
>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>> 
>>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>> 
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>> 
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>> 
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>> 
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>> 
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>> 
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>> 
>>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime

Re:Re: flink sql Temporal table join failed

2020-06-12 Thread Zhou Zach
flink 1.10.0:
Adding a PROCTIME() AS proctime field in the CREATE TABLE statement throws an error

















On 2020-06-12 14:08:11, "Benchao Li"  wrote:
>Hi,
>
>The temporal table join requires processing time; your b.`time` here is an ordinary timestamp, not event time.
>You can refer to [1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>
>Zhou Zach  wrote on Fri, Jun 12, 2020 at 1:33 PM:
>
>> SLF4J: Class path contains multiple SLF4J bindings.
>>
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>>
>> SLF4J: Actual binding is of type
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>
>> ERROR StatusLogger No log4j2 configuration file found. Using default
>> configuration: logging only errors to the console.
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Cannot generate a valid execution plan for the given query:
>>
>>
>>
>>
>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>> fields=[time, sum_age])
>>
>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>
>>+- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>
>>   :- FlinkLogicalCalc(select=[uid, time])
>>
>>   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>
>>   +- FlinkLogicalSnapshot(period=[$cor0.time])
>>
>>  +- FlinkLogicalCalc(select=[uid, age])
>>
>> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>
>>
>>
>>
>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>> table's proctime field, doesn't support 'PROCTIME()'
>>
>> Please check the documentation for the set of currently supported SQL
>> features.
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>
>> at
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>
>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>
>> at
>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>
>> at
>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>
>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>> field, doesn't support 'PROCTIME()'
>>
>> at
>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>
>> at
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>
>> at
>> 

Re: flink sql Temporal table join failed

2020-06-12 Thread Benchao Li
Hi,

The temporal table join requires processing time; your b.`time` here is an ordinary timestamp, not event time.
You can refer to [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
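
As a reference for [1], a hedged sketch of how a plain TIMESTAMP column is turned into a time attribute in the DDL (names and connector options are illustrative only, not the poster's actual config):

CREATE TABLE user_behavior (
  uid BIGINT,
  `time` TIMESTAMP(3),
  -- processing-time attribute, usable in a temporal table join
  proctime AS PROCTIME(),
  -- event-time attribute: declared by putting a watermark on the timestamp column
  WATERMARK FOR `time` AS `time` - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json'
);

-- Without one of these declarations, `time` remains an ordinary TIMESTAMP(3)
-- and cannot drive a temporal table join.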

Zhou Zach  wrote on Fri, Jun 12, 2020 at 1:33 PM:

> SLF4J: Class path contains multiple SLF4J bindings.
>
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
>
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
>
> ERROR StatusLogger No log4j2 configuration file found. Using default
> configuration: logging only errors to the console.
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
>
>
>
>
> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
> fields=[time, sum_age])
>
> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>
>+- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>
>   :- FlinkLogicalCalc(select=[uid, time])
>
>   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>
>   +- FlinkLogicalSnapshot(period=[$cor0.time])
>
>  +- FlinkLogicalCalc(select=[uid, age])
>
> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
> age, created_time)]]], fields=[uid, sex, age, created_time])
>
>
>
>
> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
> table's proctime field, doesn't support 'PROCTIME()'
>
> Please check the documentation for the set of currently supported SQL
> features.
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>
> at
> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>
> at
> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>
> Caused by: org.apache.flink.table.api.TableException: Temporal table join
> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
> field, doesn't support 'PROCTIME()'
>
> at
> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>
> at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>
> at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>
> at
>