直接内存溢出

2020-12-16 文章
报错信息如下
Caused by: java.lang.OutOfMemoryError: Direct buffer memory at 
java.nio.Bits.reserveMemory(Bits.java:693) 

 at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) at 
java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) 

 at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) 

 at sun.nio.ch.IOUtil.read(IOUtil.java:195) 

 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

 at 
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)

 at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)

 at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)

 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169) 

 at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150) 

 at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355) 

 at org.apache.kafka.common.network.Selector.poll(Selector.java:303) 

 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) 

 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
 

 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
 

 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)



版本 
  Flink:1.9.1 
  kafka-client:0.10.0.1
环境 
  on yarn
JVM参数
  -Xms14336m
  -Xmx14336m
  -XX:MaxDirectMemorySize=6144m
flink-conf.yml
 使用的是默认的参数
 Stream任务,并且没有使用RocksDB
 
目前初步怀疑是Flink 的堆外内存占用过大导致kafka consumer 无法申请堆外内存导致OOM。但根据官方文档的配置 
taskmanager.memory.fraction=0.7 这个应该在我的程序中不生效
taskmanager.network.memory.fraction=0.1


这样的配置下来应该用户代码可使用的堆外内存为6144m*0.9=5529m
我的问题是
在我当前的环境下是否还有我没注意到的Flink堆外内存配置,或者Flink需要占用的堆外内存是我所不了解的。 
除了控制kafka comsumer 的流量以外有没有什么其他的调整方式?


Best
Aven



Re:Flink on YARN 使用Kerboros认证失败

2020-03-24 文章
之前在使用hadoop client时设置了一个系统变量, 当这个变量没设置的时候就会报之前的错误
System.setProperty("java.security.krb5.conf", 
"C:\\Users\\86177\\Desktop\\tmp\\5\\krb5.conf" );
但flink on yarn 没有提供这个参数的设置。







在 2020-03-24 20:52:44,"aven.wu"  写道:

Flink 提交作业到有kerboros认证的集群报以下异常

 

java.lang.Exception: unable to establish the security context
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276)
at 
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312)
at 
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
at 
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
... 1 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84)
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63)
... 5 more
Caused by: KrbException: Cannot locate default realm
at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
... 11 more

 

使用了官网提供的四个参数,配置在了flink-conf.yaml里

 

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab
security.kerberos.login.principal: flink/hado...@example.com
security.kerberos.login.realm: EXAMPLE.COM
security.kerberos.login.contexts: KafkaClient

 

/home/flink-1.8.0/conf/flink.keytab 文件已放好,

 

 

Best

Aven

 

Re:回复:Kafka库和Flink的反向类加载方法不兼容

2019-11-23 文章
感谢回答
 出现这个问题也是因为我把flink connect 的jar包打成lib 放在flink 和 hadoop 
的classpath下面(缩小打出来的应用程序包),出现这个报错。我大概理解这个class加载的意思了。
在 2019-11-23 17:16:51,"1193216154" <1193216...@qq.com> 写道:
>parent first就是先加载环境的jar包,再加载用户的jar包(就是自己写的flink程序),children 
>first就是反过来。flink默认配置是chikdren 
>first,建议不要动这个配置。而是检查一下自己的flink程序的pom依赖和flink lib下面的jar包有没有冲突
>
>
>
>
>
>---原始邮件---
>发件人: "aven.wu"发送时间: 2019年11月23日(星期六) 下午4:57
>收件人: "user-zh@flink.apache.org"主题: Kafka库和Flink的反向类加载方法不兼容
>
>
>报错如下
>cannot assign instance of org.apache.commons.collections.map.LinkedMap to 
>field 
>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
> of type org.apache.commons.collections.map.LinkedMap in instance of 
>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>
>修改flink-conf.yaml
>classloader.resolve-order: parent-first
>
>哪位大佬能解释一下这个反向类加载是什么意思?
>
>发送自Aven.wu


Re:Re: 关于elasticSearch table sink 构造过于复杂

2019-08-26 文章
感谢解答,
我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入






在 2019-08-26 16:39:49,"Jark Wu"  写道:
>Hi ,
>
>
>Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
>如果要注册一个 ES sink,可以使用 descriptor API,也就是 
>org.apache.flink.table.descriptors.Elasticsearch。
>或者使用 DDL 方式注册。
>
>
>Best,
>Jark
>
>> 在 2019年8月26日,16:33,aven.wu  写道:
>> 
>> Elasticsearch6UpsertTableSink
>> 的构造方法过于复杂参数非常多
>> 
>> public Elasticsearch6UpsertTableSink(
>>  boolean isAppendOnly,
>>  TableSchema schema,
>>  List hosts,
>>  String index,
>>  String docType,
>>  String keyDelimiter,
>>  String keyNullLiteral,
>>  SerializationSchema serializationSchema,
>>  XContentType contentType,
>>  ActionRequestFailureHandler failureHandler,
>>  Map sinkOptions) {
>> 
>>   super(
>>  isAppendOnly,
>>  schema,
>>  hosts,
>>  index,
>>  docType,
>>  keyDelimiter,
>>  keyNullLiteral,
>>  serializationSchema,
>>  contentType,
>>  failureHandler,
>>  sinkOptions,
>>  UPDATE_REQUEST_FACTORY);
>> }
>> 
>> 
>> 请问,是不是我的用法不对?
>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>> 


Elasticsearch6UpsertTableSink 的构造方法过于复杂

2019-08-26 文章
public Elasticsearch6UpsertTableSink(

  boolean isAppendOnly,

  TableSchema schema,

  List hosts,

  String index,

  String docType,

  String keyDelimiter,

  String keyNullLiteral,

  SerializationSchema serializationSchema,

  XContentType contentType,

  ActionRequestFailureHandler failureHandler,

  Map sinkOptions) {

 

   super(

  isAppendOnly,

  schema,

  hosts,

  index,

  docType,

  keyDelimiter,

  keyNullLiteral,

  serializationSchema,

  contentType,

  failureHandler,

  sinkOptions,

  UPDATE_REQUEST_FACTORY);

}

 

请问,是不是我的用法不对?

有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。

flink filesystem 1.7.2 on Hadoop 2.7 BucketingSink.reflectTruncat() 有写入很多小文件到hdfs的风险

2019-06-24 文章
源码在 BucketingSink 615行
Path testPath = new Path(basePath, UUID.randomUUID().toString());
try (FSDataOutputStream outputStream = fs.create(testPath)) {
   outputStream.writeUTF("hello");
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
   throw new RuntimeException("Could not create file for checking if truncate 
works. " +
"You can disable support for truncate() completely via " +
"BucketingSink.setUseTruncate(false).", e);
}

try {
   m.invoke(fs, testPath, 2);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.debug("Truncate is not supported.", e);
m = null;
}

try {
   fs.delete(testPath, false);
} catch (IOException e) {
LOG.error("Could not delete truncate test file.", e);
   throw new RuntimeException("Could not delete truncate test file. " +
"You can disable support for truncate() completely via " +
"BucketingSink.setUseTruncate(false).", e);
}
line 635 开始创建一个测试文件 “FSDataOutputStream outputStream = fs.create(testPath)”
line 636 尝试写入 一段 测试文字"hello" "outputStream.writeUTF("hello")"
line 645 调用 truncate 方法“m.invoke(fs, testPath, 2);”
line 652 删除测试文件 “fs.delete(testPath, false);“
上述逻辑有一些瑕疵 :
 1 在635行创建一个测试文件后,636行写入hello 失败,抛出异常(导致程序重启或退出)
 2 在645行调用m.invocate 失败 抛出异常(导致程序重启或退出)
 两行操作都抛出异常终止程序或重启程序,导致创建的测试文件无法被删除,极端情况下。程序一直在抛出异常然后重启,根据我阅读的代码 
reflectTruncat(Filesystem fs)是程序初始化 state的时候会执行。


望大佬能指点一下,是我的姿势不对还是这块的设计有瑕疵。