Re: hive files grow larger after merging data with overwrite?

2022-02-21 post
Could the data be duplicated? If the table is in ORC format, you can try running an alter table table_name partition (pt_dt='2021-02-20') concatenate statement to merge the small files.


--Original--
From: "RS"; 
Date: 2022年2月22日(星期二) 上午9:36
To: "user-zh"; 
Subject: hive 进行 overwrite 合并数据后文件变大?


Hi,
A Flink job writes to Hive. The checkpoint interval is configured rather short, so it generated a lot of small files, one directory per day.
I then ran a Flink SQL job to merge the earlier data. After it finished, I found that the storage had grown. Could you advise what is causing this?
Before the merge there were many small part files; after the overwrite there are fewer files, but the storage grew from 274 MB to 2.9 GB?


The partition column of Hive table table1 is `date`.
insert overwrite aw_topic_compact select * from `table1` where `date`='2022-02-21';
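For reference, a minimal sketch of submitting that same merge statement as a bounded batch job through the Table API (assumptions, not from the original mail: Flink 1.13+, flink-connector-hive on the classpath, a HiveCatalog named "myhive" whose default database is "ods", and a placeholder hive-conf path):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CompactTable1Partition {
    public static void main(String[] args) throws Exception {
        // Batch mode, so the INSERT OVERWRITE runs as one bounded job and then finishes.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // Catalog name, default database and hive-conf directory are illustrative placeholders.
        tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "ods", "/path/to/hive-conf"));
        tEnv.useCatalog("myhive");
        // Same statement as in the mail above.
        tEnv.executeSql(
                "insert overwrite aw_topic_compact select * from `table1` where `date`='2022-02-21'")
            .await();
    }
}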


Before the merge:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
274.0 M  822.1 M  /user/hive/warehouse/ods.db/table1/date=2022-02-21
48.1 M   144.2 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



After the merge:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
2.9 G    8.7 G    /user/hive/warehouse/ods.db/table1/date=2022-02-21
47.6 M   142.9 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22

Re: Flink program fails to start in Flink On Yarn HA deployment mode

2021-08-17 post
Hello, my version is 1.13.1.


--Original--
From: "Yang Wang"https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞 

Flink program fails to start in Flink On Yarn HA deployment mode

2021-08-16 post
Hello: The Flink program is deployed on YARN and launched in Application Mode. It starts normally without HA; after configuring HA, startup fails with an exception. Could you please help look into what is causing this?


The HA configuration is as follows:
high-availability: zookeeper
high-availability.storageDir: hdfs://mycluster/flink/ha
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink_cluster


The exception is as follows:
2021-08-17 10:24:18,938 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
2021-08-17 10:25:09,706 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestTaskManagerDetailsInfo.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_292]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848) ~[?:1.8.0_292]
    at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) ~[?:1.8.0_292]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: java.io.NotSerializableException: org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_292]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_292]
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:387) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    ... 29 more

Flink Hive files: checkpoints cannot complete when the upstream Kafka data volume is very large

2021-08-11 post
Hello, when Flink writes to Hive and the upstream Kafka data volume is especially large, we find that checkpoints never complete and fail with an error after 5 minutes. How should this problem be solved?
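Not a definitive answer, but a sketch of the checkpoint settings that are commonly looked at for this symptom; all values below are illustrative and not taken from the job in question:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // Give checkpoints more time than the 5 minutes after which they currently fail.
        env.getCheckpointConfig().setCheckpointTimeout(600_000L);
        // Leave room to work off backlog between two consecutive checkpoints.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
        // Unaligned checkpoints (Flink 1.11+) let barriers overtake buffered records under backpressure.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}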

Re: Flink Hive file compaction error

2021-08-11 post
Hello:
 The file indeed no longer exists. In this situation, what setting would allow the job to keep running?


--Original--
From: "Rui Li"

Flink Hive: queries return no data after the data has been written

2021-08-11 post
Hello: Flink had already written the data into Hive, but an exception occurred in the middle, so these files were never compacted and queries against the Hive table return no data. After the subsequent Flink job started, data is no longer written to this partition, so the data in this partition can never be compacted. How should this kind of exception be handled? Is there a way to repair it manually?

Flink Hive file compaction error

2021-08-10 post
Hello: While Flink was writing to Hive, one of the files waiting to be compacted went missing during file compaction, which keeps the Flink program restarting over and over. What could cause the file to go missing, and in this situation how can the Flink program be started normally?
2021-08-10 19:34:19
java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
    at org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
    at java.util.HashMap.forEach(HashMap.java:1288)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
    at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
    ... 19 more

Application Mode deployment fails

2021-07-29 post
Hello: Launching in Flink Application Mode fails. The launch command and error log are as follows:
./flink run-application -t yarn-application \
-yD yarn.application.name="MyFlinkApp"  \
-yD 
yarn.provided.lib.dirs="hdfs://10.10.98.226:8020/user/myflink/flink-common-deps/libs/yarn-flink-1.13.0/lib/;hdfs://10.10.98.226:8020/user/myflink/flink-common-deps/libs/yarn-flink-1.13.0/plugins/"
  \
/app/qmatrix/yarn-flink-1.13.0/qmatrix_jars/Sink/Hive/0.0.2/flink-hive-sink-3.0.0-jar-with-dependencies.jar
  \
--config 
'{"centerName":"rui","checkPointInterval":1000,"checkPointPath":"hdfs://10.10.98.226:8020/tmp/checkpoint66/Sink_rui_ruihiveha_HaHive","configProperties":{"tableKeyName":"{\"rui.fruitest1\":[\"show_integer\"]}"},"jobParallelism":3,"kafkaBootstrapServers":"qmatrix-1:9092,qmatrix-2:9092,qmatrix-3:9092","kafkaConsumerProperties":{"enable.auto.commit":"false","partition.assignment.strategy":"org.apache.kafka.clients.consumer.RoundRobinAssignor","max.poll.records":"500","group.id":"Sink_rui_ruihiveha_HaHive_g","auto.offset.reset":"earliest","session.timeout.ms":"15000","bootstrap.servers":"qmatrix-1:9092,qmatrix-2:9092,qmatrix-3:9092","max.partition.fetch.bytes":"1048576","max.poll.interval.ms":"180","heartbeat.interval.ms":"3000","isolation.level":"read_committed","auto.commit.interval.ms":"1000"},"managerJdbcProperties":{"url":"jdbc:mysql://qmatrix-mysql:3306/qmatrix","phyTimeoutMillis":"3","maxActive":"1","driverClassName":"com.mysql.jdbc.Driver","removeAbandoned":"true","minEvictableIdleTimeMillis":"3","username":"root","minIdle":"0","removeAbandonedTimeout":"3","timeBetweenEvictionRunsMillis":"3","password":"Cljslrl0620$","keepAlive":"false","initialSize":"0"},"passwordKey":"GOODLUCKEVERYONE","pipeName":"ruihiveha","targetConfig":{"rui.ruihiveha.rac80_forever.rac80_ruitest1.1627609625759.router.qmatrix":{"migratePartitionTime":1,"partitionField":"pt_dt","partitionInterval":1,"partitionTime":0,"partitionTimeField":"SHOW_TIMESTAMP","periodMigratePartitionTime":0,"targetSchema":"rui","targetTable":"fruitest1"}},"targetNodeName":"HaHive","targetNodeProperties":{"defaultSchema":"default","hiveConfDir":"/app/qmatrix/kerberos/HaHive","hiveUser":"root","password":"123456","url":"jdbc:hive2://10.10.98.42:1","username":"root"},"targetNodeType":"Hive","topologyId":1137,"topologyName":"Sink_rui_ruihiveha_HaHive","yarnJobManagerMemory":"1024MB","yarnTaskManagerMemory":"2048MB","zookeeperUrl":"qmatrix-1:2181,qmatrix-2:2181,qmatrix-3:2181"}'
 \
--nodeType Hive \
--jobType Sink \
2021-07-30 11:20:01,402 INFO  flink-akka.actor.default-dispatcher-3 org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2021-07-30 11:20:01,416 INFO  flink-akka.actor.default-dispatcher-3 org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound of the thread pool size is 500
2021-07-30 11:20:01,418 INFO  flink-akka.actor.default-dispatcher-3 org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy [] - yarn.client.max-cached-nodemanagers-proxies : 0
2021-07-30 11:20:01,421 INFO  flink-akka.actor.default-dispatcher-3 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - ResourceManager akka.tcp://flink@qmatrix-web:37982/user/rpc/resourcemanager_0 was granted leadership with fencing token
2021-07-30 11:20:01,818 INFO  flink-akka.actor.default-dispatcher-4 com.alibaba.druid.pool.DruidDataSource [] - {dataSource-1} inited
2021-07-30 11:20:02,648 WARN  flink-akka.actor.default-dispatcher-4 org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_161]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_161]
    at

Application Mode deployment fails

2021-07-29 post
Hello:
My


 

savepoint debugging

2021-06-24 post
Hello:
There is a problem with my savepoint data and I would like to debug it locally. When starting the Flink program locally in IDEA, how do I configure it to restore from a specified savepoint path?
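A minimal sketch of one way to do this, assuming a Flink version (1.12+, as far as I know) where the local environment picks up the execution.savepoint.path option from the configuration it is created with; the savepoint path is a placeholder, and createLocalEnvironmentWithWebUI needs flink-runtime-web on the classpath:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSavepointDebug {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Restore the locally started job from this savepoint (placeholder path).
        conf.set(SavepointConfigOptions.SAVEPOINT_PATH, "file:///tmp/savepoints/savepoint-123456-abcdef");
        // Optionally skip state that cannot be mapped to the current job graph.
        conf.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // Build the same pipeline as the production job here, then call env.execute().
    }
}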

Re: Re: flink 1.12, yarn-application mode: no logs visible in the Flink web ui

2021-06-07 post
Same here, I only have these log files:
prelaunch.out 0.07
prelaunch.err 0
taskmanager.out 0
taskmanager.err


--Original--
From: "smq"<374060...@qq.com;
Date: Mon, Jun 7, 2021 03:49 PM
To: "周瑞"http://apache-flink.147419.n8.nabble.com/


-- 
Best,
  pp

Re: flink 1.12, yarn-application mode: no logs visible in the Flink web ui

2021-06-07 post
Hello, has this problem been solved? I ran into the same issue: in Standalone mode the logs are output normally, but after deploying on YARN there is only the error log.


--Original--
From: "smq"<374060...@qq.com;
Date: Fri, Jun 4, 2021 07:06 PM
To: "r pp"http://apache-flink.147419.n8.nabble.com/


-- 
Best,
  pp

Error when restoring from a savepoint under kafka exactly-once semantics

2021-06-01 post
Hello: Under kafka exactly-once semantics, restoring from a savepoint fails. A preliminary look suggests the kafka transaction is using an old epoch. How should this problem be handled?
// TODO pass this in through configuration
env.setParallelism(1);
env.enableCheckpointing(60L, CheckpointingMode.EXACTLY_ONCE);

// Checkpoint retention policy (retain the checkpoint even if the job is explicitly cancelled)
env.getCheckpointConfig()

.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);

// TODO HDFS must be used in production
env.setStateBackend(new 
FsStateBackend("hdfs://10.10.98.226:8020/tmp/checkpoint66"));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
public static final  String TABLE_NAME = "KafkaTable";
public static final  String COLUMN_NAME = "source_value";

public static final String KAFKA_TABLE_FORMAT =
"CREATE TABLE "+TABLE_NAME+" (\n" +
"  "+COLUMN_NAME+" STRING\n" +
") WITH (\n" +
"   'connector' = 'kafka',\n" +
"   'topic' = '%s',\n" +
"   'properties.bootstrap.servers' = '%s',\n" +
"   'sink.semantic' = 'exactly-once',\n" +
"   'properties.transaction.timeout.ms' = '3',\n" +
"   'format' = 'dbz-json'\n" +
")\n";
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
operation with an old epoch. Either there is a newer producer with the same 
transactionalId,
 or the producer's transaction has been expired by the broker. while recovering 
transaction KafkaTransactionState [transactionalId=Source: 
TableSourceScan(table=[[default_catalog, default_database, debezium_source]], 
fields=[data]) - Sink: 
Sink(table=[default_catalog.default_database.KafkaTable], 
fields=[data])-7df19f87deec5680128845fd9a6ca18d-0, producerId=239009, 
epoch=216]. Presumably this transaction has been already committed before
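One thing that is commonly checked when this error appears (only a sketch; the message above itself mentions the transaction being expired by the broker): the sink's transaction timeout should comfortably exceed the checkpoint interval and must not exceed the broker's transaction.max.timeout.ms (15 minutes by default). In the same style as the DDL above, with an illustrative value:

public class KafkaSinkDdlSketch {
    public static final String TABLE_NAME = "KafkaTable";
    public static final String COLUMN_NAME = "source_value";

    // 900000 ms (15 min) is illustrative: it must cover the checkpoint interval plus recovery time
    // and must not exceed the broker-side transaction.max.timeout.ms.
    public static final String KAFKA_TABLE_FORMAT =
            "CREATE TABLE " + TABLE_NAME + " (\n" +
            "  " + COLUMN_NAME + " STRING\n" +
            ") WITH (\n" +
            "   'connector' = 'kafka',\n" +
            "   'topic' = '%s',\n" +
            "   'properties.bootstrap.servers' = '%s',\n" +
            "   'sink.semantic' = 'exactly-once',\n" +
            "   'properties.transaction.timeout.ms' = '900000',\n" +
            "   'format' = 'dbz-json'\n" +
            ")\n";
}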