Container is running beyond physical memory limits. Current usage: 5.0 GB of 5 GB physical memory used; 7.0 GB of 25 GB virtual memory used. Killing container.

2021-03-30 Post by admin
java.lang.Exception: Container 
[pid=17248,containerID=container_1597847003686_12235_01_001336] is running 
beyond physical memory limits. Current usage: 5.0 GB of 5 GB physical memory 
used; 7.0 GB of 25 GB virtual memory used. Killing container.
Dump of the process-tree for container_1597847003686_12235_01_001336 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 17283 17248 17248 17248 (java) 1025867 190314 7372083200 1311496 
/usr/local/jdk1.8/bin/java -Xmx2147483611 -Xms2147483611 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 -server 
-XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=75 -XX:ParallelGCThreads=4 
-XX:+AlwaysPreTouch -XX:NewRatio=1 -DjobName=fastmidu-deeplink-tuid-20200203 
-Dlog.file=/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1825361124b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=2013265883b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=di-h4-dn-134.h.ab1.qttsite.net -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-f63d543b-a75a-4dc4-be93-979eebd8062d 
-Djobmanager.rpc.port=43423 -Drest.address=di-h4-dn-134.h.ab1.qttsite.net 
|- 17248 17246 17248 17248 (bash) 0 0 116015104 353 /bin/bash -c 
/usr/local/jdk1.8/bin/java -Xmx2147483611 -Xms2147483611 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 -server 
-XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=75 -XX:ParallelGCThreads=4 
-XX:+AlwaysPreTouch -XX:NewRatio=1 -DjobName=fastmidu-deeplink-tuid-20200203 
-Dlog.file=/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1825361124b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=2013265883b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address='di-h4-dn-134.h.ab1.qttsite.net' -Dweb.port='0' 
-Dweb.tmpdir='/tmp/flink-web-f63d543b-a75a-4dc4-be93-979eebd8062d' 
-Djobmanager.rpc.port='43423' -Drest.address='di-h4-dn-134.h.ab1.qttsite.net' 
1> 
/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.out
 2> 
/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.err
 

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
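For anyone hitting the same kill: the JVM flags in the dump account for the
5 GB container almost exactly (a back-of-the-envelope sum from the logged
parameters, not an official Flink accounting):

  heap, -Xmx (framework 134217728 B + task 2013265883 B)   2147483611 B  ~2.00 GiB
  managed memory                                           1825361124 B  ~1.70 GiB
  direct, -XX:MaxDirectMemorySize
  (network 456340281 B + framework off-heap 134217728 B)    590558009 B  ~0.55 GiB
  metaspace, -XX:MaxMetaspaceSize                           268435456 B   0.25 GiB
  subtotal                                                               ~4.50 GiB
  plus the default JVM overhead budget (about 10%)         ~0.50 GiB  -> 5 GB container

The dump's RSS of 1311496 pages x 4 KiB comes to roughly 5.0 GiB, just past
the container limit, which is what trips YARN's physical-memory check. The
overshoot is therefore native/off-heap usage beyond the configured budget,
so the usual remedy is to enlarge the overhead or off-heap budget (for
example via the taskmanager.memory.jvm-overhead.* options) rather than the
heap.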

Hi:

Unsubscribe

2021-03-30 Post by Y Luo
Unsubscribe


Re: flink - submitted jar restarts on its own every so often

2021-03-30 Post by yidan zhao
I don't quite follow the question. The job restarts automatically? If it
fails it naturally restarts; that is the restart strategy you configured.
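The restart behaviour described above is governed by the job's restart
strategy, which can be pinned down explicitly; a minimal sketch (standard
DataStream API, the values are illustrative, not from this thread):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart at most 3 times, waiting 10 s between attempts.
        // RestartStrategies.noRestart() would instead make the job fail
        // fast rather than restart "by itself".
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}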

valve <903689...@qq.com> wrote on Wednesday, March 31, 2021 at 11:31 AM:

> I ran into this problem too, and I don't know why.


Re: Re: Reply: Exception when Flink reads data from MySQL

2021-03-30 Post by Robin Zhang
Hi, air23
JDBCTableSource is batch mode; it does not go through the streaming path.
Flink determines this internally when it parses the execution plan.
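In code, the batch read looks like this minimal sketch (assuming the Flink
1.12 Table API; the connection options are the placeholders from the quoted
mail). DDL and INSERT statements go through executeSql, while a SELECT is
issued with sqlQuery, which is the point guoyb makes in the quote below:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcBatchRead {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // DDL (and INSERT) statements go through executeSql(...).
        tEnv.executeSql(
                "CREATE TABLE MyUserTable (\n" +
                "  id BIGINT\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://***:3306/monitor',\n" +
                "  'table-name' = 't1',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '***'\n" +
                ")");

        // A SELECT is a query: run it with sqlQuery(...). The "Only insert
        // statement is supported now" error in this thread typically means
        // the SELECT was pushed through an insert-only execution path.
        Table result = tEnv.sqlQuery("SELECT id FROM MyUserTable");
        result.execute().print();
    }
}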

Best






air23 wrote
> We want to read this offline (batch), not through the streaming path.
> The exception we see is "Only insert statement is supported now".
>
> On 2021-03-30 10:31:51, "guoyb" <861277329@> wrote:
>> It can be read; there is also the built-in flink cdc.
>> A SELECT has to go through a query method; check whether you used an
>> execute call by mistake.
>>
>> ---Original message---
>> From: "air23" <wangfei23_job@>
>> Sent: Tuesday, March 30, 2021, 10:25 AM
>> To: "user-zh" <user-zh@.apache>
>> Subject: Exception when Flink reads data from MySQL
>>
>> Hi, following the official docs at
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
>> reading MySQL JDBC data here fails with: Exception in thread "main"
>> org.apache.flink.table.api.TableException: Only insert statement is
>> supported now.
>>
>> String a = "-- register a MySQL table 'users' in Flink SQL\n" +
>> "CREATE TABLE MyUserTable (\n" +
>> " id BIGINT\n" +
>> ") WITH (\n" +
>> " 'connector' = 'jdbc',\n" +
>> " 'url' = 'jdbc:mysql://***:3306/monitor',\n" +
>> " 'table-name' = 't1',\n" +
>> " 'username' = 'root',\n" +
>> " 'password' = '***'\n" +
>> ") ";
>>
>> String b = "-- scan data from the JDBC table\n" +
>> "SELECT id FROM MyUserTable\n";
>>
>> tEnv.executeSql(a);
>>
>> Is it not possible to read data from MySQL?






Re: flink - submitted jar restarts on its own every so often

2021-03-30 Post by valve
I ran into this problem too, and I don't know why.





Unsubscribe

2021-03-30 Post by 张保淇
Unsubscribe

Checkpoint alignment question

2021-03-30 Post by 张韩



flinkSQL + pythonUDF issue

2021-03-30 Post by guaishushu1103
After the job has been running for a while, an Apache Beam problem
appears. Could anyone take a look:
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 3134: Traceback (most recent call last):
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 421, in input_elements
    element = received.get(timeout=1)
  File "/home/yarn/software/python/lib/python3.6/queue.py", line 172, in get
    raise Empty
queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 967, in process_bundle
    expected_inputs):
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 424, in input_elements
    raise RuntimeError('Channel closed prematurely.')
RuntimeError: Channel closed prematurely.
guaishushu1...@163.com

Flink fails to write ORC

2021-03-30 Post by Jacob
Using the Flink API to consume Kafka messages and write ORC files, I get
the following error:
Caused by: org.apache.flink.util.SerializedThrowable
at java.lang.System.arraycopy(Native Method) ~[?:1.8.0_191-ojdkbuild]
at org.apache.hadoop.io.Text.set(Text.java:225) ~[test456.jar:?]
at 
org.apache.orc.impl.StringRedBlackTree.add(StringRedBlackTree.java:59)
~[test456.jar:?]
at
org.apache.orc.impl.writer.StringTreeWriter.writeBatch(StringTreeWriter.java:70)
~[test456.jar:?]
at
org.apache.orc.impl.writer.MapTreeWriter.writeBatch(MapTreeWriter.java:104)
~[test456.jar:?]
at
org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56)
~[test456.jar:?]
at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557)
~[test456.jar:?]
at 
org.apache.flink.orc.writer.OrcBulkWriter.flush(OrcBulkWriter.java:66)
~[test456.jar:?]
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:59)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:226)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:259)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:240)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:245)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:236)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(StreamingFileSinkHelper.java:86)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:415)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
... 13 more


The rough cause seems to be an error while writing a map-typed value, but
I don't know exactly where the error is.

I found a similar error:
https://stackoverflow.com/questions/55246512/error-writing-to-orcnewoutputformat-using-mapr-multipleoutputs

I am not sure what causes this error. The data is known to be non-empty
and not null.
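Not a confirmed diagnosis, but for comparison, here is a sketch of a
Vectorizer that writes a map<string,string> column, following the pattern
from the Flink ORC docs; the record type, field names and schema are made
up. One thing worth checking: the map's key/value child vectors have a
fixed initial capacity and have to be grown with ensureSize before writing
past it; overrunning them can surface as arraycopy errors deep inside the
writer, much like the one above.

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Hypothetical schema: struct<id:string,attrs:map<string,string>>
public class RecordVectorizer extends Vectorizer<RecordVectorizer.MyRecord>
        implements Serializable {

    public static class MyRecord {
        public String id;
        public Map<String, String> attrs;
    }

    public RecordVectorizer(String schema) {
        super(schema);
    }

    @Override
    public void vectorize(MyRecord element, VectorizedRowBatch batch) throws IOException {
        int row = batch.size++;

        ((BytesColumnVector) batch.cols[0])
                .setVal(row, element.id.getBytes(StandardCharsets.UTF_8));

        MapColumnVector mapCol = (MapColumnVector) batch.cols[1];
        mapCol.offsets[row] = mapCol.childCount;
        mapCol.lengths[row] = element.attrs.size();
        mapCol.childCount += element.attrs.size();

        // Grow the key/value child vectors before writing into them; this
        // is the step most often missed once childCount passes the initial
        // capacity of the batch.
        mapCol.keys.ensureSize(mapCol.childCount, true);
        mapCol.values.ensureSize(mapCol.childCount, true);

        BytesColumnVector keys = (BytesColumnVector) mapCol.keys;
        BytesColumnVector values = (BytesColumnVector) mapCol.values;
        int i = (int) mapCol.offsets[row];
        for (Map.Entry<String, String> e : element.attrs.entrySet()) {
            keys.setVal(i, e.getKey().getBytes(StandardCharsets.UTF_8));
            values.setVal(i, e.getValue().getBytes(StandardCharsets.UTF_8));
            i++;
        }
    }
}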



--
Thanks!
Jacob


pyflink1.12 error: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0

2021-03-30 Post by xiaoyue
While running a pyflink UDAF script I get the error:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalStateException: Process died with exit code 0.
At the moment the UDAF result cannot be sunk. Has anyone else run into
this problem?
The exception is as follows:
Traceback (most recent call last):
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 114, in 
csv_source_udaf(csv_source)
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 45, in wrapper
func(*args, **kw)
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 103, in csv_source_udaf
print(result.to_pandas())
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\table\table.py", 
line 808, in to_pandas
if batches.hasNext():
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\java_gateway.py", line 
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\util\exceptions.py", 
line 147, in deco
return f(*a, **kw)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o101.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
at 
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
at 
org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
at 
org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
... 16 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
... 18 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
...

Re: With identical job configuration, checkpoints on Flink1.12 take longer and eventually fail, while the job runs fine on Flink1.9

2021-03-30 Post by Yingjie Cao
This should not be the effect of FLINK-16404; its impact on checkpoint
time is small and has already been covered by a benchmark, and even a 1 s
checkpoint interval is not much of a problem. I suggest looking at the
stack of the failed task to see what it is doing; that may be a quicker
way to track down the issue.
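For anyone reproducing this, the knobs involved are the interval and the
tolerable-failure threshold behind the "Exceeded checkpoint tolerable
failure threshold" error; a minimal sketch (standard CheckpointConfig API;
the 3-minute interval is the one from the thread, the other values are
illustrative):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 3-minute interval, as in the failing job (10 minutes made it stable).
        env.enableCheckpointing(3 * 60 * 1000L);

        CheckpointConfig cc = env.getCheckpointConfig();
        // By default the threshold is 0, i.e. the first failed checkpoint
        // fails the job; raising it tolerates transient slow checkpoints.
        cc.setTolerableCheckpointFailureNumber(3);
        // Give barrier alignment more headroom before a checkpoint is
        // declared failed.
        cc.setCheckpointTimeout(10 * 60 * 1000L);
    }
}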

Haihang Jing wrote on Wednesday, March 24, 2021 at 12:06 PM:

> [Symptom] A job with the same configuration (checkpoint interval: 3
> minutes; job logic: regular join) runs fine on flink1.9, but on flink1.12
> it runs for a while, then checkpoints start taking longer and finally
> fail.
>
> [Analysis] We learned that flink1.10 adjusted the checkpointing
> mechanism: during barrier alignment the receiver no longer buffers the
> data that arrives after an individual barrier, which means the sender
> must wait for credit feedback after alignment before it can transmit.
> The sender therefore suffers a certain cold start that affects latency
> and network throughput. So we raised the checkpoint interval to 10
> minutes for a comparison test and found that with the 10-minute interval
> the job runs fine on flink1.12.
> Related issue: https://issues.apache.org/jira/browse/FLINK-16404
>
> [Questions] 1. Has anyone run into the same situation?
> 2. How much does the checkpoint interval actually affect a flink1.12
> job? Is there an official benchmark?
>
> With the 3-minute checkpoint interval, the flink1.12 job fails to
> complete checkpoints after running for 5 hours; the exception stack is:
>
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:96)
> at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1924)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1897)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2038)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)


Re: flink sql count distinct optimization

2021-03-30 Post by Robin Zhang
Hi, guomuhua
   `The number of inputs accumulated by local aggregation every time is
based on mini-batch interval. It means local-global aggregation depends on
mini-batch optimization is enabled`
The official docs have this passage on local aggregation. In other words,
mini-batch aggregation has to be enabled first, and only then can local
aggregation be used, which adds up to three parameters:
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
From your question it looks like you enabled only the single local
aggregation parameter; perhaps the settings just weren't written out in
full.
Best,
Robin
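A sketch pulling those settings together (with one caveat stated as an
assumption to verify against the tuning docs for your version: mini-batch
itself is switched on with table.exec.mini-batch.enabled, which the three
lines above do not include):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfig {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        Configuration configuration = tEnv.getConfig().getConfiguration();

        // Mini-batch has to be enabled before allow-latency/size take effect.
        configuration.setString("table.exec.mini-batch.enabled", "true");
        configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
        configuration.setString("table.exec.mini-batch.size", "5000");
        // Two-phase (local-global) aggregation on top of mini-batch.
        configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
    }
}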




guomuhua wrote
> In SQL, if the local-global parameter is enabled: set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> or the Partial-Final parameters: set
> table.optimizer.distinct-agg.split.enabled=true; set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> do I still have to rewrite the SQL into the corresponding two-level form?
> For example:
> Original SQL:
> SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day
>
> SQL after manually bucketing the DISTINCT field buy_id with MOD 1024:
> SELECT day, SUM(cnt) total
> FROM (
> SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> FROM T GROUP BY day, MOD(buy_id, 1024))
> GROUP BY day
>
> Or will flink rewrite the SQL for me automatically so that I don't have
> to care?
>
> Also, with only the parameters above enabled and no SQL rewrite, there
> seems to be no optimization, and I don't see two-phase operators in the
> flink web ui either.
> <http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>


Re: flink sql count distinct optimization

2021-03-30 Post by Robin Zhang
Hi, Jark
   I read the sql in the question as a plain agg whose grouping key just
happens to be a time field, so I don't follow your remark that `the job
uses a window agg`. How should I understand that?

Best,
Robin



Jark wrote
>> So if it is not a window agg, flink splits it automatically once the
>> parameter is enabled?
> Yes.
>
>> And as for a window agg not being split automatically, can that be
>> found in the documentation?
> The documentation does not mention it. Everything in this doc [1] is
> about optimizing unbounded aggregations.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
>
> On Fri, 26 Mar 2021 at 11:00, guomuhua <663021157@> wrote:
>
>> Jark wrote
>> > I see that your job uses a window agg, and window agg does not
>> > support automatic splitting yet. The window agg based on window TVFs
>> > in 1.13 supports this parameter, so stay tuned.
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang <vincent2015qdlg@> wrote:
>> >
>> >> Hi, guomuhua
>> >>   With local aggregation enabled you do not need to bucket the key
>> >> yourself for a second-level aggregation; I suggest reading the
>> >> official documentation.
>> >>
>> >> Best,
>> >> Robin
>> >>
>> >> guomuhua wrote
>> >> > In SQL, with the local-global parameter (set
>> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;) or the
>> >> > Partial-Final parameters (set
>> >> > table.optimizer.distinct-agg.split.enabled=true; set
>> >> > table.optimizer.distinct-agg.split.bucket-num=1024;) enabled, does
>> >> > the SQL still have to be rewritten into the two-level form,
>> >> > bucketing the DISTINCT field with MOD(buy_id, 1024) and summing
>> >> > the per-bucket counts, or does flink rewrite it automatically?
>> >> > With only the parameters enabled and no rewrite, there seems to be
>> >> > no optimization, and the web ui shows no two-phase operators
>> >> > either.
>> >> > <http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>
>>
>> Thanks. So if it is not a window agg, flink splits it automatically
>> once the parameter is enabled. As for a window agg not being split
>> automatically, where is that covered in the documentation? Or do I need
>> to dig through the source? Please advise. Thanks again.
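Summing the thread up in code, a sketch (the option keys are the ones
quoted above; per Jark, the automatic rewrite currently applies only to
unbounded, non-window aggregations):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DistinctSplitSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.optimizer.distinct-agg.split.enabled", "true");
        conf.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

        // Stand-in source; `day` is backticked because DAY is reserved.
        tEnv.executeSql(
                "CREATE TABLE T (`day` STRING, buy_id BIGINT) " +
                "WITH ('connector' = 'datagen')");

        // The query keeps its single-level form; the planner performs the
        // bucketed two-level rewrite internally, and the split operators
        // should show up in the printed plan.
        Table t = tEnv.sqlQuery(
                "SELECT `day`, COUNT(DISTINCT buy_id) AS cnt FROM T GROUP BY `day`");
        System.out.println(t.explain());
    }
}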


Re: Exception when Flink reads data from MySQL

2021-03-30 Post by 张锴
The error message states clearly that only insert is supported.

air23 wrote on Tuesday, March 30, 2021 at 10:32 AM:

> Hi, following the official docs:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
> 这边读取mysql jdbc数据报错Exception in thread "main"
> org.apache.flink.table.api.TableException: Only insert statement is
> supported now.
>
>
> String  a = "-- register a MySQL table 'users' in Flink SQL\n" +
> "CREATE TABLE MyUserTable (\n" +
> "  id BIGINT\n" +
> ") WITH (\n" +
> "   'connector' = 'jdbc',\n" +
> "   'url' = 'jdbc:mysql://***:3306/monitor',\n" +
> "   'table-name' = 't1',\n" +
> "   'username' = 'root',\n" +
> "   'password' = '***'\n" +
> ") ";
>
> String b ="-- scan data from the JDBC table\n" +
> "SELECT id FROM MyUserTable\n";
>
> tEnv.executeSql(a);
>
>
>
> Is it not possible to read data from MySQL?


(no subject)

2021-03-30 Post by 高耀军
Unsubscribe


高耀军
Email: 18221112...@163.com
(Signature generated with NetEase Mail Master)

Unsubscribe

2021-03-30 Post by 徐永健
Unsubscribe