Re: Hive files grow larger after merging data with overwrite?
Could the data have been duplicated? If the table is in ORC format, you can try merging the small files with an alter table table_name partition (pt_dt='2021-02-20') concatenate statement.

--Original--
From: "RS"; Date: Tue, Feb 22, 2022, 9:36 AM; To: "user-zh"; Subject: Hive files grow larger after merging data with overwrite?

Hi, a Flink job writes to Hive with a fairly short checkpoint interval, which produced many small files, one directory per day. I then ran a Flink SQL job to merge the earlier data, and after it finished the storage grew. What could be causing this? Before the merge there were many small part files; after the overwrite there are fewer files, but the storage grew from 274 MB to 2.9 GB.

The partition column of Hive table table1 is `date`:
insert overwrite aw_topic_compact select * from `table1` where `date`='2022-02-21';

Before the merge:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
274.0 M  822.1 M  /user/hive/warehouse/ods.db/table1/date=2022-02-21
48.1 M   144.2 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22

After the merge:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
2.9 G    8.7 G    /user/hive/warehouse/ods.db/table1/date=2022-02-21
47.6 M   142.9 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22
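To act on the reply's two suggestions, the duplication check and the ORC small-file merge can be sketched as follows; this is a hedged sketch adapted to the table in the question (the partition value is taken from the thread, and CONCATENATE only applies if the table really is ORC):

```sql
-- Merge the small ORC files inside the affected partition in place (ORC tables only):
ALTER TABLE table1 PARTITION (`date`='2022-02-21') CONCATENATE;

-- Rule out row duplication by comparing counts between the source partition
-- and the overwrite target:
SELECT COUNT(*) FROM `table1` WHERE `date`='2022-02-21';
SELECT COUNT(*) FROM aw_topic_compact;
```

If the row counts match, the size growth is more likely a storage-format or compression difference between the original part files and the rewritten ones than duplicated data.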
Re: Flink program fails to start under Flink on YARN HA deployment mode
Hello, my version is 1.13.1.

周瑞

--Original--
From: "Yang Wang"
https://issues.apache.org/jira/browse/FLINK-19212
Best,
Yang
Flink program fails to start under Flink on YARN HA deployment mode
Hello: The Flink program is deployed on YARN and started in Application Mode. It starts fine without HA; after configuring HA, startup fails with an exception. Could you help figure out what is causing this?

HA configuration:
high-availability: zookeeper
high-availability.storageDir: hdfs://mycluster/flink/ha
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink_cluster

Exception:
2021-08-17 10:24:18,938 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
2021-08-17 10:25:09,706 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestTaskManagerDetailsInfo.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_292]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848) ~[?:1.8.0_292]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) ~[?:1.8.0_292]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: java.io.NotSerializableException: org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_292]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_292]
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:387) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    ... 29 more
Flink to Hive: checkpoint cannot complete when upstream Kafka volume is very large
Hello, with Flink writing to Hive, when the upstream Kafka data volume is especially large, we find that the checkpoint can never complete and fails with an error after 5 minutes. How can this problem be solved?
Re: Flink Hive file compaction error
Hello: The file indeed no longer exists. In this situation, what setting would currently let the job keep running?

--Original--
From: "Rui Li"
Flink to Hive: queries return no data after writes
Hello: When Flink wrote to Hive, the data was actually written, but an exception in between left those files uncompacted, so querying the Hive table returns no data. After the subsequent Flink job started, no new data is written to that partition, so the partition's data can never be compacted. How should this kind of failure be resolved, and is there a way to repair it manually?
Flink Hive file compaction error
Hello: While Flink was writing to Hive, one of the files awaiting compaction went missing during the compaction step, and the Flink program keeps restarting as a result. What could cause the file to go missing, and how can the Flink program be brought back up normally in this situation?

2021-08-10 19:34:19
java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
    at org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
    at java.util.HashMap.forEach(HashMap.java:1288)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
    at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
    ... 19 more
Application Mode deployment failure
Hello: Starting Flink in Application Mode fails. The launch command and error log are as follows:

./flink run-application -t yarn-application \
 -yD yarn.application.name="MyFlinkApp" \
 -yD yarn.provided.lib.dirs="hdfs://10.10.98.226:8020/user/myflink/flink-common-deps/libs/yarn-flink-1.13.0/lib/;hdfs://10.10.98.226:8020/user/myflink/flink-common-deps/libs/yarn-flink-1.13.0/plugins/" \
 /app/qmatrix/yarn-flink-1.13.0/qmatrix_jars/Sink/Hive/0.0.2/flink-hive-sink-3.0.0-jar-with-dependencies.jar \
 --config '{"centerName":"rui","checkPointInterval":1000,"checkPointPath":"hdfs://10.10.98.226:8020/tmp/checkpoint66/Sink_rui_ruihiveha_HaHive","configProperties":{"tableKeyName":"{\"rui.fruitest1\":[\"show_integer\"]}"},"jobParallelism":3,"kafkaBootstrapServers":"qmatrix-1:9092,qmatrix-2:9092,qmatrix-3:9092","kafkaConsumerProperties":{"enable.auto.commit":"false","partition.assignment.strategy":"org.apache.kafka.clients.consumer.RoundRobinAssignor","max.poll.records":"500","group.id":"Sink_rui_ruihiveha_HaHive_g","auto.offset.reset":"earliest","session.timeout.ms":"15000","bootstrap.servers":"qmatrix-1:9092,qmatrix-2:9092,qmatrix-3:9092","max.partition.fetch.bytes":"1048576","max.poll.interval.ms":"180","heartbeat.interval.ms":"3000","isolation.level":"read_committed","auto.commit.interval.ms":"1000"},"managerJdbcProperties":{"url":"jdbc:mysql://qmatrix-mysql:3306/qmatrix","phyTimeoutMillis":"3","maxActive":"1","driverClassName":"com.mysql.jdbc.Driver","removeAbandoned":"true","minEvictableIdleTimeMillis":"3","username":"root","minIdle":"0","removeAbandonedTimeout":"3","timeBetweenEvictionRunsMillis":"3","password":"Cljslrl0620$","keepAlive":"false","initialSize":"0"},"passwordKey":"GOODLUCKEVERYONE","pipeName":"ruihiveha","targetConfig":{"rui.ruihiveha.rac80_forever.rac80_ruitest1.1627609625759.router.qmatrix":{"migratePartitionTime":1,"partitionField":"pt_dt","partitionInterval":1,"partitionTime":0,"partitionTimeField":"SHOW_TIMESTAMP","periodMigratePartitionTime":0,"targetSchema":"rui","targetTable":"fruitest1"}},"targetNodeName":"HaHive","targetNodeProperties":{"defaultSchema":"default","hiveConfDir":"/app/qmatrix/kerberos/HaHive","hiveUser":"root","password":"123456","url":"jdbc:hive2://10.10.98.42:1","username":"root"},"targetNodeType":"Hive","topologyId":1137,"topologyName":"Sink_rui_ruihiveha_HaHive","yarnJobManagerMemory":"1024MB","yarnTaskManagerMemory":"2048MB","zookeeperUrl":"qmatrix-1:2181,qmatrix-2:2181,qmatrix-3:2181"}' \
 --nodeType Hive \
 --jobType Sink \

2021-07-30 11:20:01,402 INFO  flink-akka.actor.default-dispatcher-3 org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2021-07-30 11:20:01,416 INFO  flink-akka.actor.default-dispatcher-3 org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound of the thread pool size is 500
2021-07-30 11:20:01,418 INFO  flink-akka.actor.default-dispatcher-3 org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy [] - yarn.client.max-cached-nodemanagers-proxies : 0
2021-07-30 11:20:01,421 INFO  flink-akka.actor.default-dispatcher-3 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - ResourceManager akka.tcp://flink@qmatrix-web:37982/user/rpc/resourcemanager_0 was granted leadership with fencing token
2021-07-30 11:20:01,818 INFO  flink-akka.actor.default-dispatcher-4 com.alibaba.druid.pool.DruidDataSource [] - {dataSource-1} inited
2021-07-30 11:20:02,648 WARN  flink-akka.actor.default-dispatcher-4 org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_161]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_161]
    at
Application Mode deployment failure
Hello: My
Savepoint debugging
Hello: There is a problem with my savepoint data and I want to debug it locally. When starting the Flink program locally in IDEA, how do I configure it to restore from a specified savepoint path?
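One approach worth trying (a sketch, not verified against this exact setup): since Flink 1.12 a local environment created inside the IDE can be given the `execution.savepoint.path` option through a Configuration, which makes the job restore from that savepoint on startup. The class name, job topology placeholder, and savepoint path below are all illustrative assumptions:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointDebug {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Restore from the savepoint under investigation (placeholder path):
        conf.setString("execution.savepoint.path",
                "hdfs://mycluster/flink/savepoints/savepoint-xxxx");
        // If some state cannot be mapped to the debug job, this option
        // (assumed here) allows it to be skipped instead of failing:
        // conf.setString("execution.savepoint.ignore-unclaimed-state", "true");

        // Passing the Configuration makes the local (IDE) environment pick up
        // the savepoint path on startup.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // ... build the same job topology as in production here ...

        env.execute("local-savepoint-debug");
    }
}
```

The operator UIDs in the debug job must match those in the job that took the savepoint, otherwise the state cannot be mapped back.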
Re: Re: Flink 1.12, yarn-application mode: no logs visible in the Flink web UI
Same here, only these log files: prelaunch.out 0.07  prelaunch.err 0  taskmanager.out 0  taskmanager.err

--Original--
From: "smq" <374060...@qq.com>; Date: Mon, Jun 7, 2021 03:49 PM; To: "周瑞"
http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.12, yarn-application mode: no logs visible in the Flink web UI
Hello, has this problem been solved? I ran into the same issue: in Standalone mode the logs are output normally, but after deploying to YARN only the error log remains.

--Original--
From: "smq" <374060...@qq.com>; Date: Fri, Jun 4, 2021 07:06 PM; To: "r pp"
http://apache-flink.147419.n8.nabble.com/
Kafka exactly-once semantics: error when restoring from a savepoint
Hello: Under Kafka exactly-once semantics, restoring from a savepoint fails. A preliminary investigation suggests the Kafka transaction is using an old epoch. How should this problem be handled?

// TODO: pass these in via configuration
env.setParallelism(1);
env.enableCheckpointing(60L, CheckpointingMode.EXACTLY_ONCE);
// Checkpoint retention policy (retain the checkpoint even if the job is explicitly cancelled)
env.getCheckpointConfig()
    .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
// TODO: HDFS must be used in production
env.setStateBackend(new FsStateBackend("hdfs://10.10.98.226:8020/tmp/checkpoint66"));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

public static final String TABLE_NAME = "KafkaTable";
public static final String COLUMN_NAME = "source_value";
public static final String KAFKA_TABLE_FORMAT =
    "CREATE TABLE "+TABLE_NAME+" (\n" +
    " "+COLUMN_NAME+" STRING\n" +
    ") WITH (\n" +
    " 'connector' = 'kafka',\n" +
    " 'topic' = '%s',\n" +
    " 'properties.bootstrap.servers' = '%s',\n" +
    " 'sink.semantic' = 'exactly-once',\n" +
    " 'properties.transaction.timeout.ms' = '3',\n" +
    " 'format' = 'dbz-json'\n" +
    ")\n";

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. while recovering transaction KafkaTransactionState [transactionalId=Source: TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) - Sink: Sink(table=[default_catalog.default_database.KafkaTable], fields=[data])-7df19f87deec5680128845fd9a6ca18d-0, producerId=239009, epoch=216]. Presumably this transaction has been already committed before
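A ProducerFencedException on restore typically means the broker expired the sink's transaction before the job came back, which happens when the producer-side transaction timeout is too small relative to the checkpoint interval plus the downtime before restore. A hedged sketch of the DDL that the format string above expands to, with an illustrative timeout value that is an assumption, not from the thread (the %s placeholders are kept from the original):

```sql
CREATE TABLE KafkaTable (
  source_value STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '%s',
  'properties.bootstrap.servers' = '%s',
  'sink.semantic' = 'exactly-once',
  -- The transaction timeout should comfortably exceed the checkpoint interval
  -- plus any expected downtime before restore, and must not exceed the
  -- broker-side transaction.max.timeout.ms (15 minutes by default).
  -- 900000 ms (15 min) here is an illustrative value:
  'properties.transaction.timeout.ms' = '900000',
  'format' = 'dbz-json'
);
```

If a transaction has already expired on the broker, restoring the same savepoint will keep hitting this error; raising the timeout prevents the situation for future runs rather than repairing the current one.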