直接内存溢出
报错信息如下 Caused by: java.lang.OutOfMemoryError: Direct buffer memory at java.nio.Bits.reserveMemory(Bits.java:693) at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) at sun.nio.ch.IOUtil.read(IOUtil.java:195) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110) at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355) at org.apache.kafka.common.network.Selector.poll(Selector.java:303) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 版本 Flink:1.9.1 kafka-client:0.10.0.1 环境 on yarn JVM参数 -Xms14336m -Xmx14336m -XX:MaxDirectMemorySize=6144m flink-conf.yml 使用的是默认的参数 Stream任务,并且没有使用RocksDB 目前初步怀疑是Flink 的堆外内存占用过大导致kafka consumer 无法申请堆外内存导致OOM。但根据官方文档的配置 taskmanager.memory.fraction=0.7 这个应该在我的程序中不生效 taskmanager.network.memory.fraction=0.1 这样的配置下来应该用户代码可使用的堆外内存为6144m*0.9=5529m 我的问题是 在我当前的环境下是否还有我没注意到的Flink堆外内存配置,或者Flink需要占用的堆外内存是我所不了解的。 除了控制kafka comsumer 的流量以外有没有什么其他的调整方式? Best Aven
Re:Flink on YARN 使用Kerboros认证失败
之前在使用hadoop client时设置了一个系统变量, 当这个变量没设置的时候就会报之前的错误 System.setProperty("java.security.krb5.conf", "C:\\Users\\86177\\Desktop\\tmp\\5\\krb5.conf" ); 但flink on yarn 没有提供这个参数的设置。 在 2020-03-24 20:52:44,"aven.wu" 写道: Flink 提交作业到有kerboros认证的集群报以下异常 java.lang.Exception: unable to establish the security context at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1124) Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:276) at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:312) at org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70) at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67) ... 1 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm(KerberosUtil.java:84) at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:63) ... 5 more Caused by: KrbException: Cannot locate default realm at sun.security.krb5.Config.getDefaultRealm(Config.java:1029) ... 11 more 使用了官网提供的四个参数,配置在了flink-conf.yaml里 security.kerberos.login.use-ticket-cache: false security.kerberos.login.keytab: /home/flink-1.8.0/conf/flink.keytab security.kerberos.login.principal: flink/hado...@example.com security.kerberos.login.realm: EXAMPLE.COM security.kerberos.login.contexts: KafkaClient /home/flink-1.8.0/conf/flink.keytab 文件已放好, Best Aven
Re:回复:Kafka库和Flink的反向类加载方法不兼容
感谢回答 出现这个问题也是因为我把flink connect 的jar包打成lib 放在flink 和 hadoop 的classpath下面(缩小打出来的应用程序包),出现这个报错。我大概理解这个class加载的意思了。 在 2019-11-23 17:16:51,"1193216154" <1193216...@qq.com> 写道: >parent first就是先加载环境的jar包,再加载用户的jar包(就是自己写的flink程序),children >first就是反过来。flink默认配置是chikdren >first,建议不要动这个配置。而是检查一下自己的flink程序的pom依赖和flink lib下面的jar包有没有冲突 > > > > > >---原始邮件--- >发件人: "aven.wu"发送时间: 2019年11月23日(星期六) 下午4:57 >收件人: "user-zh@flink.apache.org"主题: Kafka库和Flink的反向类加载方法不兼容 > > >报错如下 >cannot assign instance of org.apache.commons.collections.map.LinkedMap to >field >org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit > of type org.apache.commons.collections.map.LinkedMap in instance of >org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 > >修改flink-conf.yaml >classloader.resolve-order: parent-first > >哪位大佬能解释一下这个反向类加载是什么意思? > >发送自Aven.wu
Re:Re: 关于elasticSearch table sink 构造过于复杂
感谢解答, 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入 在 2019-08-26 16:39:49,"Jark Wu" 写道: >Hi , > > >Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。 >如果要注册一个 ES sink,可以使用 descriptor API,也就是 >org.apache.flink.table.descriptors.Elasticsearch。 >或者使用 DDL 方式注册。 > > >Best, >Jark > >> 在 2019年8月26日,16:33,aven.wu 写道: >> >> Elasticsearch6UpsertTableSink >> 的构造方法过于复杂参数非常多 >> >> public Elasticsearch6UpsertTableSink( >> boolean isAppendOnly, >> TableSchema schema, >> List hosts, >> String index, >> String docType, >> String keyDelimiter, >> String keyNullLiteral, >> SerializationSchema serializationSchema, >> XContentType contentType, >> ActionRequestFailureHandler failureHandler, >> Map sinkOptions) { >> >> super( >> isAppendOnly, >> schema, >> hosts, >> index, >> docType, >> keyDelimiter, >> keyNullLiteral, >> serializationSchema, >> contentType, >> failureHandler, >> sinkOptions, >> UPDATE_REQUEST_FACTORY); >> } >> >> >> 请问,是不是我的用法不对? >> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。 >>
Elasticsearch6UpsertTableSink 的构造方法过于复杂
public Elasticsearch6UpsertTableSink( boolean isAppendOnly, TableSchema schema, List hosts, String index, String docType, String keyDelimiter, String keyNullLiteral, SerializationSchema serializationSchema, XContentType contentType, ActionRequestFailureHandler failureHandler, Map sinkOptions) { super( isAppendOnly, schema, hosts, index, docType, keyDelimiter, keyNullLiteral, serializationSchema, contentType, failureHandler, sinkOptions, UPDATE_REQUEST_FACTORY); } 请问,是不是我的用法不对? 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
flink filesystem 1.7.2 on Hadoop 2.7 BucketingSink.reflectTruncat() 有写入很多小文件到hdfs的风险
源码在 BucketingSink 615行 Path testPath = new Path(basePath, UUID.randomUUID().toString()); try (FSDataOutputStream outputStream = fs.create(testPath)) { outputStream.writeUTF("hello"); } catch (IOException e) { LOG.error("Could not create file for checking if truncate works.", e); throw new RuntimeException("Could not create file for checking if truncate works. " + "You can disable support for truncate() completely via " + "BucketingSink.setUseTruncate(false).", e); } try { m.invoke(fs, testPath, 2); } catch (IllegalAccessException | InvocationTargetException e) { LOG.debug("Truncate is not supported.", e); m = null; } try { fs.delete(testPath, false); } catch (IOException e) { LOG.error("Could not delete truncate test file.", e); throw new RuntimeException("Could not delete truncate test file. " + "You can disable support for truncate() completely via " + "BucketingSink.setUseTruncate(false).", e); } line 635 开始创建一个测试文件 “FSDataOutputStream outputStream = fs.create(testPath)” line 636 尝试写入 一段 测试文字"hello" "outputStream.writeUTF("hello")" line 645 调用 truncate 方法“m.invoke(fs, testPath, 2);” line 652 删除测试文件 “fs.delete(testPath, false);“ 上述逻辑有一些瑕疵 : 1 在635行创建一个测试文件后,636行写入hello 失败,抛出异常(导致程序重启或退出) 2 在645行调用m.invocate 失败 抛出异常(导致程序重启或退出) 两行操作都抛出异常终止程序或重启程序,导致创建的测试文件无法被删除,极端情况下。程序一直在抛出异常然后重启,根据我阅读的代码 reflectTruncat(Filesystem fs)是程序初始化 state的时候会执行。 望大佬能指点一下,是我的姿势不对还是这块的设计有瑕疵。