KafkaException: Unexpected error in InitProducerIdResponse when restoring a job from a savepoint

2020-11-16 by chang chan
This program tests Flink's Kafka exactly-once delivery. It runs fine on a normal submission, but restoring it from a savepoint produces the error below.
The Kafka broker is configured with transaction.max.timeout.ms = 360, and the client producer with transaction.timeout.ms = 90.

Reference code:

https://gist.github.com/giraffe-tree/15c5f707d9dfe3221959ae37b4e9d786
2020-11-17 15:24:51
org.apache.kafka.common.KafkaException: Unexpected error in
InitProducerIdResponse; Producer attempted an operation with an old epoch.
Either there is a newer producer with the same transactionalId, or the
producer's transaction has been expired by the broker.
at org.apache.kafka.clients.producer.internals.
TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
.java:1370)
at org.apache.kafka.clients.producer.internals.
TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278
)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(
NetworkClient.java:566)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
at org.apache.kafka.clients.producer.internals.Sender
.maybeSendAndPollTransactionalRequest(Sender.java:415)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
.java:313)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
240)
at java.lang.Thread.run(Thread.java:748)
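The error above says the producer's transaction "has been expired by the broker", which reflects a timing constraint between the two settings quoted at the top. A stdlib-only Java sketch of that constraint; the timeout values here are hypothetical, not the ones from this report:

```java
import java.util.Properties;

public class TxnTimeoutCheck {
    // The client-side transaction.timeout.ms must not exceed the broker-side
    // transaction.max.timeout.ms, or InitProducerId requests are rejected.
    static boolean isConsistent(long clientTimeoutMs, long brokerMaxMs) {
        return clientTimeoutMs <= brokerMaxMs;
    }

    public static void main(String[] args) {
        // Hypothetical producer configuration (not the values from this report):
        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 min

        long clientTimeout = Long.parseLong(
                producerProps.getProperty("transaction.timeout.ms"));
        long brokerMax = 3_600_000L; // broker: transaction.max.timeout.ms (1 h)

        // A savepoint older than clientTimeout may reference transactions that
        // the broker has already aborted, producing the error quoted above.
        System.out.println(isConsistent(clientTimeout, brokerMax)); // prints: true
    }
}
```

If the job is down, or the savepoint is older, for longer than the producer's transaction.timeout.ms, the broker aborts the pending transactions and the restore fails with exactly this InitProducerIdResponse error; a common mitigation is raising the producer timeout, within the broker's cap.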

I have gone through the related material on Google but still cannot solve this. Has anyone hit a similar problem, or can anyone suggest how to troubleshoot it?


java.lang.IncompatibleClassChangeError: Implementing class (using blink-planner)

2020-11-16 by norman
Issue when integrate with hive 2.1.1

Exception in thread "main" java.lang.IncompatibleClassChangeError:
Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at
org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:112)
at
org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:48)
at
org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
at
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:289)
at
org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:462)


The code is straightforward:
 val bs =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, bs)


tEnv.registerCatalog(catalog, hive)
tEnv.useCatalog(catalog)

tEnv.executeSql(
  """SET table.sql-dialect=hive;
|CREATE TABLE wap_nohe_2 (
|  user_id STRING,
|  order_amount DOUBLE
|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
|  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
|  'sink.partition-commit.trigger'='partition-time',
|  'sink.partition-commit.delay'='1 h',
|  'sink.partition-commit.policy.kind'='metastore,success-file'
|);
|""".stripMargin)


build.sbt is the following:

  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
  //"org.apache.flink" %% "flink-table-runtime-blink" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % "provided",
  //"org.apache.flink" %% "flink-table-api-java-bridge" % flinkVersion % "provided",

  "org.apache.flink" %% "flink-connector-jdbc" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-hive" % flinkVersion % "provided",
  "org.apache.hive" % "hive-exec" % hiveVersion,
  "org.postgresql" % "postgresql" % "42.2.18",

  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "provided",




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Reply: flink-1.11.2 out-of-memory problem

2020-11-16 by Xintong Song
In theory there is no hard limit on how many slots a TM can be split into, but higher parallelism does not automatically mean better performance.
Increasing the parallelism increases the job's memory demand. With too many slots on one TM you may run into heavy GC pressure, insufficient network memory, OOM, and so on. More slots on the same TM also means more running tasks, which puts extra pressure on the framework.
I suggest first observing the TM's CPU usage: only consider raising the parallelism if the job genuinely lacks processing capacity (growing latency, backpressure) while the TM container's (multi-core) CPU utilization stays low.

Thank you~

Xintong Song
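For reference, the settings discussed here are cluster-level entries in flink-conf.yaml; the values below are illustrative, not recommendations:

```yaml
# Slots offered by each TaskManager; all slots share the TM's JVM heap,
# network memory, and GC, so more slots is not automatically faster.
taskmanager.numberOfTaskSlots: 4

# Total memory of the TaskManager process; raising the slot count without
# raising this shrinks the per-slot share.
taskmanager.memory.process.size: 8g
```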




Re: Can Flink keyed state store generic or abstract types?

2020-11-16 by Guowei Ma
Yes, it can. Note that you then need to use GenericTypeInfo when declaring the MapStateDescriptor, and there is some performance cost.

Best,
Guowei
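A stdlib-only sketch of the cast-from-Object pattern proposed in this thread (the class and rule names are invented for illustration); in Flink the map would be a MapState<String, Object> declared through a MapStateDescriptor with GenericTypeInfo, as noted above:

```java
import java.util.HashMap;
import java.util.Map;

// Per-rule state stored as Object and cast back per rule. In a Flink
// KeyedProcessFunction this Map would be a MapState<String, Object>.
public class PerRuleState {
    private final Map<String, Object> state = new HashMap<>();

    // Type-safe retrieval: throws ClassCastException if the stored value
    // does not match the type the rule expects.
    <T> T get(String rule, Class<T> type) {
        return type.cast(state.get(rule));
    }

    void put(String rule, Object value) {
        state.put(rule, value);
    }

    public static void main(String[] args) {
        PerRuleState s = new PerRuleState();
        s.put("overcharge", 42L);      // one rule stores a Long
        s.put("low-battery", "B-17");  // another stores a String
        System.out.println(s.get("overcharge", Long.class));
        System.out.println(s.get("low-battery", String.class));
    }
}
```

The Class token keeps the casts in one place; a sealed base class (or a common interface) for the rule-state objects would achieve the same with compile-time checking.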




Can Flink keyed state store generic or abstract types?

2020-11-16 by Lei Wang
The business logic is as follows:

Robots report sensor data; the stream is first keyed by robotId, and each event is then run through many rules. Every rule stores the previous object, implemented like this:

private transient MapState<String, RobotData> state;

for (Entry<String, IChargerTwoRecordRule> entry :
        RulesFactory.getChargerTwoRecordRules().entrySet()) {
    String faultName = entry.getKey();
    IChargerTwoRecordRule rule = entry.getValue();
    RobotData old = state.get(faultName);
    rule.handleLogMsg(old, current);
}

Now some rules need to store objects that cannot be represented as RobotData. Is it possible to use generics or inheritance so that the MapState value can hold data of different types?

For example:

MapState<String, Object> state;

and then cast the Object to the concrete class depending on the rule.

Thanks,
Lei Wang


Re: flink 1.11.1: with Flink SQL + JDBC, upsert does not take effect even though the database table has a primary key

2020-11-16 by Leonard Xu
Hi,
Are you sure you are using `upsert` syntax in Flink SQL? As far as I know that is not supported.

Also, the primary key you declare in the Flink connector DDL should match the primary key of the MySQL table.

Best,
Leonard

> On Nov 17, 2020, at 09:12, 鱼子酱 <384939...@qq.com> wrote:
> 
> upsert



Re: Does the Flink 1.11.1 debezium changelog currently not support watermarks?

2020-11-16 by Jark Wu
Correct, this is not supported yet.
It will be supported in the 1.12 release.
What do you need the watermark for? Window aggregation?

On Tue, 17 Nov 2020 at 10:53, shimin huang 
wrote:

> Error log:
> ```
> Currently, defining WATERMARK on a changelog source is not supported
> ```
>


Re: Re: Re: flink 1.11.1: with Flink SQL + JDBC, upsert does not take effect even though the database table has a primary key

2020-11-16 by hailongwang
Hello,
   I verified this locally with MySQLDialect:
   1. The database table needs a primary key, because the upsert semantics of the "INSERT INTO ... ON DUPLICATE KEY UPDATE" statement [1] only take effect when one exists.
   2. You also need to declare 'PRIMARY KEY' in the DDL, because the connector decides whether to use the 'upsert query' based on the 'PRIMARY KEY' [2]


[1] 
https://github.com/apache/flink/blob/7eb514a59f6fd117c3535ec4bebc40a375f30b63/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/MySQLDialect.java#L76
[2] 
https://github.com/apache/flink/blob/7eb514a59f6fd117c3535ec4bebc40a375f30b63/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatBuilder.java#L98
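Putting the two points together, a minimal sketch (connection options abbreviated; the table and key columns come from the job quoted elsewhere in this thread):

```sql
-- 1. MySQL side: the physical table needs the key.
--    ALTER TABLE open_and_close_terminal_minute_1
--      ADD PRIMARY KEY (request_date, terminal_no);

-- 2. Flink DDL side: declare the same key so the JDBC sink picks the
--    "INSERT ... ON DUPLICATE KEY UPDATE" upsert query.
CREATE TABLE open_and_close_terminal_minute_1 (
  request_date STRING,
  terminal_no  STRING,
  PRIMARY KEY (request_date, terminal_no) NOT ENFORCED
) WITH (
  'connector' = 'jdbc'
  -- url / table-name / credentials as in the original DDL
);

-- A plain INSERT INTO then behaves as an upsert on MySQL:
INSERT INTO open_and_close_terminal_minute_1 SELECT ...;
```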







Does the Flink 1.11.1 debezium changelog currently not support watermarks?

2020-11-16 by shimin huang
Error log:
```
Currently, defining WATERMARK on a changelog source is not supported
```


Reply: Reply: flink-1.11.2 out-of-memory problem

2020-11-16 by 史 正超
Thanks for the reply, Xintong; I have watched many of your videos. A follow-up question: is there a minimum limit on slot memory? With limited resources I would like to split a TaskManager's memory into the smallest possible slots to maximize parallelism. Is there a reasonable range for such a split?
For example, for 1 TM with 8G, is there a limit on the number of slots it can be split into?


Re: Re: flink 1.11.1: with Flink SQL + JDBC, upsert does not take effect even though the database table has a primary key

2020-11-16 by Tio Planto
You need to create a composite primary key on (request_date, terminal_no) in the MySQL table; MySQL's upsert is implemented with "INSERT INTO ... ON
DUPLICATE KEY UPDATE ...".
The MySQL primary key does not have to be declared in the DDL.



Reply: Re: flink 1.11.1: with Flink SQL + JDBC, upsert does not take effect even though the database table has a primary key

2020-11-16 by 史 正超
Your SQL uses a Tumble window; that does not produce a retract stream, so there will be no updates, only inserts.

From: 鱼子酱 <384939...@qq.com>
Sent: November 17, 2020 1:12
To: user-zh@flink.apache.org
Subject: Re: Re: flink 1.11.1: with Flink SQL + JDBC, upsert does not take effect even though the database table has a primary key



Re: Will Flink deprecate TableSourceFactory in the future?

2020-11-16 by Luna Wong
The FactoryUtil class in 1.11 has the following method:
public static DynamicTableSink createTableSink();

Once I have a DynamicTableSink, which API do I call to register the table in a Catalog?

The old API had tEnv.registerTableSink() to register a TableSink. With the new API there is no way to register a DynamicTableSink like that, right?



Re: Reply: flink-1.11.2 out-of-memory problem

2020-11-16 by Xintong Song
>
> OK, thanks for the reply. Can the taskmanager.memory.task.off-heap.size parameter be set dynamically with the code below?
>
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
>

No, it cannot; this is a cluster-level configuration.

It can be set in the flink-conf.yaml configuration file, or specified dynamically at submission time with -yD key=value.


Thank you~

Xintong Song
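The two options just described, sketched with an illustrative value (the memory size is not a recommendation):

```shell
# Option 1: cluster configuration in flink-conf.yaml
#   taskmanager.memory.task.off-heap.size: 256m

# Option 2: dynamic property when submitting to YARN
flink run -m yarn-cluster \
  -yD taskmanager.memory.task.off-heap.size=256m \
  ./my-job.jar
```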




Re: Impact of a ZooKeeper leader change on Flink

2020-11-16 by RS
Same here: when Flink loses its connection to ZK, all jobs restart. We tested many scenarios, such as HA deployments and multiple JobManagers, and the jobs restart every time; we do not know how to solve it either.


On 2020-11-16 18:39:29, "赵一旦" wrote:
>In my experience, the few times we had to restart the ZK cluster I restarted the ZK nodes one at a time. Each time, all jobs in the Flink cluster restarted automatically (from the most recent checkpoint). This has a real impact, because checkpoints are taken every 10 minutes, so the load spikes momentarily.
>
>Is this expected, or is something wrong with my configuration or procedure?


Reply: Reply: flink-1.11.2 out-of-memory problem

2020-11-16 by Andrew
It should not be configurable that way; it belongs in the configuration file.
The taskmanager.memory.task.off-heap.size parameter is a TaskManager startup parameter.

streamTableEnv.getConfig().getConfiguration().setString(key, value);
is a job runtime configuration!

------ Original message ------
From: "user-zh"


Re: Re: flink 1.11.1: with Flink SQL + JDBC, upsert does not take effect even though the database table has a primary key

2020-11-16 by 鱼子酱
But I did write upsert...
I also tested insert into; that does not work either.

Does the table in the MySQL database itself need a primary key, or is it enough to declare one when creating the table in Flink?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.1: with Flink SQL + JDBC, upsert does not take effect even though the database table has a primary key

2020-11-16 by hailongwang
Hi,
This version does support it.
Is your insert statement written as "insert into" rather than "upsert into"?

On 2020-11-16 17:04:23, "鱼子酱" <384939...@qq.com> wrote:
>When using a MySQL database with Flink SQL, with the primary key already set, records with the same primary key are not updated; the table just keeps growing.
>Is this not supported yet, or am I using it incorrectly?
>Version: flink 1.11.1

>The two key SQL statements are as follows:
>
>create table open_and_close_terminal_minute_1 (
>  request_date varchar
>  ,terminal_no varchar
>  ,logon_time varchar
>  ,logout_time varchar
>  ,insert_time varchar
>  ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED
>) with (
>  'connector' = 'jd……
>  'url' = 'jdbc:mys……se',
>  'table-name' = 'c……,
>  'driver' = 'com.m……
>  'username' = 'ana……
>  'password' = 'ana……
>  'sink.buffer-flus……
>)
>
>upsert into open_and_close_terminal_minute_1 
>select request_date ,terminal_no ,logon_time ,logout_time
>,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)  from
>(   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>MINUTE),'-MM-dd HH:mm:ss.SSS'),0,10) as request_date
>,cast(terminalNo as varchar) as terminal_no
>,DATE_FORMAT(min(times),'-MM-dd HH:mm:ss.SSS') as logon_time
>,DATE_FORMAT(max(times),'-MM-dd HH:mm:ss.SSS') as logout_time
>from caslog INNER join itoa_b_terminal_shop  for system_time as of
>caslog.proc_time
>on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey
>where
>errCode=0 and attr=0
>group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo
>
>)
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Will Flink deprecate TableSourceFactory in the future?

2020-11-16 by Leonard Xu
Hi,
As far as I know it will be deprecated; all new connectors use DynamicTableSourceFactory, and the community usually deprecates the old API after it has been stable for one or two releases.
Also, this is a fairly low-level implementation that SQL users should not notice; anyone developing a custom connector is advised to use DynamicTableSourceFactory.

Best,
Leonard Xu

> On Nov 16, 2020, at 19:54, Luna Wong wrote:
>
> Now that FLIP-95 is implemented and DynamicTableSourceFactory exists, will TableSourceFactory be deprecated?



Will Flink deprecate TableSourceFactory in the future?

2020-11-16 by Luna Wong
Now that FLIP-95 is implemented and DynamicTableSourceFactory exists, will TableSourceFactory be deprecated?


Reply: flink-1.11.2 out-of-memory problem

2020-11-16 by 史 正超
OK, thanks for the reply. Can the taskmanager.memory.task.off-heap.size parameter be set dynamically with the code below?

streamTableEnv.getConfig().getConfiguration().setString(key, value);


From: Xintong Song
Sent: November 16, 2020 10:59
To: user-zh
Subject: Re: flink-1.11.2 out-of-memory problem

Then there probably is no memory leak; the job simply needs more direct memory than is available.
Try increasing `taskmanager.memory.task.off-heap.size` as the error message suggests.
Only increasing the TM's total memory will not help; it does not increase the direct memory available to the job.

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 6:38 PM 史 正超  wrote:

> flink-on-yarn, per-job mode. On restart the Kafka group.id
> is unchanged, so it should resume from the committed offsets, but the job fails to start. Not sure whether this is caused by backlog built up over time.
> 
> From: Xintong Song
> Sent: November 16, 2020 10:11
> To: user-zh
> Subject: Re: flink-1.11.2 out-of-memory problem
>
> What deployment mode is it? Standalone?
> After the earlier failure, were all TMs restarted when the job was rerun, or were some previous TMs reused?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Nov 16, 2020 at 5:53 PM 史 正超  wrote:
>
> > Using rocksdb, parallelism 5, 1 TM with 5 slots, TM memory
> > 10G. Starting the job fails with the error below. It used to start successfully; after running for a while it hit the same OOM, and when restarted from the original offsets it no longer starts at all.
> >
> > 2020-11-16 17:44:52
> > java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory
> > error has occurred. This can mean two things: either job(s) require(s) a
> > larger size of JVM direct memory or there is a direct memory leak. The
> > direct memory can be allocated by user code or some of its dependencies.
> In
> > this case 'taskmanager.memory.task.off-heap.size' configuration option
> > should be increased. Flink framework and its dependencies also consume
> the
> > direct memory, mostly for network communication. The most of network
> memory
> > is managed by Flink and should not result in out-of-memory error. In
> > certain special cases, in particular for jobs with high parallelism, the
> > framework may require more direct memory which is not managed by Flink.
> In
> > this case 'taskmanager.memory.framework.off-heap.size' configuration
> option
> > should be increased. If the error persists then there is probably a
> direct
> > memory leak in user code or some of its dependencies which has to be
> > investigated and fixed. The task executor has to be shutdown...
> > at java.nio.Bits.reserveMemory(Bits.java:658)
> > at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> > at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> > at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
> > at
> >
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:535)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:264)
> >
> >
> >
>


Re: flink-1.11.2 out-of-memory problem

2020-11-16 by Xintong Song
Then there probably is no memory leak; the job simply needs more direct memory than is available.
Try increasing `taskmanager.memory.task.off-heap.size` as the error message suggests.
Only increasing the TM's total memory will not help; it does not increase the direct memory available to the job.

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 6:38 PM 史 正超  wrote:

> flink-on-yarn . per-job模式,重启是kafka的group.id
> 没变,应该是接着offset消费的,但是任务启动不起来。不知道是不是一段时间后,积压导致的。
> 
> 发件人: Xintong Song 
> 发送时间: 2020年11月16日 10:11
> 收件人: user-zh 
> 主题: Re: flink-1.11.2 的 内存溢出问题
>
> 是什么部署模式呢?standalone?
> 之前任务运行一段时间报错之后,重新运行的时候是所有 TM 都重启了吗?还是有复用之前的 TM?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Nov 16, 2020 at 5:53 PM 史 正超  wrote:
>
> > 使用的是rocksdb, 并行度是5,1个tm, 5个slot,tm 内存给
> > 10G,启动任务报下面的错误。之前有启动成功过,运行一段时间后,也是报内存溢出,然后接成原来的offset启动任务,直接启动不起来了。
> >


Re: Flink MySQL CDC with a JDBC sink to MySQL: out-of-order results

2020-11-16 Posted by jindy_liu
I ran into the same out-of-order problem. OP, did you manage to solve it at the SQL level?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Impact of a ZooKeeper leader change on Flink

2020-11-16 Posted by 赵一旦
In my experience at work, I have had to restart the ZK cluster a few times, restarting the ZK nodes one at a time. Each time, every job in the Flink cluster automatically restarted (from its most recent checkpoint). That has a real impact on the jobs: checkpoints are taken every 10 minutes, so the restarts cause a momentary spike in load.

Is this expected behavior, or is there a problem with my configuration or procedure?


Flink JSON deserialization loses DECIMAL precision

2020-11-16 Posted by Luna Wong
https://issues.apache.org/jira/browse/FLINK-20170

This is the issue I filed today.
Jackson deserializes the value into a Double instead of a BigDecimal, so decimals with more than 12 digits lose precision. What should I do in this case? For now I can only treat the field as a STRING, or patch the source of the JSON format package and rebuild it.
Is there any other best practice?
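The precision pitfall described here can be reproduced outside Flink with Python's standard json module (a hedged aside: Flink's JSON format goes through Jackson on the Java side, where the analogous switch is `DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS`):

```python
import json
from decimal import Decimal

raw = '{"price": 0.1234567890123456789}'

# Default parsing routes the literal through an IEEE-754 double and
# silently rounds digits beyond ~15-17 significant figures.
lossy = json.loads(raw)["price"]

# Routing float literals through Decimal keeps every digit of the literal.
exact = json.loads(raw, parse_float=Decimal)["price"]

print(type(lossy).__name__)  # float
print(exact)                 # 0.1234567890123456789
```

Treating the field as STRING in the Flink DDL, as suggested above, side-steps the same double conversion.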


Re: flink-1.11.2 out-of-memory issue

2020-11-16 Posted by 史 正超
Flink on YARN, per-job mode. On restart the Kafka group.id is unchanged, so
consumption should resume from the committed offsets, but the job fails to start. I wonder whether backlog built up over time caused it.

From: Xintong Song 
Sent: 2020-11-16 10:11
To: user-zh 
Subject: Re: flink-1.11.2 out-of-memory issue

What deployment mode is this? Standalone?
After the earlier failure, when the job was rerun, were all the TMs restarted, or were some of the previous TMs reused?

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 5:53 PM 史 正超  wrote:

> Using RocksDB, parallelism 5, one TM with 5 slots and 10 GB of TM memory. The job fails on startup with the error below. It had started successfully before; after running for a while it hit the same out-of-memory error, and now, resuming from the original offsets, it will not start at all.
>


Re: flink-1.11.2 out-of-memory issue

2020-11-16 Posted by Xintong Song
What deployment mode is this? Standalone?
After the earlier failure, when the job was rerun, were all the TMs restarted, or were some of the previous TMs reused?

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 5:53 PM 史 正超  wrote:

> Using RocksDB, parallelism 5, one TM with 5 slots and 10 GB of TM memory. The job fails on startup with the error below. It had started successfully before; after running for a while it hit the same out-of-memory error, and now, resuming from the original offsets, it will not start at all.
>


pyflink SQL DDL connection to hbase-1.4.x fails: Configuring the input format (null) failed: Cannot create connection to HBase

2020-11-16 Posted by ghostviper
*Environment:*
hbase-1.4.13
flink-1.11.1
python-3.6.1
pyflink-1.0

*Configuration done so far:*
1. The HBase path has been added to the hadoop classpath (:/opt/hbase/hbase-1.4.13/lib/*)
2. The program's DDL is as follows:

source_ddl = """CREATE TABLE MySourceTable (
hbase_rowkey_name varchar, cf1 ROW) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',  
'connector.table-name' = 'flink-test',
'connector.zookeeper.quorum' =
'es-zk-hadoop1:2181,es-zk-hadoop2:2181,es-zk-hadoop3:2181',
'connector.zookeeper.znode.parent' = '/hbase')
"""

sink_ddl = """CREATE TABLE MySinkTable (
hbase_rowkey_name varchar, cf1 ROW) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',  
'connector.table-name' = 'flink-test-result',
'connector.zookeeper.quorum' =
'es-zk-hadoop1:2181,es-zk-hadoop2:2181,es-zk-hadoop3:2181',
'connector.zookeeper.znode.parent' = '/hbase')
"""
3. ZooKeeper has no authentication
4. Hive can access HBase
5. hbase shell commands execute correctly
6. The HBase cluster status is normal
7. The jars under the hbase lib directory are:
./hbase-common-1.4.3.jar
./flink-connector-hbase_2.11-1.11.1.jar
./hbase-client-1.4.3.jar
./hbase-protocol-1.4.3.jar


*Error:*
Traceback (most recent call last):
  File "read_hbase.py", line 46, in <module>
st_env.execute("7-read_and_hbase")
  File
"/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyflink/table/table_environment.py",
line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
  File
"/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyflink/util/exceptions.py",
line 147, in deco
return f(*a, **kw)
  File
"/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/protocol.py",
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: org.apache.flink.util.FlinkException: Failed to execute job
'7-read_and_hbase'.
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1823)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit job.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
instantiate JobManager.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 6 more
Caused by: org.apache.flink.runtime.clie


flink 1.11.1 Flink SQL with JDBC: upsert does not take effect even with a primary key defined

2020-11-16 Posted by 鱼子酱
With a MySQL database and Flink SQL, even though a primary key is defined, records with the same primary key are not updated; they just keep accumulating.
Is this currently unsupported, or am I using it the wrong way?
Version: flink 1.11.1

The two key SQL statements are as follows:

create table open_and_close_terminal_minute_1 (
  request_date varchar
  ,terminal_no varchar
  ,logon_time varchar
  ,logout_time varchar
  ,insert_time varchar
  ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED
) with (
  'connector' = 'jd……
  'url' = 'jdbc:mys……se',
  'table-name' = 'c……,
  'driver' = 'com.m……
  'username' = 'ana……
  'password' = 'ana……
  'sink.buffer-flus……
)

upsert into open_and_close_terminal_minute_1 
select request_date ,terminal_no ,logon_time ,logout_time
,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)  from
(   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date
,cast(terminalNo as varchar) as terminal_no
,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time
,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time
from caslog INNER join itoa_b_terminal_shop  for system_time as of
caslog.proc_time
on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey
where
errCode=0 and attr=0
group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo

)
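For the JDBC sink to actually upsert on MySQL (it emits `INSERT ... ON DUPLICATE KEY UPDATE` when the Flink DDL declares a primary key), the physical MySQL table must declare the same key. A hypothetical matching MySQL-side table, with assumed column types:

```sql
-- Hypothetical MySQL-side DDL: without a PRIMARY KEY (or unique index) on
-- (request_date, terminal_no), ON DUPLICATE KEY UPDATE never fires and rows
-- keep accumulating as plain inserts.
CREATE TABLE open_and_close_terminal_minute_1 (
  request_date VARCHAR(10) NOT NULL,
  terminal_no  VARCHAR(32) NOT NULL,
  logon_time   VARCHAR(23),
  logout_time  VARCHAR(23),
  insert_time  VARCHAR(23),
  PRIMARY KEY (request_date, terminal_no)
);
```

Note also that Flink SQL has no `UPSERT INTO` statement; the write is a plain `INSERT INTO`, with upsert behavior driven by the primary key declared in the Flink DDL.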



--
Sent from: http://apache-flink.147419.n8.nabble.com/




flink-1.11.2 out-of-memory issue

2020-11-16 Posted by 史 正超
Using RocksDB, parallelism 5, one TM with 5 slots and 10 GB of TM memory. The job fails on startup with the error below.
It had started successfully before; after running for a while it hit the same out-of-memory error, and now, resuming from the original offsets, it will not start at all.

2020-11-16 17:44:52
java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory 
error has occurred. This can mean two things: either job(s) require(s) a larger 
size of JVM direct memory or there is a direct memory leak. The direct memory 
can be allocated by user code or some of its dependencies. In this case 
'taskmanager.memory.task.off-heap.size' configuration option should be 
increased. Flink framework and its dependencies also consume the direct memory, 
mostly for network communication. The most of network memory is managed by 
Flink and should not result in out-of-memory error. In certain special cases, 
in particular for jobs with high parallelism, the framework may require more 
direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies which has to be investigated and fixed. 
The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:535)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:264)




Re: Pod system memory usage in native K8s mode

2020-11-16 Posted by Yang Wang
I am not sure what you mean by how much memory the Pod's "system" occupies. A Pod is not a virtual machine; it is lightweight virtualization via a Docker container
that shares the kernel with the host, so it introduces no extra memory overhead of its own.

As for the Pod's memory settings, you are right: the Pod limit is not equal to the JVM heap, because there is also off-heap memory and JVM overhead,
which is why the JVM parameters do not exactly match the Pod limit. The Pod limit is set from the maximum memory the TaskManager may use; for
the configuration of each memory component, see the community documentation [1].


[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup_tm.html


Best,
Yang

caozhen wrote on Mon, Nov 9, 2020, at 4:56 PM:

> In native K8s mode, for the JM and TM Pods that are created, the code assigns the Flink process memory as the Pod's resources.
>
> My understanding is that a Pod holds more than just the JVM process; there may be other memory usage, e.g. memory used by the Linux system.
>
> So my question is: how much memory does the Pod's system part occupy?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Flink sql-client/sql-gateway support for multiple INSERTs

2020-11-16 Posted by 朱广彬
Hi, Community:

Currently sql-client and sql-gateway accept only a single SQL statement at a time, so the multiple-insert optimization cannot be used. For example:

INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%';

These two statements produce two separate jobs, each scanning the Orders table. If Orders is backed by Kafka, consuming it twice is a significant waste. Can this be optimized?

One option is to support it at the SQL syntax level, like Hive's multi-insert:

FROM Orders
INSERT INTO RubberOrders SELECT product, amount WHERE product LIKE '%Rubber%'
INSERT INTO GlassOrders SELECT product, amount WHERE product LIKE '%Glass%';

The other option is to let sql-client/sql-gateway accept multiple SQL statements, so that even without new syntax, the two INSERT statements can be optimized into a single job that scans the Orders table only once.

Does the community have plans to optimize for this feature?

Appendix: the Table API already supports the multiple-insert optimization

// run multiple INSERT queries on the registered source table and emit the
// result to registered sink tables
StatementSet stmtSet = tEnv.createStatementSet();
// only a single INSERT query can be accepted by the `addInsertSql` method
stmtSet.addInsertSql(
    "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
stmtSet.addInsertSql(
    "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
// execute all statements together
TableResult tableResult2 = stmtSet.execute();
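For reference, later Flink releases added a statement-set syntax to the SQL client itself (FLIP-163, around Flink 1.13; availability is version-dependent, so treat this as a sketch). It achieves the same single-job optimization as the Table API example above:

```sql
BEGIN STATEMENT SET;
INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%';
END;
```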




Re: Re: Re: Is there no way to carry rowtime through CREATE VIEW in Blink 1.11?

2020-11-16 Posted by hailongwang


Just select the field through the view; as long as the field does not take part in a computation, it will not be materialized.
If you still need a time-attribute field after window processing, see [1].
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#group-windows
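The advice above can be sketched in pure SQL (table and column names here are hypothetical):

```sql
-- `user_action_time` stays a time attribute through the view because the
-- view only selects it, without computing on it.
CREATE VIEW user_actions_v AS
SELECT user_name, data, user_action_time
FROM user_actions;

-- The view's time attribute can then drive a window directly.
SELECT user_name, COUNT(*) AS cnt
FROM user_actions_v
GROUP BY TUMBLE(user_action_time, INTERVAL '1' MINUTE), user_name;
```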


On 2020-11-16 14:57:39, "周虓岗" wrote:
>No, I meant that the Table API can carry the event time along. If everything is expressed in SQL, how do I carry the time attribute through?
>
>
>On 2020-11-16 15:53:44, "hailongwang" <18868816...@163.com> wrote:
>>Hi zhou,
>>   Do you mean the createTemporaryView method? Fields can be specified on it as well; an example is in [1].
>>The implementation of createTemporaryView also calls the fromDataStream method indirectly [2].
>>
>>
>>[1] 
>>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-view-from-a-datastream-or-dataset
>>[2] 
>>https://github.com/apache/flink/blob/c24185d1c2853d5c56eed6c40e5960d2398474ca/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java#L253
>>
>>
>>Best,
>>Hailong
>>
>>
>>
>>
>>On 2020-11-16 13:48:35, "周虓岗" wrote:
>>
>>Via the Table API's "// declare an additional logical field as an event time attribute":
>>
>>Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
>>
>>
>>this carries the event time forward.
>>If CREATE VIEW is used, how can this time attribute be carried along?
>>
>>Without carrying it forward, it may cause problems.
>>
>>Is there any way to do this?


Re: flink 1.11.2 CDC: sink ordering of the CDC SQL result table; restoring the job from a savepoint with a different parallelism produces wrong sink results!!

2020-11-16 Posted by jindy_liu
1. Tried it:

Added a proctime column to the test table:

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`proctime` AS PROCTIME(),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'no_lock',
  'password' = 'no_lock',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test',
  'debezium.snapshot.locking.mode' = 'none'
);

Then wrote the deduplication statement:

INSERT into test_status_print 
SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as 
rowNum
FROM (
SELECT t.* , s.name as status_name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id
)
)r WHERE rowNum = 1;

But it fails, reporting that this is not supported:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Deduplicate doesn't support
consuming update and delete changes which is produced by node
Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
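One commonly suggested restructuring is to deduplicate the CDC stream before the join rather than after it, so that the deduplication no longer consumes the join's update/delete changelog. Whether the planner accepts this depends on the Flink version; treat it as a sketch, not a confirmed fix:

```sql
INSERT INTO test_status_print
SELECT t.id, t.name, t.`time`, t.`proctime`, t.status, s.name AS status_name
FROM (
    -- keep the latest row per id before joining
    SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC) AS rowNum
        FROM test
    ) WHERE rowNum = 1
) AS t
LEFT JOIN status AS s ON t.status = s.id;
```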




--
Sent from: http://apache-flink.147419.n8.nabble.com/