Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread
Hi Matthias,
We have “solved” the problem by tuning the join. But I still try to answer the 
questions, hoping this will help.


* What is the option you're referring to for the bounded shuffle? That might 
help to understand what streaming mode solution you're looking for.

|
taskmanager.network.blocking-shuffle.type
| "file" | String | The blocking shuffle type, either "mmap" or "file". The 
"auto" means selecting the property type automatically based on system memory 
architecture (64 bit for mmap and 32 bit for file). Note that the memory usage 
of mmap is not accounted by configured memory limits, but some resource 
frameworks like yarn would track this memory usage and kill the container once 
memory exceeding some threshold. Also note that this option is experimental and 
might be changed future. |
* What does the job graph look like? Are you assuming that it's due to a 
shuffling operation? Could you provide the logs to get a better understanding 
of your case?
   The graph is join of three streams. And we use rocksdb as the statebackend. 
I think the crash is due to rocksdb. And I could not get the logs (because some 
misconfiguration, which caused the logs are empty). 
* Do you observe the same memory increase for other TaskManager nodes?

   After one tm is killed, the job failed. So I didn’t see the exactly same 
memory increase for other tms. But I think other tms would have similiar 
behavior because the data sizes they processed are almost the same.
* Are you expecting to reach the memory limits considering that you mentioned a 
"big state size"? Would increasing the memory limit be an option or do you fear 
that it's caused by some memory leak?
  By change the tm process memory to 18GB instead of 12GB, it didn’t help.


By the answers I provided, I think maybe we should figure out why rocksdb 
overused virtual memory, and caused yarn to kill the container.


On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:

The Flink version we used is 1.12.0.


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


On 04/16/2021 16:07,马阳阳 wrote:
Hi, community,
When running a Flink streaming job with big state size, one task manager 
process was killed by the yarn node manager. The following log is from the yarn 
node manager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching solution for this problem, I found that there is a option for 
this that worked for bounded shuffle. So is there a way to get rid of this in 
streaming mode?


PS:
memory related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7



Re:Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-16 Thread
The Flink version we used is 1.12.0.


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


On 04/16/2021 16:07,马阳阳 wrote:
Hi, community,
When running a Flink streaming job with big state size, one task manager 
process was killed by the yarn node manager. The following log is from the yarn 
node manager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching solution for this problem, I found that there is a option for 
this that worked for bounded shuffle. So is there a way to get rid of this in 
streaming mode?


PS:
memory related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7



Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-16 Thread
Hi, community,
When running a Flink streaming job with big state size, one task manager 
process was killed by the yarn node manager. The following log is from the yarn 
node manager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching solution for this problem, I found that there is a option for 
this that worked for bounded shuffle. So is there a way to get rid of this in 
streaming mode?


PS:
memory related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7



回复: Flink 1.12 On Yarn 作业提交失败问题

2021-02-24 Thread
说明一下,yarn.ship-files这个配置的文件夹下需要包含flink-yarn的jar包,可以配置成flink home下的lib文件夹


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月25日 08:59,马阳阳 写道:
可以试试在flink-conf.yaml里添加如下配置:
yarn.flink-dist-jar: /opt/flink-1.12/lib/flink-dist_2.11-1.12.0.jar
yarn.ship-files: /data/dfl2/lib


这个行为其实很奇怪,在我们的环境里,有的提交任务的机器不需要添加这个配置,有的不加这个配置就会造成那个main class找不到的问题。


Ps: 造成main 
class找不到的原因还可能是程序依赖的版本和部署的flink版本不一致,这种情况可能发生在flink依赖升级之后,部署的flink没有更新或者没有完全更新


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月23日 22:36,m183 写道:
你是指提交时所依赖的flink-dist jar包需要是 1.12 版本吗,现在改成1.12 版本还是不行

2021年2月23日 下午9:27,LakeShen  写道:

这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录



凌战  于2021年2月23日周二 下午7:33写道:

同提交作业到On Yarn集群,客户端的错误也是


org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1610671284452_0243 failed
10 times due to AM Container for appattempt_1610671284452_0243_10
exited with  exitCode: 1
Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
container-launch.
Container id: container_e48_1610671284452_0243_10_01
Exit code: 1


[2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


[2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


Yarn那边的日志显示:Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint


不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题


| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制
在2021年2月23日 18:46,LakeShen 写道:
Hi 社区,

最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)

at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at

org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at

org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at

org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at

com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)

at

java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)

at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)

at
com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)

... 11 more

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.

at

org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)

at

org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)

at

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)

at

org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)

at

org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)

... 22 more

Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application

回复: Flink 1.12 On Yarn 作业提交失败问题

2021-02-24 Thread
可以试试在flink-conf.yaml里添加如下配置:
yarn.flink-dist-jar: /opt/flink-1.12/lib/flink-dist_2.11-1.12.0.jar
yarn.ship-files: /data/dfl2/lib


这个行为其实很奇怪,在我们的环境里,有的提交任务的机器不需要添加这个配置,有的不加这个配置就会造成那个main class找不到的问题。


Ps: 造成main 
class找不到的原因还可能是程序依赖的版本和部署的flink版本不一致,这种情况可能发生在flink依赖升级之后,部署的flink没有更新或者没有完全更新


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月23日 22:36,m183 写道:
你是指提交时所依赖的flink-dist jar包需要是 1.12 版本吗,现在改成1.12 版本还是不行

2021年2月23日 下午9:27,LakeShen  写道:

这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录



凌战  于2021年2月23日周二 下午7:33写道:

同提交作业到On Yarn集群,客户端的错误也是


org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1610671284452_0243 failed
10 times due to AM Container for appattempt_1610671284452_0243_10
exited with  exitCode: 1
Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
container-launch.
Container id: container_e48_1610671284452_0243_10_01
Exit code: 1


[2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


[2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


Yarn那边的日志显示:Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint


不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题


| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制
在2021年2月23日 18:46,LakeShen 写道:
Hi 社区,

最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)

at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at

org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at

org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at

org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at

com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)

at

java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)

at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)

at
com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)

... 11 more

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.

at

org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)

at

org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)

at

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)

at

org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)

at

org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)

... 22 more

Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1613992328588_4441 failed 2
times due to AM Container for appattempt_1613992328588_4441_02 exited
with  exitCode: 1
Diagnostics: Exception from

Re:Row function cannot have column reference through table alias

2021-01-10 Thread
9E8 cpu, 9.18E9 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
rel312 
[label="rel#312:StreamExecJoin\nleft=RelSubset#309,right=RelSubset#311,joinType=InnerJoin,where=true,select=app_id,
 id, family,leftInputSpec=NoUniqueKey,rightInputSpec=HasUniqueKey\nrows=9.0E15, 
cost={inf}",shape=box]
subset297 [label="rel#297:RelSubset#15.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset313 [label="rel#313:RelSubset#15.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE]"]
}
subgraph cluster16{
label="Set 16 RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) 
EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1)";
rel298 [label="rel#298:FlinkLogicalCalc\ninput=RelSubset#297,select=id, 
ROW(family.app_id, family.message) AS EXPR$1\nrows=9.0E15, cost={9.0038E15 
rows, 9.0039E15 cpu, 9.18E9 io, 0.0 network, 0.0 
memory}",color=blue,shape=box]
rel314 [label="rel#314:StreamExecCalc\ninput=RelSubset#313,select=id, 
ROW(family.app_id, family.message) AS EXPR$1\nrows=9.0E15, 
cost={inf}",shape=box]
subset299 [label="rel#299:RelSubset#16.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset315 [label="rel#315:RelSubset#16.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE]"]
}
subgraph cluster17{
label="Set 17 RecordType(VARCHAR(2147483647) id, RecordType(VARCHAR(2147483647) 
EXPR$0, VARCHAR(2147483647) EXPR$1) EXPR$1)";
rel301 
[label="rel#301:FlinkLogicalSink\ninput=RelSubset#299,table=default_catalog.default_database.flink_log_sink,fields=id,
 EXPR$1\nrows=9.0E15, cost={1.80038E16 rows, 1.80039E16 cpu, 9.18E9 io, 
0.0 network, 0.0 memory}",color=blue,shape=box]
rel304 
[label="rel#304:AbstractConverter\ninput=RelSubset#302,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None:
 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=9.0E15, 
cost={inf}",shape=box]
rel316 
[label="rel#316:StreamExecSink\ninput=RelSubset#315,table=default_catalog.default_database.flink_log_sink,fields=id,
 EXPR$1\nrows=9.0E15, cost={inf}",shape=box]
subset302 [label="rel#302:RelSubset#17.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset303 [label="rel#303:RelSubset#17.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE]"]
}
root -> subset303;
subset292 -> rel177[color=blue];
subset306 -> rel305[color=blue];
subset294 -> rel293[color=blue]; rel293 -> subset292[color=blue];
subset308 -> rel307[color=blue]; rel307 -> subset306[color=blue];
subset309 -> rel310; rel310 -> subset308;
subset309 -> rel318[color=blue]; rel318 -> subset308[color=blue];
subset295 -> rel181[color=blue];
subset297 -> rel296[color=blue]; rel296 -> subset294[color=blue,label="0"]; 
rel296 -> subset295[color=blue,label="1"];
subset313 -> rel312; rel312 -> subset309[label="0"]; rel312 -> 
subset311[label="1"];
subset299 -> rel298[color=blue]; rel298 -> subset297[color=blue];
subset315 -> rel314; rel314 -> subset313;
subset302 -> rel301[color=blue]; rel301 -> subset299[color=blue];
subset303 -> rel304; rel304 -> subset302;
subset303 -> rel316; rel316 -> subset315;
}
at 
org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742)
at 
org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
... 32 more


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


On 01/11/2021 11:04,马阳阳 wrote:
We have a sql that compose a row with a table’s columns. The simplified sql is 
like:
INSERT INTO flink_log_sink
SELECT
b.id,
Row(b.app_id, b.message)
FROM flink_log_source a
join flink_log_side b
on a.id = b.id;


When we submit the sql to Flink, the sql cannot be parsed, with the following 
error message:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at 
cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35)
at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172)
at cn.imdada.bi.dfl2.core.Main.main(Main.java:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgr

Row function cannot have column reference through table alias

2021-01-10 Thread
We have a sql that compose a row with a table’s columns. The simplified sql is 
like:
INSERT INTO flink_log_sink
SELECT
b.id,
Row(b.app_id, b.message)
FROM flink_log_source a 
join flink_log_side b 
on a.id = b.id;


When we submit the sql to Flink, the sql cannot be parsed, with the following 
error message:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at 
cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35)
at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172)
at cn.imdada.bi.dfl2.core.Main.main(Main.java:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112)
at 
cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37)
at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at 
line 11, column 8.
Was expecting one of:
")" ...
"," ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
... 15 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at 
line 11, column 8.
Was expecting one of:
")" ...
"," ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 17 more


Is this a bug or the expected behavior? If this is the expected behavior, what 
can we do to avoid it? 


PS:
I tried to create a view to represent the join result,  and inserted the view 
into the sink table. Unfortunately, it didn’t work neither.
| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制



Row function cannot have column reference through table alias

2021-01-10 Thread
We have a sql that compose a row with a table’s columns. The simplified sql is 
like:
INSERT INTO flink_log_sink
SELECT
b.id,
Row(b.app_id, b.message)
FROM flink_log_source a 
join flink_log_side b 
on a.id = b.id;


When we submit the sql to Flink, the sql cannot be parsed, with the following 
error message:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"." at line 11, column 8.
Was expecting one of:
")" ...
"," ...

at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at 
cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35)
at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172)
at cn.imdada.bi.dfl2.core.Main.main(Main.java:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112)
at 
cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37)
at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at 
line 11, column 8.
Was expecting one of:
")" ...
"," ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
... 15 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at 
line 11, column 8.
Was expecting one of:
")" ...
"," ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 17 more


Is this a bug or the expected behavior? If this is the expected behavior, what 
can we do to avoid it? 


PS:
I tried to create a view to represent the join result,  and inserted the view 
into the sink table. Unfortunately, it didn’t work neither.
| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制



Re:Re:Re:Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread
Hi Yang,
After I copied the logic from `YarnLogConfigUtil` to my own deployer (maybe 
call its logic instead of copying is a better option), the logs now can show 
normally.


Thanks again for the kind help.







At 2020-11-16 17:28:47, "马阳阳"  wrote:

Hi Yang,
I checked the `YarnLogConfigUtil`, it does some work to set the configuration 
for log.
Should I copy the logic to my deployer?










At 2020-11-16 17:21:07, "马阳阳"  wrote:

Hi Yang,
Thank you for you reply.


I set the value for "$internal.deployment.config-dir" to the Flink 
configuration directory.
And the configuration showed on Flink web UI. But it still not work. So I 
wonder what should 
I set as the value for "$internal.deployment.config-dir"?







At 2020-11-16 16:43:11, "Yang Wang"  wrote:

If you are using your own deployer(aka a java program calls the Flink client 
API to submit Flink jobs),
you need to check the jobmanager configuration in webUI whether 
"$internal.yarn.log-config-file" 
is correctly set. If not, maybe you need to set 
"$internal.deployment.config-dir" in your deployer,
not simply set the FLINK_CONF_DIR environment. Because your deployer needs to 
do some configuration setting
which CliFrontend has done. Please have a try and share more feedback.




Best,
Yang


马阳阳  于2020年11月16日周一 下午2:47写道:

Hi Yang,
We run a self-compiled Flink-1.12-SNAPSHOT, and could not see any 
taskmanager/jobmanager logs.
I have checked the log4j.properties file, and it's in the right format. And the 
FLINK_CONF_DIR is set.
When checking the java dynamic options of task manager, I found that the log 
related options are not
set.
This is the output when ussuing "ps -ef | grep ".


yarn 31049 30974  9 13:57 ?00:03:31 
/usr/lib/jvm/jdk1.8.0_121/bin/java -Xmx536870902 -Xms536870902 
-XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=134217730b -D 
taskmanager.memory.network.min=134217730b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=402653174b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=dhpdn09-113 
-Dtaskmanager.resource-id=container_1604585185669_635512_01_000713 -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-1d373ec2-0cbe-49b8-9592-3ac1d207ad63 
-Djobmanager.rpc.port=40093 -Drest.address=dhpdn09-113


My question is, what maybe the problem for this? And any suggestions?


By the way, we submit the program from Java program instead of from the command 
line.


Thanks.


ps: I sent the mail to spark user mail list un-attentionally. So I resent it to 
the Flink user mail list. Sorry for the inconvenience to  @Yang Wang 
















At 2020-11-03 20:56:19, "Yang Wang"  wrote:

You could issue "ps -ef | grep container_id_for_some_tm". And then you will 
find the
following java options about log4j.


-Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties



Best,
Yang


Diwakar Jha  于2020年11月2日周一 下午11:37写道:

Sure. I will check that and get back to you. could you please share how to 
check java dynamic options?


Best,
Diwakar


On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:

If you have already updated the log4j.properties, and it still could not work, 
then I
suggest to log in the Yarn NodeManager machine and check the log4j.properties
in the container workdir is correct. Also you could have a look at the java 
dynamic
options are correctly set.


I think it should work if the log4j.properties and java dynamic options are set 
correctly.


BTW, could you share the new yarn logs?


Best,
Yang


Diwakar Jha  于2020年11月2日周一 下午4:32写道:





Hi Yang,


Thank you so much for taking a look at the log files. I changed my 
log4j.properties. Below is the actual file that I got from EMR 6.1.0 
distribution of flink 1.11. I observed that it is different from Flink 1.11 
that i downloaded so i changed it. Still I didn't see any logs.


Actual
log4j.rootLogger=INFO,file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file





modified : commented the above and added new logging from actual flink 
application log4.properties file


#log4j.rootLogger=INFO,file

# Log all infos in the given file
#log4j.appender.file=o

Re:Re:Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread
Hi Yang,
I checked the `YarnLogConfigUtil`, it does some work to set the configuration 
for log.
Should I copy the logic to my deployer?










At 2020-11-16 17:21:07, "马阳阳"  wrote:

Hi Yang,
Thank you for you reply.


I set the value for "$internal.deployment.config-dir" to the Flink 
configuration directory.
And the configuration showed on Flink web UI. But it still not work. So I 
wonder what should 
I set as the value for "$internal.deployment.config-dir"?







At 2020-11-16 16:43:11, "Yang Wang"  wrote:

If you are using your own deployer(aka a java program calls the Flink client 
API to submit Flink jobs),
you need to check the jobmanager configuration in webUI whether 
"$internal.yarn.log-config-file" 
is correctly set. If not, maybe you need to set 
"$internal.deployment.config-dir" in your deployer,
not simply set the FLINK_CONF_DIR environment. Because your deployer needs to 
do some configuration setting
which CliFrontend has done. Please have a try and share more feedback.




Best,
Yang


马阳阳  于2020年11月16日周一 下午2:47写道:

Hi Yang,
We run a self-compiled Flink-1.12-SNAPSHOT, and could not see any 
taskmanager/jobmanager logs.
I have checked the log4j.properties file, and it's in the right format. And the 
FLINK_CONF_DIR is set.
When checking the java dynamic options of task manager, I found that the log 
related options are not
set.
This is the output when ussuing "ps -ef | grep ".


yarn 31049 30974  9 13:57 ?00:03:31 
/usr/lib/jvm/jdk1.8.0_121/bin/java -Xmx536870902 -Xms536870902 
-XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=134217730b -D 
taskmanager.memory.network.min=134217730b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=402653174b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=dhpdn09-113 
-Dtaskmanager.resource-id=container_1604585185669_635512_01_000713 -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-1d373ec2-0cbe-49b8-9592-3ac1d207ad63 
-Djobmanager.rpc.port=40093 -Drest.address=dhpdn09-113


My question is, what maybe the problem for this? And any suggestions?


By the way, we submit the program from Java program instead of from the command 
line.


Thanks.


ps: I sent the mail to spark user mail list un-attentionally. So I resent it to 
the Flink user mail list. Sorry for the inconvenience to  @Yang Wang 
















At 2020-11-03 20:56:19, "Yang Wang"  wrote:

You could issue "ps -ef | grep container_id_for_some_tm". And then you will 
find the
following java options about log4j.


-Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties



Best,
Yang


Diwakar Jha  于2020年11月2日周一 下午11:37写道:

Sure. I will check that and get back to you. could you please share how to 
check java dynamic options?


Best,
Diwakar


On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:

If you have already updated the log4j.properties, and it still could not work, 
then I
suggest to log in the Yarn NodeManager machine and check the log4j.properties
in the container workdir is correct. Also you could have a look at the java 
dynamic
options are correctly set.


I think it should work if the log4j.properties and java dynamic options are set 
correctly.


BTW, could you share the new yarn logs?


Best,
Yang


Diwakar Jha  于2020年11月2日周一 下午4:32写道:





Hi Yang,


Thank you so much for taking a look at the log files. I changed my 
log4j.properties. Below is the actual file that I got from EMR 6.1.0 
distribution of flink 1.11. I observed that it is different from Flink 1.11 
that i downloaded so i changed it. Still I didn't see any logs.


Actual
log4j.rootLogger=INFO,file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file





modified : commented the above and added new logging from actual flink 
application log4.properties file


#log4j.rootLogger=INFO,file

# Log all infos in the given file
#log4j.appender.file=org.apache.log4j.FileAppender
#log4j.appender.file.file=${log.file}
#log4j.appender.file.append=false
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 

Re:Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread
Hi Yang,
Thank you for you reply.


I set the value for "$internal.deployment.config-dir" to the Flink 
configuration directory.
And the configuration showed on Flink web UI. But it still not work. So I 
wonder what should 
I set as the value for "$internal.deployment.config-dir"?







At 2020-11-16 16:43:11, "Yang Wang"  wrote:

If you are using your own deployer(aka a java program calls the Flink client 
API to submit Flink jobs),
you need to check the jobmanager configuration in webUI whether 
"$internal.yarn.log-config-file" 
is correctly set. If not, maybe you need to set 
"$internal.deployment.config-dir" in your deployer,
not simply set the FLINK_CONF_DIR environment. Because your deployer needs to 
do some configuration setting
which CliFrontend has done. Please have a try and share more feedback.




Best,
Yang


马阳阳  于2020年11月16日周一 下午2:47写道:

Hi Yang,
We run a self-compiled Flink-1.12-SNAPSHOT, and could not see any 
taskmanager/jobmanager logs.
I have checked the log4j.properties file, and it's in the right format. And the 
FLINK_CONF_DIR is set.
When checking the java dynamic options of task manager, I found that the log 
related options are not
set.
This is the output when ussuing "ps -ef | grep ".


yarn 31049 30974  9 13:57 ?00:03:31 
/usr/lib/jvm/jdk1.8.0_121/bin/java -Xmx536870902 -Xms536870902 
-XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=134217730b -D 
taskmanager.memory.network.min=134217730b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=402653174b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=dhpdn09-113 
-Dtaskmanager.resource-id=container_1604585185669_635512_01_000713 -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-1d373ec2-0cbe-49b8-9592-3ac1d207ad63 
-Djobmanager.rpc.port=40093 -Drest.address=dhpdn09-113


My question is, what maybe the problem for this? And any suggestions?


By the way, we submit the program from Java program instead of from the command 
line.


Thanks.


ps: I sent the mail to spark user mail list un-attentionally. So I resent it to 
the Flink user mail list. Sorry for the inconvenience to  @Yang Wang 
















At 2020-11-03 20:56:19, "Yang Wang"  wrote:

You could issue "ps -ef | grep container_id_for_some_tm". And then you will 
find the
following java options about log4j.


-Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties



Best,
Yang


Diwakar Jha  于2020年11月2日周一 下午11:37写道:

Sure. I will check that and get back to you. could you please share how to 
check java dynamic options?


Best,
Diwakar


On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:

If you have already updated the log4j.properties, and it still could not work, 
then I
suggest to log in the Yarn NodeManager machine and check the log4j.properties
in the container workdir is correct. Also you could have a look at the java 
dynamic
options are correctly set.


I think it should work if the log4j.properties and java dynamic options are set 
correctly.


BTW, could you share the new yarn logs?


Best,
Yang


Diwakar Jha  于2020年11月2日周一 下午4:32写道:





Hi Yang,


Thank you so much for taking a look at the log files. I changed my 
log4j.properties. Below is the actual file that I got from EMR 6.1.0 
distribution of flink 1.11. I observed that it is different from Flink 1.11 
that i downloaded so i changed it. Still I didn't see any logs.


Actual
log4j.rootLogger=INFO,file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file





modified : commented the above and added new logging from actual flink 
application log4.properties file


#log4j.rootLogger=INFO,file

# Log all infos in the given file
#log4j.appender.file=org.apache.log4j.FileAppender
#log4j.appender.file.file=${log.file}
#log4j.appender.file.append=false
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
#log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file

# This affects logging for both user code and Flink
roo

Re:Re: Flink 1.11 not showing logs

2020-11-15 Thread
Hi Yang,
We run a self-compiled Flink-1.12-SNAPSHOT, and could not see any 
taskmanager/jobmanager logs.
I have checked the log4j.properties file, and it's in the right format. And the 
FLINK_CONF_DIR is set.
When checking the java dynamic options of task manager, I found that the log 
related options are not
set.
This is the output when ussuing "ps -ef | grep ".


yarn 31049 30974  9 13:57 ?00:03:31 
/usr/lib/jvm/jdk1.8.0_121/bin/java -Xmx536870902 -Xms536870902 
-XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=134217730b -D 
taskmanager.memory.network.min=134217730b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=402653174b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=dhpdn09-113 
-Dtaskmanager.resource-id=container_1604585185669_635512_01_000713 -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-1d373ec2-0cbe-49b8-9592-3ac1d207ad63 
-Djobmanager.rpc.port=40093 -Drest.address=dhpdn09-113


My question is, what maybe the problem for this? And any suggestions?


By the way, we submit the program from Java program instead of from the command 
line.


Thanks.


ps: I sent the mail to spark user mail list un-attentionally. So I resent it to 
the Flink user mail list. Sorry for the inconvenience to  @Yang Wang 
















At 2020-11-03 20:56:19, "Yang Wang"  wrote:

You could issue "ps -ef | grep container_id_for_some_tm". And then you will 
find the
following java options about log4j.


-Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties



Best,
Yang


Diwakar Jha  于2020年11月2日周一 下午11:37写道:

Sure. I will check that and get back to you. could you please share how to 
check java dynamic options?


Best,
Diwakar


On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:

If you have already updated the log4j.properties, and it still could not work, 
then I
suggest to log in the Yarn NodeManager machine and check the log4j.properties
in the container workdir is correct. Also you could have a look at the java 
dynamic
options are correctly set.


I think it should work if the log4j.properties and java dynamic options are set 
correctly.


BTW, could you share the new yarn logs?


Best,
Yang


Diwakar Jha  于2020年11月2日周一 下午4:32写道:





Hi Yang,


Thank you so much for taking a look at the log files. I changed my 
log4j.properties. Below is the actual file that I got from EMR 6.1.0 
distribution of flink 1.11. I observed that it is different from Flink 1.11 
that i downloaded so i changed it. Still I didn't see any logs.


Actual
log4j.rootLogger=INFO,file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file





modified : commented the above and added new logging from actual flink 
application log4.properties file


#log4j.rootLogger=INFO,file

# Log all infos in the given file
#log4j.appender.file=org.apache.log4j.FileAppender
#log4j.appender.file.file=${log.file}
#log4j.appender.file.append=false
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
#log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = File
appender.main.append = false
appender.main.fileName = ${sys:log.file}
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant 

Re:Re: Re:flinksql 输出到kafka用的fixed的方式 结果都输出到一个topic分区了,这个是什么原因

2020-11-10 Thread



这个1.11.0和1.11.1的bug,已经在1.11.2里修复了,可以看下[1]这个issue

[1] 
https://issues.apache.org/jira/browse/FLINK-19285?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22kafka%20partitioner%22














在 2020-11-10 22:26:09,"leiyanrui" <1150693...@qq.com> 写道:
>我的topic分区数是10个,sink的并发是25个 ,按照取余计算的话 也不应该只输出到一个partition的
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:flink1.9 on yarn 消费kafka数据中文乱码

2020-06-08 Thread



我们也遇到过这个问题,我们当时遇到的问题是YARN NM上的默认charset是ascii。
通过在flink-conf.yaml文件里添加如下配置解决了该问题:
env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"














在 2020-06-08 21:48:33,"guanyq"  写道:
>kafka 0.11版本
>首先kafka source topic数据是正常的,kafka客户端消费出来无中文乱码问题
>1.本地idea debug运行,无中文乱码问题
>2.服务器Standalone模式运行,无中文乱码问题
>3.服务器on yarn提交方式,就出现中文乱码问题
>
>
>flink 消费kafka的api用的是这个
>new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);
>
>
>根据1,2,3分析问题可能和yarn有关系。请教一下大佬们,还需要怎么调查,才能解决这个问题。


Re: flink1.10 yarn模式无法提交作业

2020-02-09 Thread
我们遇到过因为jar包冲突导致task 
manager无法启动,从而一直处于accepted状态的情况。这种情况下可以使用yarn logs 
-applicationId 查看一下日志,可以发现更多线索。


Fei Han wrote on 2020/2/9 17:34:

@all:
在Flink1.10中,用yarn模式无法提交作业。
   提示如下:
lease check if the requested resources are available in the YARN cluster
2020-02-09 17:22:26,318 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster
2020-02-09 17:22:26,570 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster
2020-02-09 17:22:26,822 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster
2020-02-09 17:22:27,074 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster
2020-02-09 17:22:27,326 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster
2020-02-09 17:22:27,578 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster
2020-02-09 17:22:27,831 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster
2020-02-09 17:22:28,083 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster
2020-02-09 17:22:28,336 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deployment took more than 60 seconds. Please check if the 
requested resources are available in the YARN cluster


另外,我看了yarn资源足够。但就是提交不了。
我的作业提交命令如下:
  ./bin/flink run -m yarn-cluster  -yjm 1024  -ytm 2048  
/opt/flink-1.10/examples/batch/WordCount.jar  --input 
hdfs://192.168.xxx.xxx:9000/test/LICENSE  --output 
hdfs://192.168.xxx.xxx:9000/test/result.txt


--
Sent from Postbox 


Fwd: Is it possible to get Flink job name in an operator?

2019-10-15 Thread
As the title. Is it possible now? Or if we can do something to achieve 
this. I tried to put the job name into the 
ExecutionConfig.GlobalJobParameters. But it is not possible to get the 
job name before Environment.execute() is called.


Best regards,
mayangyang